@fightBoxing

[FLINK] Implement Iceberg lookup join functionality

Problem

Production pipelines commonly need to join streaming data with dimension data stored in Iceberg tables, and that dimension data must be refreshed periodically to keep join results accurate. Currently, the Iceberg Flink connector has no native support for lookup joins, so users have to build workarounds or switch to other storage for dimension data.

Solution

This PR implements Iceberg lookup join functionality for Flink, enabling efficient joins between streaming data and Iceberg dimension tables. The implementation includes:

  • IcebergLookupCache: A cache mechanism for storing and managing lookup data with TTL support
  • IcebergLookupReader: A reader component for loading and refreshing lookup data from Iceberg tables
  • IcebergTableSource enhancement: Updated to support lookup join operations
  • Configuration options: New config options for customizing lookup join behavior (cache size, refresh interval, etc.)
  • Integration tests: Comprehensive test coverage (IcebergLookupJoinITCase)
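
As a rough illustration of the kind of cache described above, here is a minimal sketch of a size-bounded cache with per-entry TTL, using only the JDK. The class name `TtlLookupCache`, its constructor parameters, and the injectable clock are hypothetical and chosen for this example; they are not taken from the PR's `IcebergLookupCache`, whose actual design may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch only; not the PR's IcebergLookupCache. */
final class TtlLookupCache<K, V> {

    /** A cached value together with its absolute expiry time. */
    private static final class Timed<V> {
        final V value;
        final long expiresAtMillis;

        Timed(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;
    private final LinkedHashMap<K, Timed<V>> entries;

    TtlLookupCache(final int maxSize, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // accessOrder=true keeps least-recently-used entries first;
        // removeEldestEntry evicts one entry whenever the bound is exceeded.
        this.entries = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    synchronized void put(K key, V value) {
        entries.put(key, new Timed<>(value, clock.getAsLong() + ttlMillis));
    }

    /** Returns the cached value, or null if the key is absent or expired. */
    synchronized V get(K key) {
        Timed<V> timed = entries.get(key);
        if (timed == null) {
            return null;
        }
        if (clock.getAsLong() >= timed.expiresAtMillis) {
            entries.remove(key); // drop the stale entry so the caller reloads it
            return null;
        }
        return timed.value;
    }

    synchronized int size() {
        return entries.size();
    }
}
```

A caller would treat a `null` result as a cache miss and fall back to reading the row from the table, then `put` the result. Passing `System::currentTimeMillis` as the clock gives wall-clock TTL; a fake clock makes expiry easy to test.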

Changes

  • Added IcebergLookupCache for efficient caching of lookup data
  • Added IcebergLookupReader for reading lookup data from Iceberg tables
  • Added IcebergLookupJoinITCase for integration testing
  • Updated IcebergTableSource to support lookup join operations
  • Added configuration options in FlinkConfigOptions for lookup join settings
  • Updated build.gradle files for v1.16, v1.17, and v1.18

Benefits

  • Enables real-time joins with Iceberg dimension tables
  • Reduces data latency by avoiding frequent full table scans
  • Improves performance through intelligent caching strategies
  • Integrates with the existing Flink lookup join framework
  • Supports periodic data refresh to ensure data freshness
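
To make the periodic-refresh idea concrete, the following sketch reloads a full in-memory snapshot of the dimension data once a configured interval has elapsed. It is an assumption-laden illustration: `ReloadingLookup`, the `loader` supplier, and the lazy reload-on-lookup strategy are all hypothetical, and the PR description does not say whether its reader refreshes lazily or on a background thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch only; not the PR's IcebergLookupReader. */
final class ReloadingLookup<K, V> {
    private final Supplier<Map<K, V>> loader; // e.g. a full scan of the dimension table
    private final long reloadIntervalMillis;
    private final LongSupplier clock;

    private Map<K, V> snapshot = Collections.emptyMap();
    private long nextReloadAtMillis = Long.MIN_VALUE; // forces a load on first lookup

    ReloadingLookup(Supplier<Map<K, V>> loader, long reloadIntervalMillis, LongSupplier clock) {
        this.loader = loader;
        this.reloadIntervalMillis = reloadIntervalMillis;
        this.clock = clock;
    }

    /** Returns the value for the key from the current snapshot, reloading first if it is due. */
    synchronized V lookup(K key) {
        long now = clock.getAsLong();
        if (now >= nextReloadAtMillis) {
            // Copy the loaded data so later changes in the source do not leak into the snapshot.
            snapshot = new HashMap<>(loader.get());
            nextReloadAtMillis = now + reloadIntervalMillis;
        }
        return snapshot.get(key);
    }
}
```

The trade-off in this variant is that lookups between reloads never touch the table, at the cost of results being stale by up to one reload interval.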

Testing

  • Added integration tests to validate lookup join functionality
  • Tested cache refresh mechanisms
  • Verified correctness of join results
  • Verified backward compatibility

Versions

This implementation is backported to Flink 1.16, 1.17, and 1.18 to support multiple Flink versions in production environments.

@nastra
Contributor

nastra commented Jan 15, 2026

@fightBoxing it looks like this is targeting the 1.5.x branch, which is not maintained anymore. Can you please rebase and target main? Also make sure to target the latest Flink version first. In another PR you can then backport the changes to earlier Flink versions.

@fightBoxing
Author

fightBoxing commented Jan 30, 2026

Hello, I have recreated the pull request against the main branch:
#15183

@mxm
Contributor

mxm commented Jan 30, 2026

Hey @fightBoxing! Thanks for the PR. I've left a comment here: #15183 (review)
