Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- The (Before/After)CommitViewEvent has been removed.
- The (Before/After)CommitTableEvent has been removed.
- The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a `receivedTimestamp` parameter of type `java.time.Instant`.
- The `ExternalCatalogFactory.createCatalog()` and `createGenericCatalog()` method signatures have been extended to include a `catalogProperties` parameter of type `Map<String, String>` for passing through proxy and timeout settings to federated catalog HTTP clients.

### New Features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
Expand All @@ -31,19 +33,17 @@
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Factory class for creating a Hadoop catalog handle based on connection configuration. */
@ApplicationScoped
@Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER)
public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class);

@Override
public Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfigInfoDpo,
PolarisCredentialManager polarisCredentialManager) {
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
// Currently, Polaris supports Hadoop federation only via IMPLICIT authentication.
// Hence, prior to initializing the configuration, ensure that the catalog uses
// IMPLICIT authentication.
Expand All @@ -53,17 +53,26 @@ public Catalog createCatalog(
!= AuthenticationType.IMPLICIT.getCode()) {
throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication.");
}
Configuration conf = new Configuration();
String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse);
hadoopCatalog.initialize(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager));
Map<String, String> mergedProperties =
RESTUtil.merge(
catalogProperties != null ? catalogProperties : Map.of(),
connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager));

// Use no-arg constructor + setConf + initialize pattern to avoid double initialization.
// The HadoopCatalog(conf, warehouse) constructor internally calls initialize(), so using
// it followed by another initialize() call would be redundant.
HadoopCatalog hadoopCatalog = new HadoopCatalog();
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize(warehouse, mergedProperties);
return hadoopCatalog;
}

