diff --git a/CHANGELOG.md b/CHANGELOG.md index 34ef7108a6..cb5584ee7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - The (Before/After)CommitViewEvent has been removed. - The (Before/After)CommitTableEvent has been removed. - The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a `receivedTimestamp` parameter of type `java.time.Instant`. +- The `ExternalCatalogFactory.createCatalog()` and `createGenericCatalog()` method signatures have been extended to include a `catalogProperties` parameter of type `Map` for passing through proxy and timeout settings to federated catalog HTTP clients. ### New Features diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index b2cc24ec1e..3f44f76f3c 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -20,9 +20,11 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.AuthenticationParametersDpo; @@ -31,19 +33,17 @@ import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Factory class for creating a Hadoop catalog handle based on connection configuration. */ @ApplicationScoped @Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER) public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); @Override public Catalog createCatalog( ConnectionConfigInfoDpo connectionConfigInfoDpo, - PolarisCredentialManager polarisCredentialManager) { + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. // Hence, prior to initializing the configuration, ensure that the catalog uses // IMPLICIT authentication. @@ -53,17 +53,26 @@ public Catalog createCatalog( != AuthenticationType.IMPLICIT.getCode()) { throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication."); } - Configuration conf = new Configuration(); String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); - HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse); - hadoopCatalog.initialize( - warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + + // Use no-arg constructor + setConf + initialize pattern to avoid double initialization. + // The HadoopCatalog(conf, warehouse) constructor internally calls initialize(), so using + // it followed by another initialize() call would be redundant. + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + hadoopCatalog.setConf(new Configuration()); + hadoopCatalog.initialize(warehouse, mergedProperties); return hadoopCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java index 939bc53847..7965402491 100644 --- a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java +++ b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java @@ -20,8 +20,10 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.AuthenticationParametersDpo; @@ -30,19 +32,17 @@ import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Factory class for creating a Hive catalog handle based on connection configuration. */ @ApplicationScoped @Identifier(ConnectionType.HIVE_FACTORY_IDENTIFIER) public class HiveFederatedCatalogFactory implements ExternalCatalogFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(HiveFederatedCatalogFactory.class); @Override public Catalog createCatalog( ConnectionConfigInfoDpo connectionConfigInfoDpo, - PolarisCredentialManager polarisCredentialManager) { + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // Currently, Polaris supports Hive federation only via IMPLICIT authentication. // Hence, prior to initializing the configuration, ensure that the catalog uses // IMPLICIT authentication. @@ -69,14 +69,19 @@ public Catalog createCatalog( // Polaris could support federating to multiple LDAP based Hive metastores. Multiple // Kerberos instances are not suitable because Kerberos ties a single identity to the server. HiveCatalog hiveCatalog = new HiveCatalog(); - hiveCatalog.initialize( - warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager)); + hiveCatalog.initialize(warehouse, mergedProperties); return hiveCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java index 6253b8809f..1614c61fcf 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.core.catalog; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.credentials.PolarisCredentialManager; @@ -36,11 +37,16 @@ public interface ExternalCatalogFactory { * @param connectionConfig the connection configuration * @param polarisCredentialManager the credential manager for generating connection credentials * that Polaris uses to access external systems + * @param catalogProperties additional properties from the ExternalCatalog entity that should be + * passed through to the underlying catalog (e.g., rest.client.proxy.*, timeout settings). + * These are merged with lower precedence than connection config properties. * @return the initialized catalog * @throws IllegalStateException if the connection configuration is invalid */ Catalog createCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager); + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties); /** * Creates a generic table catalog for the given connection configuration. @@ -48,9 +54,13 @@ Catalog createCatalog( * @param connectionConfig the connection configuration * @param polarisCredentialManager the credential manager for generating connection credentials * that Polaris uses to access external systems + * @param catalogProperties additional properties from the ExternalCatalog entity that should be + * passed through to the underlying catalog * @return the initialized catalog * @throws IllegalStateException if the connection configuration is invalid */ GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager); + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java index 05d2d16b73..7cd906f9a3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java @@ -98,10 +98,13 @@ protected void initializeCatalog() { externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); if (externalCatalogFactory.isResolvable()) { + // Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings) + Map catalogProperties = resolvedCatalogEntity.getPropertiesAsMap(); federatedCatalog = externalCatalogFactory .get() - .createGenericCatalog(connectionConfigInfoDpo, getPolarisCredentialManager()); + .createGenericCatalog( + connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties); } else { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index cf56dec4d4..3a4cbf8d3b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -302,10 +302,14 @@ protected void initializeCatalog() { externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); if (externalCatalogFactory.isResolvable()) { + // Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings) + // to the external catalog factory for configuration of the underlying HTTP client + Map catalogProperties = resolvedCatalogEntity.getPropertiesAsMap(); federatedCatalog = externalCatalogFactory .get() - .createCatalog(connectionConfigInfoDpo, getPolarisCredentialManager()); + .createCatalog( + connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties); } else { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java index de12bed4c1..d44dee8f85 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java @@ -20,10 +20,12 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTUtil; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.catalog.GenericTableCatalog; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; @@ -38,7 +40,9 @@ public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory @Override public Catalog createCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo icebergConfig)) { throw new IllegalArgumentException( "Expected IcebergRestConnectionConfigInfoDpo but got: " @@ -54,16 +58,23 @@ public Catalog createCatalog( .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) .build()); - federatedCatalog.initialize( - icebergConfig.getRemoteCatalogName(), - connectionConfig.asIcebergCatalogProperties(polarisCredentialManager)); + // Merge properties with precedence: connection config properties override catalog properties + // to ensure required settings like URI and authentication cannot be accidentally overwritten. + Map mergedProperties = + RESTUtil.merge( + catalogProperties != null ? catalogProperties : Map.of(), + connectionConfig.asIcebergCatalogProperties(polarisCredentialManager)); + + federatedCatalog.initialize(icebergConfig.getRemoteCatalogName(), mergedProperties); return federatedCatalog; } @Override public GenericTableCatalog createGenericCatalog( - ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) { + ConnectionConfigInfoDpo connectionConfig, + PolarisCredentialManager polarisCredentialManager, + Map catalogProperties) { // TODO implement throw new UnsupportedOperationException( "Generic table federation to this catalog is not supported."); diff --git a/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md b/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md index 8318f45095..c3c4520afa 100644 --- a/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md +++ b/site/content/in-dev/unreleased/federation/iceberg-rest-federation.md @@ -61,6 +61,33 @@ Refer to the [CLI documentation](../command-line-interface.md#catalogs) for deta Grant catalog roles to principal roles the same way you do for internal catalogs so compute engines receive tokens with access to the federated namespace. +## Outbound HTTP settings + +Iceberg REST federation uses Iceberg's HTTP client. You can pass through HTTP settings by adding +catalog properties when creating or updating the external catalog (via `--property` or +`--set-property`). + +Common settings include: + +- `rest.client.proxy.hostname` +- `rest.client.proxy.port` +- `rest.client.proxy.username` +- `rest.client.proxy.password` +- `rest.client.connection-timeout-ms` +- `rest.client.socket-timeout-ms` + +Example: + +```bash +polaris catalogs update analytics_rest \ + --set-property rest.client.proxy.hostname=proxy.example.com \ + --set-property rest.client.proxy.port=3128 \ + --set-property rest.client.connection-timeout-ms=30000 \ + --set-property rest.client.socket-timeout-ms=120000 +``` + +Connection config properties (URI and authentication) take precedence if the same keys are present. + ## Operational notes - **Connectivity checks:** Polaris does not lazily probe the remote service; catalog creation fails if