[#9508] Fix Hive SerDe incompatibility for Gravitino Flink connector created tables #9590
Conversation
Added Apache Maven repository fallback to address intermittent CI failures.
@Pranaykarvi could you fix the CI by …
Hi @FANNG1, I’ve run …. Could you please take another look when you have a chance? Thanks!
Hi @FANNG1, I’ve updated the implementation to derive the default Hive SerDe via …. I’ve rebased the branch, rerun Spotless, and verified the unit tests. Could you please take another look when you have a moment? Thanks!
Hi @FANNG1, I’ve applied Spotless formatting cleanly to align with CI (spotlessJavaCheck). Thanks for your patience; could you please take another look?
After investigating how Flink handles the serde-lib in https://github.com/apache/flink/blob/b2a260ac957dac3b6af5dc73684624dd36dc92ea/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java#L502 and https://github.com/apache/flink/blob/b2a260ac957dac3b6af5dc73684624dd36dc92ea/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java#L508, you should reuse the hiveConf in GravitinoHiveCatalog, because that HiveConf is initialized from the Hive conf dir. This may require refactoring the Hive properties converter, since we couldn't use a single instance for now. This issue seems far more complicated than expected; do you still want to continue this PR, or should I continue it based on your change?
Hi @FANNG1, thanks a lot for the detailed analysis and for pointing out the exact Flink logic; this is very helpful. You’re right that SerDe resolution in Flink involves multiple layers (format, table options, and HiveConf defaults), and reusing the HiveConf from GravitinoHiveCatalog would likely require a broader refactor than this PR was originally scoped for. I’m happy to let you continue the PR based on my changes if that makes it cleaner and easier to align with Flink’s behavior. Please feel free to adjust or refactor HivePropertiesConverter as needed. Thanks again for taking this forward; I’m happy to review or help test any follow-up changes.
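For context, here is a minimal sketch of how a table's serialization library could be resolved with a HiveConf-backed fallback, roughly mirroring the layering described above (explicit table option, then the HiveConf default, then LazySimpleSerDe). The helper class and the table-option key are illustrative assumptions; this is not the Flink or Gravitino implementation:

```java
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

// Illustrative helper, not the actual connector code.
public final class SerdeLibResolver {

  // Hypothetical table option key; the real connector may use a different one.
  private static final String SERDE_LIB_OPTION = "hive.serde.lib";

  private SerdeLibResolver() {}

  /**
   * Resolves the SerDe class name in three layers:
   * 1. an explicit table option, if present;
   * 2. the HiveConf default ("hive.default.serde"), if configured;
   * 3. LazySimpleSerDe as the final fallback.
   */
  public static String resolveSerdeLib(Map<String, String> tableOptions, HiveConf hiveConf) {
    String explicit = tableOptions.get(SERDE_LIB_OPTION);
    if (explicit != null && !explicit.isEmpty()) {
      return explicit;
    }
    // HiveConf extends Hadoop Configuration, so get(key, default) is available.
    return hiveConf.get("hive.default.serde", LazySimpleSerDe.class.getName());
  }
}
```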
The branch was force-pushed from fce919f to 3f9f273.
Pull request overview
This PR fixes a Hive SerDe incompatibility issue for tables created via the Gravitino Flink connector and introduces a significant architectural refactoring of properties converters.
Key changes:
- Splits the PropertiesConverter interface into two separate interfaces: CatalogPropertiesConverter (for catalog-level properties) and SchemaAndTablePropertiesConverter (for schema and table-level properties); a rough sketch of this split appears after this list
- Introduces HiveSchemaAndTablePropertiesConverter, which implements intelligent SerDe resolution logic, ensuring Hive tables created through Flink default to LazySimpleSerDe when no SerDe is explicitly specified
- Adds comprehensive unit and integration tests to verify SerDe behavior and Flink ↔ Hive interoperability
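A minimal sketch of what the described interface split could look like; the method names and signatures below are illustrative assumptions, not the actual interfaces from the PR:

```java
import java.util.Map;

// Hypothetical shapes of the two interfaces described above; the real
// signatures in the PR may differ.
interface CatalogPropertiesConverter {
  // Flink catalog options -> Gravitino catalog properties.
  Map<String, String> toGravitinoCatalogProperties(Map<String, String> flinkOptions);

  // Gravitino catalog properties -> Flink catalog options.
  Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties);
}

interface SchemaAndTablePropertiesConverter {
  Map<String, String> toGravitinoSchemaProperties(Map<String, String> flinkProperties);

  Map<String, String> toFlinkSchemaProperties(Map<String, String> gravitinoProperties);

  Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties);

  Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties);
}
```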
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 3 comments.
Summary per file (a sketch of the factory wiring follows the table):
| File | Description |
|---|---|
| CatalogPropertiesConverter.java | New interface extracted from PropertiesConverter for catalog-level property conversions |
| SchemaAndTablePropertiesConverter.java | New interface extracted from PropertiesConverter for schema and table-level property conversions |
| HiveCatalogPropertiesConverter.java | Renamed from HivePropertiesConverter; implements only CatalogPropertiesConverter |
| HiveSchemaAndTablePropertiesConverter.java | New class implementing SerDe resolution logic for Hive tables with fallback to defaults |
| GravitinoHiveCatalogFactory.java | Updated to create HiveSchemaAndTablePropertiesConverter with HiveConf |
| GravitinoHiveCatalog.java | Updated to use SchemaAndTablePropertiesConverter |
| BaseCatalog.java | Updated to use SchemaAndTablePropertiesConverter for table/schema operations |
| BaseCatalogFactory.java | Updated to use CatalogPropertiesConverter |
| GravitinoCatalogStore.java | Updated to use catalogPropertiesConverter() method |
| PaimonPropertiesConverter.java | Updated to implement both new interfaces |
| GravitinoPaimonCatalogFactory.java | Adds separate methods for catalog and schema/table converters |
| GravitinoPaimonCatalog.java | Updated to use SchemaAndTablePropertiesConverter |
| IcebergPropertiesConverter.java | Updated to implement both new interfaces |
| GravitinoIcebergCatalogFactory.java | Adds separate methods for catalog and schema/table converters |
| GravitinoIcebergCatalog.java | Updated to use SchemaAndTablePropertiesConverter |
| JdbcPropertiesConverter.java | Updated to implement both new interfaces |
| GravitinoJdbcCatalogFactory.java | Defines abstract schemaAndTablePropertiesConverter() method |
| GravitinoJdbcCatalog.java | Updated to use SchemaAndTablePropertiesConverter |
| GravitinoMysqlJdbcCatalogFactory.java | Implements both converter methods |
| GravitinoPostgresJdbcCatalogFactory.java | Implements both converter methods |
| TestHivePropertiesConverter.java | Updated to use HiveCatalogPropertiesConverter |
| TestHiveSchemaAndTablePropertiesConverter.java | New unit tests for SerDe resolution logic |
| FlinkHiveCatalogIT.java | Adds integration tests for SerDe behavior and native Flink Hive catalog interoperability |
| FlinkHiveKerberosClientIT.java | Updated import to use CatalogPropertiesConverter |
| FlinkEnvIT.java | Updated import to use CatalogPropertiesConverter |
| TestPaimonPropertiesConverter.java | Updated import to use CatalogPropertiesConverter |
| TestBaseCatalog.java | Updated mock to use SchemaAndTablePropertiesConverter |
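To make the wiring above concrete, here is a rough sketch of how a catalog factory could expose the two converters, constructing the Hive schema/table converter with a HiveConf as the table describes. It reuses the interface names sketched earlier and the class names from the table; the constructor shapes and method bodies are assumptions, not the PR's actual code:

```java
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only; the actual GravitinoHiveCatalogFactory in the PR may differ.
public class GravitinoHiveCatalogFactorySketch {

  private final HiveConf hiveConf;

  public GravitinoHiveCatalogFactorySketch(HiveConf hiveConf) {
    this.hiveConf = hiveConf;
  }

  // Catalog-level property conversion (renamed from HivePropertiesConverter).
  public CatalogPropertiesConverter catalogPropertiesConverter() {
    return new HiveCatalogPropertiesConverter();
  }

  // Schema/table-level conversion, constructed with the HiveConf so SerDe
  // defaults can be resolved from the Hive configuration.
  public SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter() {
    return new HiveSchemaAndTablePropertiesConverter(hiveConf);
  }
}
```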
@jerryshao @Pranaykarvi PTAL
Thanks @FANNG1 for taking this forward and for the refactor to fully align with Flink’s behavior.
What changes were proposed in this pull request?
This PR fixes an interoperability issue between the Gravitino Flink connector and
the native Flink Hive client.
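Per the overview above, the incompatibility comes from tables being created without an explicitly specified SerDe. A rough sketch, not the connector's actual code, of ensuring a Hive metastore StorageDescriptor carries a default serialization library so native clients can read the table:

```java
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

// Illustrative only: make sure a created table's storage descriptor has a SerDe.
public final class SerdeDefaults {

  private SerdeDefaults() {}

  public static void applyDefaultSerde(StorageDescriptor sd) {
    SerDeInfo serdeInfo = sd.getSerdeInfo();
    if (serdeInfo == null) {
      serdeInfo = new SerDeInfo();
      sd.setSerdeInfo(serdeInfo);
    }
    if (serdeInfo.getSerializationLib() == null) {
      // Fall back to LazySimpleSerDe so native Hive/Flink clients can deserialize rows.
      serdeInfo.setSerializationLib(LazySimpleSerDe.class.getName());
    }
  }
}
```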
Why are the changes needed?
Fix: #9508
Does this PR introduce any user-facing change?
No.
How was this patch tested?
The patch was tested with both unit and integration tests:
- Unit tests for HivePropertiesConverter to verify the SerDe resolution logic.
- An integration test verifying that tables created through the Gravitino Flink connector could be read by the native Flink client (a rough sketch of this scenario follows).
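As a rough illustration of that integration scenario (not the actual FlinkHiveCatalogIT code), the sketch below creates a table through a catalog assumed to be backed by the Gravitino Flink connector and reads it back through a native Flink Hive catalog; the catalog names, database, and DDL are made up for illustration and assume both catalogs are already registered:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Rough illustration of the interoperability scenario; not the actual IT code.
public class SerdeInteropSketch {

  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Assumes a Gravitino-backed Hive catalog and a native Flink Hive catalog
    // are already registered under these hypothetical names.
    tableEnv.useCatalog("gravitino_hive");
    tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS db1.serde_test (id INT, name STRING)");

    // Reading through the native Flink Hive catalog should now succeed,
    // because the table falls back to LazySimpleSerDe instead of an empty SerDe.
    tableEnv.useCatalog("native_hive");
    tableEnv.executeSql("SELECT * FROM db1.serde_test").print();
  }
}
```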