@Override
public GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) {
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
// TODO implement
throw new UnsupportedOperationException(
"Generic table federation to this catalog is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
Expand All @@ -30,19 +32,17 @@
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Factory class for creating a Hive catalog handle based on connection configuration. */
@ApplicationScoped
@Identifier(ConnectionType.HIVE_FACTORY_IDENTIFIER)
public class HiveFederatedCatalogFactory implements ExternalCatalogFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(HiveFederatedCatalogFactory.class);

@Override
public Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfigInfoDpo,
PolarisCredentialManager polarisCredentialManager) {
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
// Currently, Polaris supports Hive federation only via IMPLICIT authentication.
// Hence, prior to initializing the configuration, ensure that the catalog uses
// IMPLICIT authentication.
Expand All @@ -69,14 +69,19 @@ public Catalog createCatalog(
// Polaris could support federating to multiple LDAP based Hive metastores. Multiple
// Kerberos instances are not suitable because Kerberos ties a single identity to the server.
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.initialize(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager));
Map<String, String> mergedProperties =
RESTUtil.merge(
catalogProperties != null ? catalogProperties : Map.of(),
connectionConfigInfoDpo.asIcebergCatalogProperties(polarisCredentialManager));
hiveCatalog.initialize(warehouse, mergedProperties);
return hiveCatalog;
}

@Override
public GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) {
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
// TODO implement
throw new UnsupportedOperationException(
"Generic table federation to this catalog is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.polaris.core.catalog;

import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
Expand All @@ -36,21 +37,30 @@ public interface ExternalCatalogFactory {
* @param connectionConfig the connection configuration
* @param polarisCredentialManager the credential manager for generating connection credentials
* that Polaris uses to access external systems
* @param catalogProperties additional properties from the ExternalCatalog entity that should be
* passed through to the underlying catalog (e.g., rest.client.proxy.*, timeout settings).
* These are merged with lower precedence than connection config properties.
* @return the initialized catalog
* @throws IllegalStateException if the connection configuration is invalid
*/
Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager);
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties);

/**
* Creates a generic table catalog for the given connection configuration.
*
* @param connectionConfig the connection configuration
* @param polarisCredentialManager the credential manager for generating connection credentials
* that Polaris uses to access external systems
* @param catalogProperties additional properties from the ExternalCatalog entity that should be
* passed through to the underlying catalog
* @return the initialized catalog
* @throws IllegalStateException if the connection configuration is invalid
*/
GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager);
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,13 @@ protected void initializeCatalog() {
externalCatalogFactories.select(
Identifier.Literal.of(connectionType.getFactoryIdentifier()));
if (externalCatalogFactory.isResolvable()) {
// Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings)
Map<String, String> catalogProperties = resolvedCatalogEntity.getPropertiesAsMap();
federatedCatalog =
externalCatalogFactory
.get()
.createGenericCatalog(connectionConfigInfoDpo, getPolarisCredentialManager());
.createGenericCatalog(
connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties);
} else {
throw new UnsupportedOperationException(
"External catalog factory for type '" + connectionType + "' is unavailable.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,14 @@ protected void initializeCatalog() {
externalCatalogFactories.select(
Identifier.Literal.of(connectionType.getFactoryIdentifier()));
if (externalCatalogFactory.isResolvable()) {
// Pass through catalog properties (e.g., rest.client.proxy.*, timeout settings)
// to the external catalog factory for configuration of the underlying HTTP client
Map<String, String> catalogProperties = resolvedCatalogEntity.getPropertiesAsMap();
federatedCatalog =
externalCatalogFactory
.get()
.createCatalog(connectionConfigInfoDpo, getPolarisCredentialManager());
.createCatalog(
connectionConfigInfoDpo, getPolarisCredentialManager(), catalogProperties);
} else {
throw new UnsupportedOperationException(
"External catalog factory for type '" + connectionType + "' is unavailable.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
Expand All @@ -38,7 +40,9 @@ public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory

@Override
public Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) {
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo icebergConfig)) {
throw new IllegalArgumentException(
"Expected IcebergRestConnectionConfigInfoDpo but got: "
Expand All @@ -54,16 +58,23 @@ public Catalog createCatalog(
.uri(config.get(org.apache.iceberg.CatalogProperties.URI))
.build());

federatedCatalog.initialize(
icebergConfig.getRemoteCatalogName(),
connectionConfig.asIcebergCatalogProperties(polarisCredentialManager));
// Merge properties with precedence: connection config properties override catalog properties
// to ensure required settings like URI and authentication cannot be accidentally overwritten.
Map<String, String> mergedProperties =
RESTUtil.merge(
catalogProperties != null ? catalogProperties : Map.of(),
connectionConfig.asIcebergCatalogProperties(polarisCredentialManager));

federatedCatalog.initialize(icebergConfig.getRemoteCatalogName(), mergedProperties);

return federatedCatalog;
}

@Override
public GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, PolarisCredentialManager polarisCredentialManager) {
ConnectionConfigInfoDpo connectionConfig,
PolarisCredentialManager polarisCredentialManager,
Map<String, String> catalogProperties) {
// TODO implement
throw new UnsupportedOperationException(
"Generic table federation to this catalog is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ Refer to the [CLI documentation](../command-line-interface.md#catalogs) for deta
Grant catalog roles to principal roles the same way you do for internal catalogs so compute engines
receive tokens with access to the federated namespace.

## Outbound HTTP settings

Iceberg REST federation uses Iceberg's HTTP client. You can pass through HTTP settings by adding
catalog properties when creating or updating the external catalog (via `--property` or
`--set-property`).

Common settings include:

- `rest.client.proxy.hostname`
- `rest.client.proxy.port`
- `rest.client.proxy.username`
- `rest.client.proxy.password`
- `rest.client.connection-timeout-ms`
- `rest.client.socket-timeout-ms`

Example:

```bash
polaris catalogs update analytics_rest \
--set-property rest.client.proxy.hostname=proxy.example.com \
--set-property rest.client.proxy.port=3128 \
--set-property rest.client.connection-timeout-ms=30000 \
--set-property rest.client.socket-timeout-ms=120000
```

Connection config properties (URI and authentication) take precedence if the same keys are present.

## Operational notes

- **Connectivity checks:** Polaris does not lazily probe the remote service; catalog creation fails if
Expand Down