From a294004fbba42ba5440c40bfa493297aaa3f0681 Mon Sep 17 00:00:00 2001 From: Yong-Jin Lee Date: Mon, 19 Jan 2026 14:26:48 -0500 Subject: [PATCH] Wire external catalog properties into REST client config Pass ExternalCatalog.properties through to federated catalog clients fix double init and add backward compatibility update changelog on ExternalCatalog properties Replace custom merge logic with RESTUtil.merge() in all federated catalog factories (IcebergREST, Hive, Hadoop) Inline logger in ExternalCatalogFactory interface Simplify tests to verify RESTUtil.merge() behavior we depend on Remove redundant RESTUtil.merge() tests Resolve commit conflicts. Simplify ExternalCatalogFactory to breaking change per review Remove deprecated 2-param createCatalog/createGenericCatalog methods and make the 3-param versions the only abstract methods. This follows Polaris evolution guidelines that Java interfaces may change at any time, and avoids runtime WARN noise for legacy implementations. Move CHANGELOG entry from Changes to Breaking changes section. --- CHANGELOG.md | 1 + .../hadoop/HadoopFederatedCatalogFactory.java | 27 ++++++++++++------- .../hive/HiveFederatedCatalogFactory.java | 19 ++++++++----- .../core/catalog/ExternalCatalogFactory.java | 14 ++++++++-- .../generic/GenericTableCatalogHandler.java | 5 +++- .../iceberg/IcebergCatalogHandler.java | 6 ++++- .../IcebergRESTExternalCatalogFactory.java | 21 +++++++++++---- .../federation/iceberg-rest-federation.md | 27 +++++++++++++++++++ 8 files changed, 95 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34ef7108a6..cb5584ee7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - The (Before/After)CommitViewEvent has been removed. - The (Before/After)CommitTableEvent has been removed. - The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a `receivedTimestamp` parameter of type `java.time.Instant`. +- The `ExternalCatalogFactory.createCatalog()` and `createGenericCatalog()` method signatures have been extended to include a `catalogProperties` parameter of type `Map` for passing through proxy and timeout settings to federated catalog HTTP clients. ### New Features diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index b2cc24ec1e..3f44f76f3c 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -20,9 +20,11 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.AuthenticationParametersDpo; @@ -31,19 +33,17 @@ import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Factory class for creating a Hadoop catalog handle based on connection configuration. */ @ApplicationScoped @Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER) public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); @Override public Catalog createCatalog( ConnectionConfigInfoDpo connectionConfigInfoDpo, - PolarisCredentialManager polarisCredentialManager) { + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. // Hence, prior to initializing the configuration, ensure that the catalog uses // IMPLICIT authentication. @@ -53,17 +53,26 @@ public Catalog createCatalog( != AuthenticationType.IMPLICIT.getCode()) { throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication."); } - Configuration conf = new Configuration(); String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse); - hadoopCatalog.initialize( - warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + + // Use no-arg constructor + setConf + initialize pattern to avoid double initialization. + // The HadoopCatalog(conf, warehouse) constructor internally calls initialize(), so using + // it followed by another initialize() call would be redundant. + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + hadoopCatalog.setConf(new Configuration()); + hadoopCatalog.initialize(warehouse, mergedProperties); return hadoopCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java index 939bc53847..7965402491 100644 --- a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java +++ b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java @@ -20,8 +20,10 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.AuthenticationParametersDpo; @@ -30,19 +32,17 @@ import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Factory class for creating a Hive catalog handle based on connection configuration. */ @ApplicationScoped @Identifier(ConnectionType.HIVE_FACTORY_IDENTIFIER) public class HiveFederatedCatalogFactory implements ExternalCatalogFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(HiveFederatedCatalogFactory.class); @Override public Catalog createCatalog( ConnectionConfigInfoDpo connectionConfigInfoDpo, - PolarisCredentialManager polarisCredentialManager) { + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // Currently, Polaris supports Hive federation only via IMPLICIT authentication. // Hence, prior to initializing the configuration, ensure that the catalog uses // IMPLICIT authentication. @@ -69,14 +69,19 @@ public Catalog createCatalog( // Polaris could support federating to multiple LDAP based Hive metastores. Multiple // Kerberos instances are not suitable because Kerberos ties a single identity to the server. HiveCatalog hiveCatalog = new HiveCatalog(); - hiveCatalog.initialize( - warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + hiveCatalog.initialize(warehouse, mergedProperties); return hiveCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java index 6253b8809f..1614c61fcf 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.core.catalog; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; @@ -36,11 +37,16 @@ public interface ExternalCatalogFactory { * @param connectionConfig the connection configuration * @param polarisCredentialManager the credential manager for generating connection credentials * that Polaris uses to access external systems + * @param catalogProperties additional properties from the ExternalCatalog entity that should be + * passed through to the underlying catalog (e.g., rest.client.proxy.*, timeout settings). + * These are merged with lower precedence than connection config properties. * @return the initialized catalog * @throws IllegalStateException if the connection configuration is invalid */ Catalog createCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager); + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties); /** * Creates a generic table catalog for the given connection configuration. @@ -48,9 +54,13 @@ Catalog createCatalog( * @param connectionConfig the connection configuration * @param polarisCredentialManager the credential manager for generating connection credentials * that Polaris uses to access external systems + * @param catalogProperties additional properties from the ExternalCatalog entity that should be + * passed through to the underlying catalog * @return the initialized catalog * @throws IllegalStateException if the connection configuration is invalid */ GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager); + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java index 05d2d16b73..7cd906f9a3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java @@ -98,10 +98,13 @@ protected void initializeCatalog() { externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); if (externalCatalogFactory.isResolvable()) { + // Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings) + Map catalogProperties = resolvedCatalogEntity.getPropertiesAsMap(); federatedCatalog = externalCatalogFactory .get() - .createGenericCatalog(connectionConfigInfoDpo, getPolarisCredentialManager()); + .createGenericCatalog( + connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties); } else { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index cf56dec4d4..3a4cbf8d3b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -302,10 +302,14 @@ protected void initializeCatalog() { externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); if (externalCatalogFactory.isResolvable()) { + // Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings) + // to the external catalog factory for configuration of the underlying HTTP client + Map catalogProperties = resolvedCatalogEntity.getPropertiesAsMap(); federatedCatalog = externalCatalogFactory .get() - .createCatalog(connectionConfigInfoDpo, getPolarisCredentialManager()); + .createCatalog( + connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties); } else { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java index de12bed4c1..d44dee8f85 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java @@ -20,10 +20,12 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; @@ -38,7 +40,9 @@ public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory @Override public Catalog createCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo icebergConfig)) { throw new IllegalArgumentException( "Expected IcebergRestConnectionConfigInfoDpo but got: " @@ -54,16 +58,23 @@ public Catalog createCatalog( .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) .build()); - federatedCatalog.initialize( - icebergConfig.getRemoteCatalogName(), - connectionConfig.asIcebergCatalogProperties(polarisCredentialManager)); + // Merge properties with precedence: connection config properties override catalog properties + // to ensure required settings like URI and authentication cannot be accidentally overwritten. + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfig.asIcebergCatalogProperties(polarisCredentialManager)); + + federatedCatalog.initialize(icebergConfig.getRemoteCatalogName(), mergedProperties); return federatedCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md b/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md index 8318f45095..c3c4520afa 100644 --- a/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md +++ b/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md @@ -61,6 +61,33 @@ Refer to the [CLI documentation](../command-line-interface.md#catalogs) for deta Grant catalog roles to principal roles the same way you do for internal catalogs so compute engines receive tokens with access to the federated namespace. +## Outbound HTTP settings + +Iceberg REST federation uses Iceberg's HTTP client. You can pass through HTTP settings by adding +catalog properties when creating or updating the external catalog (via `--property` or +`--set-property`). + +Common settings include: + +- `rest.client.proxy.hostname` +- `rest.client.proxy.port` +- `rest.client.proxy.username` +- `rest.client.proxy.password` +- `rest.client.connection-timeout-ms` +- `rest.client.socket-timeout-ms` + +Example: + +```bash +polaris catalogs update analytics_rest \ + --set-property rest.client.proxy.hostname=proxy.example.com \ + --set-property rest.client.proxy.port=3128 \ + --set-property rest.client.connection-timeout-ms=30000 \ + --set-property rest.client.socket-timeout-ms=120000 +``` + +Connection config properties (URI and authentication) take precedence if the same keys are present. + ## Operational notes - **Connectivity checks:** Polaris does not lazily probe the remote service; catalog creation fails if