[FLINK] Implement Iceberg lookup join functionality, with source code and JUnit tests. #15183
+9,099
−0
Problem
In production, streaming data often needs to be joined with dimension data stored in Iceberg tables, and that dimension data must be refreshed periodically to keep join results accurate. The Iceberg Flink connector currently has no native lookup join support, so users have to rely on workarounds or alternative solutions.
Solution
This PR implements Iceberg lookup join functionality for Flink, enabling efficient joins between streaming data and Iceberg dimension tables. The implementation includes:
IcebergLookupCache: A cache mechanism for storing and managing lookup data with TTL support
IcebergLookupReader: A reader component for loading and refreshing lookup data from Iceberg tables
IcebergTableSource enhancement: Updated to support lookup join operations
Configuration options: New config options for customizing lookup join behavior (cache size, refresh interval, etc.)
Integration tests: Comprehensive test coverage (IcebergLookupJoinITCase)
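To make the caching idea concrete, here is a minimal sketch of a size-bounded cache with TTL expiry. It is not the IcebergLookupCache class from this PR: the class name TtlLookupCache, its constructor parameters, and the injected clock are all assumptions made for illustration, and it uses only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

// Hypothetical illustration only; NOT the IcebergLookupCache added in this PR.
final class TtlLookupCache<K, V> {

  // A cached value together with the time at which it was loaded.
  private static final class TimedValue<V> {
    final V value;
    final long loadedAtMillis;

    TimedValue(V value, long loadedAtMillis) {
      this.value = value;
      this.loadedAtMillis = loadedAtMillis;
    }
  }

  private final long ttlMillis;
  private final LongSupplier clock; // injected so tests can control time
  private final LinkedHashMap<K, TimedValue<V>> entries;

  TtlLookupCache(final int maxSize, long ttlMillis, LongSupplier clock) {
    this.ttlMillis = ttlMillis;
    this.clock = clock;
    // Access-ordered map: the least recently used entry is dropped
    // once the map grows beyond maxSize.
    this.entries = new LinkedHashMap<K, TimedValue<V>>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, TimedValue<V>> eldest) {
        return size() > maxSize;
      }
    };
  }

  synchronized void put(K key, V value) {
    entries.put(key, new TimedValue<>(value, clock.getAsLong()));
  }

  // Returns the cached value, or empty if the key is absent or its TTL has elapsed.
  synchronized Optional<V> get(K key) {
    TimedValue<V> cached = entries.get(key);
    if (cached == null) {
      return Optional.empty();
    }
    if (clock.getAsLong() - cached.loadedAtMillis >= ttlMillis) {
      entries.remove(key); // expired: drop it so the caller reloads from the table
      return Optional.empty();
    }
    return Optional.of(cached.value);
  }

  synchronized int size() {
    return entries.size();
  }
}
```

A caller would typically construct it with `System::currentTimeMillis` as the clock, and on an empty result fall back to reading the row from the dimension table and calling `put`.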
Changes
Added IcebergLookupCache for efficient caching of lookup data
Added IcebergLookupReader for reading lookup data from Iceberg tables
Added IcebergLookupJoinITCase for integration testing
Updated IcebergTableSource to support lookup join operations
Added configuration options in FlinkConfigOptions for lookup join settings
Updated build.gradle files for Flink v1.16, v1.17, and v1.18
Benefits
Enables real-time joins with Iceberg dimension tables
Reduces data latency by avoiding frequent full table scans
Improves performance by caching lookup data with TTL-based expiry
Integrates with Flink's existing lookup join framework
Supports periodic data refresh to ensure data freshness
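One common way to get periodic refresh without blocking lookups is to reload the dimension data in the background and swap in the new snapshot atomically. The sketch below shows that pattern using only the JDK. It is an assumption about how such a refresh could work, not the IcebergLookupReader from this PR; RefreshingLookupTable, its loader parameter, and the thread name are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration only; NOT the IcebergLookupReader added in this PR.
final class RefreshingLookupTable<K, V> implements AutoCloseable {

  // Loads the full dimension data, keyed by join key (for example, from a table scan).
  private final Supplier<Map<K, List<V>>> loader;

  // Readers always see a complete snapshot; a refresh replaces the reference in one step.
  private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "lookup-refresh");
        thread.setDaemon(true); // do not keep the JVM alive for refreshes
        return thread;
      });

  RefreshingLookupTable(Supplier<Map<K, List<V>>> loader) {
    this.loader = loader;
  }

  // Loads once synchronously, then reloads after every interval.
  void start(long intervalMillis) {
    refresh();
    scheduler.scheduleWithFixedDelay(
        this::refreshQuietly, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  // Builds a new immutable snapshot and publishes it.
  void refresh() {
    snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
  }

  private void refreshQuietly() {
    try {
      refresh();
    } catch (RuntimeException e) {
      // Keep serving the previous snapshot if a reload fails.
    }
  }

  List<V> lookup(K key) {
    return snapshot.getOrDefault(key, Collections.emptyList());
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

The trade-off in this design is memory for latency: the whole dimension table is held in memory, and lookups can return data that is up to one refresh interval old.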
Testing
Added integration tests to validate lookup join functionality
Tested cache refresh mechanisms
Verified correctness of join results
Verified backward compatibility