From 72c9a23de7f8560fa2ca58c645028a05e5c1ea78 Mon Sep 17 00:00:00 2001 From: dmitriya Date: Fri, 30 Jan 2026 16:45:39 +0100 Subject: [PATCH] Add support for unique table locations via catalog property --- .../apache/iceberg/aws/glue/GlueTestBase.java | 11 ++ .../aws/glue/TestGlueCatalogTable.java | 19 ++- .../iceberg/aws/dynamodb/DynamoDbCatalog.java | 22 ++- .../apache/iceberg/aws/glue/GlueCatalog.java | 28 ++- .../aws/dynamodb/TestDynamoDbCatalog.java | 60 ++++++- .../iceberg/aws/glue/TestGlueCatalog.java | 22 +++ .../gcp/bigquery/TestBigQueryCatalog.java | 13 ++ .../org/apache/iceberg/CatalogProperties.java | 9 + .../iceberg/inmemory/InMemoryCatalog.java | 12 +- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 9 +- .../org/apache/iceberg/util/LocationUtil.java | 24 +++ .../apache/iceberg/catalog/CatalogTests.java | 82 +++++++++ .../apache/iceberg/rest/TestRESTCatalog.java | 33 ++-- .../apache/iceberg/dell/ecs/EcsCatalog.java | 12 +- .../iceberg/dell/ecs/TestEcsCatalog.java | 34 +++- docs/docs/configuration.md | 1 + .../org/apache/iceberg/hive/HiveCatalog.java | 13 +- .../RESTCompatibilityKitCatalogTests.java | 8 + .../iceberg/spark/SparkCatalogConfig.java | 18 +- .../spark/sql/TestUniqueTableLocation.java | 160 ++++++++++++++++++ 20 files changed, 557 insertions(+), 33 deletions(-) create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestUniqueTableLocation.java diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java index 65e37eba4cd3..b02537bf40b2 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java @@ -65,6 +65,7 @@ public class GlueTestBase { // iceberg static GlueCatalog glueCatalog; static GlueCatalog glueCatalogWithSkipNameValidation; + static GlueCatalog glueCatalogWithUniqueLocation; static Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1")); @@ -105,6 +106,16 @@ public static void beforeClass() { GLUE, null, ImmutableMap.of()); + + glueCatalogWithUniqueLocation = new GlueCatalog(); + glueCatalogWithUniqueLocation.initialize( + CATALOG_NAME, + TEST_BUCKET_PATH, + awsProperties, + s3FileIOProperties, + GLUE, + null, + true /* uniqTableLocation */); } @AfterAll diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java index 2c9459c5e36c..cb015b79fb9b 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java @@ -310,6 +310,22 @@ public void testRenameTable() { assertThat(renamedTable.currentSnapshot()).isEqualTo(table.currentSnapshot()); } + @Test + public void testCreateTableInUniqueLocation() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + String newTableName = tableName + "_renamed"; + + glueCatalogWithUniqueLocation.renameTable( + TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)); + Table renamedTable = + glueCatalogWithUniqueLocation.loadTable(TableIdentifier.of(namespace, newTableName)); + createTable(namespace, tableName); + Table table = glueCatalogWithUniqueLocation.loadTable(TableIdentifier.of(namespace, tableName)); + + assertThat(renamedTable.location()).isNotEqualTo(table.location()); + } + @Test public void testRenameTableFailsToCreateNewTable() { String namespace = createNamespace(); @@ -743,7 +759,8 @@ public void testTableLevelS3Tags() { new AwsProperties(properties), new S3FileIOProperties(properties), GLUE, - null); + null, + false /* uniqTableLocation */); String namespace = createNamespace(); String tableName = getRandomName(); createTable(namespace, tableName); diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java index 0c991af75076..7c75f99d6d69 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -53,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +113,7 @@ public class DynamoDbCatalog extends BaseMetastoreCatalog private FileIO fileIO; private CloseableGroup closeableGroup; private Map catalogProperties; + private boolean uniqueTableLocation; public DynamoDbCatalog() {} @@ -123,12 +125,21 @@ public void initialize(String name, Map properties) { properties.get(CatalogProperties.WAREHOUSE_LOCATION), new AwsProperties(properties), AwsClientFactories.from(properties).dynamo(), - initializeFileIO(properties)); + initializeFileIO(properties), + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT)); } @VisibleForTesting void initialize( - String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) { + String name, + String path, + AwsProperties properties, + DynamoDbClient client, + FileIO io, + boolean uniqTableLocation) { Preconditions.checkArgument( !Strings.isNullOrEmpty(path), "Cannot initialize DynamoDbCatalog because warehousePath must not be null or empty"); @@ -138,6 +149,7 @@ void initialize( this.warehousePath = LocationUtil.stripTrailingSlash(path); this.dynamo = client; this.fileIO = io; + this.uniqueTableLocation = uniqTableLocation; this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(dynamo); @@ -177,12 +189,12 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { } String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION); + String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation); if (response.item().containsKey(defaultLocationCol)) { - return String.format( - "%s/%s", response.item().get(defaultLocationCol).s(), tableIdentifier.name()); + return String.format("%s/%s", response.item().get(defaultLocationCol).s(), tableLocation); } else { return String.format( - "%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name()); + "%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableLocation); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 47807a2b9f37..94e53cc1ab69 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -89,6 +89,7 @@ public class GlueCatalog extends BaseMetastoreCatalog private Object hadoopConf; private String catalogName; private String warehousePath; + private boolean uniqueTableLocation; private AwsProperties awsProperties; private S3FileIOProperties s3FileIOProperties; private LockManager lockManager; @@ -144,7 +145,11 @@ public void initialize(String name, Map properties) { new AwsProperties(properties), new S3FileIOProperties(properties), awsClientFactory.glue(), - initializeLockManager(properties)); + initializeLockManager(properties), + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT)); } private LockManager initializeLockManager(Map properties) { @@ -172,7 +177,17 @@ void initialize( LockManager lock, Map catalogProps) { this.catalogProperties = catalogProps; - initialize(name, path, properties, s3Properties, client, lock); + initialize( + name, + path, + properties, + s3Properties, + client, + lock, + PropertyUtil.propertyAsBoolean( + catalogProps, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT)); } @VisibleForTesting @@ -182,13 +197,15 @@ void initialize( AwsProperties properties, S3FileIOProperties s3Properties, GlueClient client, - LockManager lock) { + LockManager lock, + boolean uniqTableLocation) { this.catalogName = name; this.awsProperties = properties; this.s3FileIOProperties = s3Properties; this.warehousePath = Strings.isNullOrEmpty(path) ? null : LocationUtil.stripTrailingSlash(path); this.glue = client; this.lockManager = lock; + this.uniqueTableLocation = uniqTableLocation; this.closeableGroup = new CloseableGroup(); this.fileIOTracker = new FileIOTracker(); @@ -278,9 +295,10 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { tableIdentifier, awsProperties.glueCatalogSkipNameValidation())) .build()); String dbLocationUri = response.database().locationUri(); + String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation); if (dbLocationUri != null) { dbLocationUri = LocationUtil.stripTrailingSlash(dbLocationUri); - return String.format("%s/%s", dbLocationUri, tableIdentifier.name()); + return String.format("%s/%s", dbLocationUri, tableLocation); } ValidationException.check( @@ -292,7 +310,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { warehousePath, IcebergToGlueConverter.getDatabaseName( tableIdentifier, awsProperties.glueCatalogSkipNameValidation()), - tableIdentifier.name()); + tableLocation); } @Override diff --git a/aws/src/test/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java index b602cea303d8..e172831a2428 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java +++ b/aws/src/test/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java @@ -49,14 +49,25 @@ public class TestDynamoDbCatalog { public void before() { dynamo = Mockito.mock(DynamoDbClient.class); dynamoCatalog = new DynamoDbCatalog(); - dynamoCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), dynamo, null); + dynamoCatalog.initialize( + CATALOG_NAME, + WAREHOUSE_PATH, + new AwsProperties(), + dynamo, + null, + false /* uniqTableLocation */); } @Test public void testConstructorWarehousePathWithEndSlash() { DynamoDbCatalog catalogWithSlash = new DynamoDbCatalog(); catalogWithSlash.initialize( - CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), dynamo, null); + CATALOG_NAME, + WAREHOUSE_PATH + "/", + new AwsProperties(), + dynamo, + null, + false /* uniqTableLocation */); Mockito.doReturn(GetItemResponse.builder().item(Maps.newHashMap()).build()) .when(dynamo) .getItem(any(GetItemRequest.class)); @@ -103,4 +114,49 @@ public void testDefaultWarehouseLocationNoNamespace() { .isInstanceOf(NoSuchNamespaceException.class) .hasMessageContaining("Cannot find default warehouse location:"); } + + @Test + public void testDefaultWarehouseLocationUniqueWithoutDbUri() throws Exception { + try (DynamoDbCatalog catalog = new DynamoDbCatalog()) { + catalog.initialize( + CATALOG_NAME, + WAREHOUSE_PATH, + new AwsProperties(), + dynamo, + null, + true /* uniqTableLocation */); + Mockito.doReturn(GetItemResponse.builder().item(Maps.newHashMap()).build()) + .when(dynamo) + .getItem(any(GetItemRequest.class)); + + String defaultWarehouseLocation = catalog.defaultWarehouseLocation(TABLE_IDENTIFIER); + assertThat(defaultWarehouseLocation).matches(WAREHOUSE_PATH + "/db.db/table-[a-z0-9]{32}"); + } + } + + @Test + public void testDefaultWarehouseLocationUniqueWithDbUri() throws Exception { + try (DynamoDbCatalog catalog = new DynamoDbCatalog()) { + catalog.initialize( + CATALOG_NAME, + WAREHOUSE_PATH, + new AwsProperties(), + dynamo, + null, + true /* uniqTableLocation */); + String dbUri = "s3://bucket2/db"; + Mockito.doReturn( + GetItemResponse.builder() + .item( + ImmutableMap.of( + toPropertyCol(DynamoDbCatalog.defaultLocationProperty()), + AttributeValue.builder().s(dbUri).build())) + .build()) + .when(dynamo) + .getItem(any(GetItemRequest.class)); + + String defaultWarehouseLocation = catalog.defaultWarehouseLocation(TABLE_IDENTIFIER); + assertThat(defaultWarehouseLocation).matches("s3://bucket2/db/table-[a-z0-9]{32}"); + } + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java index 2042948eb3c9..82f7e84d563b 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java @@ -194,6 +194,28 @@ public void testDefaultWarehouseLocationCustomCatalogId() { Mockito.argThat((GetDatabaseRequest req) -> req.catalogId().equals(catalogId))); } + @Test + public void testDefaultWarehouseLocationUnique() { + GlueCatalog catalog = new GlueCatalog(); + catalog.initialize( + CATALOG_NAME, + WAREHOUSE_PATH, + new AwsProperties(), + new S3FileIOProperties(), + glue, + LockManagers.defaultLockManager(), + true /* uniqTableLocation */); + + Mockito.doReturn( + GetDatabaseResponse.builder() + .database(Database.builder().name("db").locationUri("s3://bucket2/db").build()) + .build()) + .when(glue) + .getDatabase(Mockito.any(GetDatabaseRequest.class)); + String location = catalog.defaultWarehouseLocation(TableIdentifier.of("db", "table")); + assertThat(location).matches("s3://bucket2/db/table-[a-z0-9]{32}"); + } + @Test public void testListTables() { Mockito.doReturn( diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryCatalog.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryCatalog.java index cdeaa1ef1e63..23441d0db184 100644 --- a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryCatalog.java +++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryCatalog.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.iceberg.CatalogProperties; @@ -169,6 +170,18 @@ public void testRenameTableMissingSourceTable() { super.testRenameTableMissingSourceTable(); } + @Disabled("BigQuery Metastore does not support rename tables") + @Test + public void createTableInUniqueLocation() { + super.createTableInUniqueLocation(); + } + + @Disabled("BigQuery Metastore does not support rename tables") + @Test + public void dropAfterRenameDoesntCorruptTable() throws IOException { + super.dropAfterRenameDoesntCorruptTable(); + } + @Test public void testIsValidIdentifierWithValidSingleLevelNamespace() { assertThat(catalog.isValidIdentifier(TableIdentifier.of("dataset1", "table1"))).isTrue(); diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index f35c90c4e80c..8c5dcb22dde7 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -158,6 +158,15 @@ private CatalogProperties() {} public static final String APP_NAME = "app-name"; public static final String USER = "user"; + /** + * Requests that the catalog provide unique locations for new tables. + * + *

Relevant only for catalogs which support unique table names. + */ + public static final String UNIQUE_TABLE_LOCATION = "unique-table-location"; + + public static final boolean UNIQUE_TABLE_LOCATION_DEFAULT = false; + public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms"; public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1); diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java index 985127d651b4..c2c432996132 100644 --- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java +++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java @@ -48,6 +48,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.view.BaseMetastoreViewCatalog; import org.apache.iceberg.view.BaseViewOperations; import org.apache.iceberg.view.ViewMetadata; @@ -71,6 +73,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog private String catalogName; private String warehouseLocation; private CloseableGroup closeableGroup; + private boolean uniqueTableLocation; private Map catalogProperties; public InMemoryCatalog() { @@ -88,6 +91,11 @@ public String name() { public void initialize(String name, Map properties) { this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName(); this.catalogProperties = ImmutableMap.copyOf(properties); + this.uniqueTableLocation = + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT); String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, ""); this.warehouseLocation = warehouse.replaceAll("/*$", ""); @@ -104,8 +112,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { @Override protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { - return SLASH.join( - defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name()); + String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation); + return SLASH.join(defaultNamespaceLocation(tableIdentifier.namespace()), tableLocation); } private String defaultNamespaceLocation(Namespace namespace) { diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 0c8fbe41df9e..3a06edef3cbb 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -88,6 +88,7 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog private Object conf; private JdbcClientPool connections; private Map catalogProperties; + private boolean uniqueTableLocation; private final Function, FileIO> ioBuilder; private final Function, JdbcClientPool> clientPoolBuilder; private boolean initializeCatalogTables; @@ -120,6 +121,11 @@ public void initialize(String name, Map properties) { this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation); this.catalogProperties = ImmutableMap.copyOf(properties); + this.uniqueTableLocation = + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT); if (name != null) { this.catalogName = name; @@ -280,7 +286,8 @@ protected ViewOperations newViewOps(TableIdentifier viewIdentifier) { @Override protected String defaultWarehouseLocation(TableIdentifier table) { - return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name()); + String tableLocation = LocationUtil.tableLocation(table, uniqueTableLocation); + return SLASH.join(defaultNamespaceLocation(table.namespace()), tableLocation); } @Override diff --git a/core/src/main/java/org/apache/iceberg/util/LocationUtil.java b/core/src/main/java/org/apache/iceberg/util/LocationUtil.java index 400307149238..4c0d401c74b9 100644 --- a/core/src/main/java/org/apache/iceberg/util/LocationUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/LocationUtil.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.util; +import java.util.UUID; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -33,4 +35,26 @@ public static String stripTrailingSlash(String path) { } return result; } + + /** + * Returns a path component derived from the {@code tableIdentifier}, used as part of the table + * location URI. + * + *

If {@code useUniqueLocation} is {@code true}, the returned component will include a random + * UUID suffix. Otherwise, the plain table name is returned. + * + * @param tableIdentifier Iceberg table identifier + * @param useUniqueLocation whether to ensure uniqueness + * @return a string representing the table name component for a location URI + */ + public static String tableLocation(TableIdentifier tableIdentifier, boolean useUniqueLocation) { + Preconditions.checkArgument(null != tableIdentifier, "Invalid identifier: null"); + + if (useUniqueLocation) { + String uniqueSuffix = UUID.randomUUID().toString().replace("-", ""); + return String.format("%s-%s", tableIdentifier.name(), uniqueSuffix); + } else { + return tableIdentifier.name(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 833b2fb0b46f..9053f21ea112 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -72,6 +72,8 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.metrics.CommitReport; import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; @@ -1025,6 +1027,86 @@ public void testRenameTable() { assertEmpty("Should not contain table after drop", catalog, NS); } + @Test + public void createTableInUniqueLocation() { + Map additionalProperties = + ImmutableMap.of(CatalogProperties.UNIQUE_TABLE_LOCATION, "true"); + C catalog = initCatalog("uniq_path_catalog", additionalProperties); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + catalog.createTable(TABLE, SCHEMA, PartitionSpec.unpartitioned()); + catalog.renameTable(TABLE, RENAMED_TABLE); + catalog.createTable(TABLE, SCHEMA, PartitionSpec.unpartitioned()); + + Table table = catalog.loadTable(TABLE); + Table renamedTable = catalog.loadTable(RENAMED_TABLE); + + assertThat(table.location()) + .as("Tables %s and %s have different location", TABLE, RENAMED_TABLE) + .isNotEqualTo(renamedTable.location()); + } + + @Test + public void dropAfterRenameDoesntCorruptTable() throws IOException { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(TABLE.namespace()); + } + + PartitionSpec spec = PartitionSpec.unpartitioned(); + + Table initialTable = catalog.createTable(TABLE, SCHEMA, spec); + String initialFilePath = initialTable.locationProvider().newDataLocation("data-a.parquet"); + DataFile dataFile = + DataFiles.builder(spec) + .withPath(initialFilePath) + .withFileSizeInBytes(10) + .withRecordCount(2) + .build(); + initialTable.io().newOutputFile(initialFilePath).create().close(); + initialTable.newAppend().appendFile(dataFile).commit(); + + catalog.renameTable(TABLE, RENAMED_TABLE); + + Table newTable = catalog.createTable(TABLE, SCHEMA, spec); + String newFilePath = newTable.locationProvider().newDataLocation("data-b.parquet"); + DataFile anotherFile = + DataFiles.builder(spec) + .withPath(newFilePath) + .withFileSizeInBytes(10) + .withRecordCount(2) + .build(); + newTable.io().newOutputFile(newFilePath).create().close(); + newTable.newAppend().appendFile(anotherFile).commit(); + + catalog.dropTable(RENAMED_TABLE, true); + + assertThat(catalog.tableExists(RENAMED_TABLE)) + .as("After PURGE, %s must not exist", RENAMED_TABLE) + .isFalse(); + assertThat(catalog.tableExists(TABLE)) + .as( + "After dropping the renamed table with PURGE, the recreated table with the original name (%s) must exist", + TABLE) + .isTrue(); + + Table table = catalog.loadTable(TABLE); + FileIO io = table.io(); + try (CloseableIterable tasks = table.newScan().planFiles()) { + tasks.forEach( + task -> { + InputFile file = io.newInputFile(task.file().location()); + assertThat(file.exists()) + .as("Table %s should remain unaffected by dropping %s", TABLE, RENAMED_TABLE) + .isTrue(); + }); + } + } + @Test public void testRenameTableMissingSourceTable() { C catalog = catalog(); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 2d569ae8264b..b46a0af25d30 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -89,6 +89,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; @@ -132,6 +133,8 @@ public class TestRESTCatalog extends CatalogTests { ImmutableMap.of( RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8)); + private static final Set BACKEND_PROPS = + ImmutableSet.of(CatalogProperties.UNIQUE_TABLE_LOCATION); private static final class IdempotentEnv { private final TableIdentifier ident; @@ -264,12 +267,7 @@ protected T execute( @BeforeEach public void createCatalog() throws Exception { - File warehouse = temp.toFile(); - this.backendCatalog = new InMemoryCatalog(); - this.backendCatalog.initialize( - "in-memory", - ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); HTTPHeaders catalogHeaders = HTTPHeaders.of( @@ -304,6 +302,7 @@ public void createCatalog() throws Exception { @Override protected RESTCatalog initCatalog(String catalogName, Map additionalProperties) { + File warehouse = temp.toFile(); Configuration conf = new Configuration(); RESTCatalog catalog = @@ -339,12 +338,26 @@ protected RESTCatalog initCatalog(String catalogName, Map additi "catalog:12345", "header.test-header", "test-value"); - catalog.initialize( - catalogName, - ImmutableMap.builder() - .putAll(properties) - .putAll(additionalProperties) + + var props = ImmutableMap.builder().putAll(properties); + var backendProps = ImmutableMap.builder(); + + additionalProperties.forEach( + (k, v) -> { + if (BACKEND_PROPS.contains(k)) { + backendProps.put(k, v); + } else { + props.put(k, v); + } + }); + + backendCatalog.initialize( + "in-memory", + backendProps + .put(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath()) .build()); + + catalog.initialize(catalogName, props.build()); return catalog; } diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java index 07ad68365837..bb8150d16dca 100644 --- a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java @@ -59,6 +59,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +87,7 @@ public class EcsCatalog extends BaseMetastoreCatalog private FileIO fileIO; private CloseableGroup closeableGroup; private Map catalogProperties; + private boolean uniqueTableLocation; /** * No-arg constructor to load the catalog dynamically. @@ -102,6 +104,12 @@ public void initialize(String name, Map properties) { !Strings.isNullOrEmpty(inputWarehouseLocation), "Cannot initialize EcsCatalog because warehousePath must not be null or empty"); + this.uniqueTableLocation = + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT); + this.catalogName = name; this.warehouseLocation = new EcsURI(LocationUtil.stripTrailingSlash(inputWarehouseLocation)); this.client = DellClientFactories.from(properties).ecsS3(); @@ -136,8 +144,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { @Override protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { - return String.format( - "%s/%s", namespacePrefix(tableIdentifier.namespace()), tableIdentifier.name()); + String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation); + return String.format("%s/%s", namespacePrefix(tableIdentifier.namespace()), tableLocation); } /** Iterate all table objects with the namespace prefix. */ diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java index 4714d37d72b9..82549f1eccd9 100644 --- a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java @@ -55,11 +55,17 @@ public class TestEcsCatalog { @BeforeEach public void before() { - ecsCatalog = new EcsCatalog(); + ecsCatalog = createCatalog("test", ImmutableMap.of()); + } + + private EcsCatalog createCatalog(String name, Map additionalProperties) { + EcsCatalog catalog = new EcsCatalog(); Map properties = Maps.newHashMap(); properties.put(CatalogProperties.WAREHOUSE_LOCATION, new EcsURI(rule.bucket(), "").location()); properties.putAll(rule.clientProperties()); - ecsCatalog.initialize("test", properties); + properties.putAll(additionalProperties); + catalog.initialize(name, properties); + return catalog; } @AfterEach @@ -172,6 +178,30 @@ public void testRenameTable() { .isTrue(); } + @Test + public void testCreateTableInUniqueLocation() throws Exception { + try (EcsCatalog catalog = + createCatalog( + "unique_location_catalog", + ImmutableMap.of(CatalogProperties.UNIQUE_TABLE_LOCATION, "true"))) { + + Namespace ns = Namespace.of("a"); + TableIdentifier tableIdent = TableIdentifier.of(ns, "t1"); + TableIdentifier renamedIdent = TableIdentifier.of(ns, "t2"); + + catalog.createNamespace(ns); + catalog.createTable(tableIdent, SCHEMA); + catalog.renameTable(tableIdent, renamedIdent); + + Table table = catalog.createTable(tableIdent, SCHEMA); + Table renamedTable = catalog.loadTable(renamedIdent); + + assertThat(table.location()) + .as("Should have a different table location") + .isNotEqualTo(renamedTable.location()); + } + } + @Test public void testRegisterTable() { TableIdentifier identifier = TableIdentifier.of("a", "t1"); diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 3f730a4f4c13..7376945d64b6 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -147,6 +147,7 @@ Iceberg catalogs support using catalog properties to configure catalog behaviors | cache-enabled | true | Whether to cache catalog entries | | cache.expiration-interval-ms | 30000 | How long catalog entries are locally cached, in milliseconds; 0 disables caching, negative values disable expiration | | metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom `MetricsReporter` implementation to use in a catalog. See the [Metrics reporting](metrics-reporting.md) section for additional details | +| unique-table-location | false | Whether to give each new table a unique storage path. Only for catalogs that support table renaming | | encryption.kms-impl | null | a custom `KeyManagementClient` implementation to use in a catalog for interactions with KMS (key management service). See the [Encryption](encryption.md) document for additional details | `HadoopCatalog` and `HiveCatalog` can access the properties in their constructors. diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index a6264c67fd88..34c43115324a 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -65,6 +65,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.view.BaseMetastoreViewCatalog; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; @@ -94,6 +95,7 @@ public class HiveCatalog extends BaseMetastoreViewCatalog private KeyManagementClient keyManagementClient; private ClientPool clients; private boolean listAllTables = false; + private boolean uniqueTableLocation; private Map catalogProperties; public HiveCatalog() {} @@ -130,6 +132,12 @@ public void initialize(String inputName, Map properties) { this.keyManagementClient = EncryptionUtil.createKmsClient(properties); } + this.uniqueTableLocation = + PropertyUtil.propertyAsBoolean( + properties, + CatalogProperties.UNIQUE_TABLE_LOCATION, + CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT); + this.clients = new CachedClientPool(conf, properties); } @@ -707,13 +715,14 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { // - Create the metadata in HMS, and this way committing the changes // Create a new location based on the namespace / database if it is set on database level + String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation); try { Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0])); if (databaseData.getLocationUri() != null) { // If the database location is set use it as a base. String databaseLocation = LocationUtil.stripTrailingSlash(databaseData.getLocationUri()); - return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + return String.format("%s/%s", databaseLocation, tableLocation); } } catch (NoSuchObjectException e) { @@ -730,7 +739,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { // Otherwise, stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path String databaseLocation = databaseLocation(tableIdentifier.namespace().levels()[0]); - return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + return String.format("%s/%s", databaseLocation, tableLocation); } private String databaseLocation(String databaseName) { diff --git a/open-api/src/test/java/org/apache/iceberg/rest/RESTCompatibilityKitCatalogTests.java b/open-api/src/test/java/org/apache/iceberg/rest/RESTCompatibilityKitCatalogTests.java index 87ec90663db2..795e3ff862ea 100644 --- a/open-api/src/test/java/org/apache/iceberg/rest/RESTCompatibilityKitCatalogTests.java +++ b/open-api/src/test/java/org/apache/iceberg/rest/RESTCompatibilityKitCatalogTests.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,4 +99,10 @@ protected boolean supportsNamesWithDot() { return PropertyUtil.propertyAsBoolean( restCatalog.properties(), RESTCompatibilityKitSuite.RCK_SUPPORTS_NAMES_WITH_DOT, false); } + + @Disabled("RESTServerExtension isn’t configurable per test") + @Test + public void createTableInUniqueLocation() { + super.createTableInUniqueLocation(); + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java index a9fbee2fc262..b20c87619ed8 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java @@ -65,7 +65,23 @@ public enum SparkCatalogConfig { SPARK_WITH_HIVE_VIEWS( "spark_hive_with_views", SparkCatalog.class.getName(), - ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false")); + ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false")), + SPARK_SESSION_WITH_UNIQUE_LOCATION( + "spark_catalog", + SparkSessionCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "unique-table-location", "true", + "cache-enabled", "false")), + HIVE_WITH_UNIQUE_LOCATION( + "hive_with_unique_location", + SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "unique-table-location", "true")); private final String catalogName; private final String implementation; diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestUniqueTableLocation.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestUniqueTableLocation.java new file mode 100644 index 000000000000..3581780ac71e --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestUniqueTableLocation.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.UUID; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.DeleteOrphanFiles; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.actions.SparkActions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestUniqueTableLocation extends CatalogTestBase { + + private String renamedTableName; + private TableIdentifier renamedIdent; + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE_WITH_UNIQUE_LOCATION.catalogName(), + SparkCatalogConfig.HIVE_WITH_UNIQUE_LOCATION.implementation(), + SparkCatalogConfig.HIVE_WITH_UNIQUE_LOCATION.properties() + }, + { + SparkCatalogConfig.SPARK_SESSION_WITH_UNIQUE_LOCATION.catalogName(), + SparkCatalogConfig.SPARK_SESSION_WITH_UNIQUE_LOCATION.implementation(), + SparkCatalogConfig.SPARK_SESSION_WITH_UNIQUE_LOCATION.properties() + }, + }; + } + + @BeforeEach + public void initTableName() { + renamedTableName = tableName("table_2"); + renamedIdent = TableIdentifier.of(Namespace.of("default"), "table_2"); + } + + @AfterEach + public void dropTestTable() { + try { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS %s", renamedTableName); + } catch (NotFoundException ignore) { + // Swallow FNF exception in case of corrupted table so test failure reason is clearer + } + } + + @TestTemplate + public void testNoCollisionAfterRename() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("%s should not exist", tableIdent) + .isFalse(); + assertThat(validationCatalog.tableExists(renamedIdent)) + .as("%s should not exist", renamedIdent) + .isFalse(); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + + sql("ALTER TABLE %s RENAME TO %s", tableName, renamedTableName); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Table renamedTable = validationCatalog.loadTable(renamedIdent); + + assertThat(table.location()) + .as( + "After rename+recreate, %s and %s must have different locations", + tableName, renamedTableName) + .isNotEqualTo(renamedTable.location()); + } + + @TestTemplate + public void testDropDoesntCorruptTable() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("%s should not exist", tableIdent) + .isFalse(); + assertThat(validationCatalog.tableExists(renamedIdent)) + .as("%s should not exist", renamedIdent) + .isFalse(); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + sql("INSERT INTO %s VALUES(0, '%s')", tableName, UUID.randomUUID().toString()); + + sql("ALTER TABLE %s RENAME TO %s", tableName, renamedTableName); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + sql("INSERT INTO %s VALUES(1, '%s')", tableName, UUID.randomUUID().toString()); + + sql("DROP TABLE %s PURGE", renamedTableName); + + assertThat(validationCatalog.tableExists(renamedIdent)) + .as("After PURGE, %s must not exist", renamedIdent) + .isFalse(); + + assertThat(scalarSql("SELECT count(*) FROM %s", tableName)) + .as("Table %s should remain unaffected by dropping %s", tableName, renamedTableName) + .isEqualTo(1L); + } + + @TestTemplate + public void testOrphanCleanupDoesntCorruptTable() { + SparkActions actions = SparkActions.get(); + + assertThat(validationCatalog.tableExists(tableIdent)) + .as("%s should not exist", tableIdent) + .isFalse(); + assertThat(validationCatalog.tableExists(renamedIdent)) + .as("%s should not exist", renamedIdent) + .isFalse(); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + sql("INSERT INTO %s VALUES(0, '%s')", tableName, UUID.randomUUID().toString()); + + sql("ALTER TABLE %s RENAME TO %s", tableName, renamedTableName); + + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + sql("INSERT INTO %s VALUES(1, '%s')", tableName, UUID.randomUUID().toString()); + + Table table = validationCatalog.loadTable(tableIdent); + assertThat(table).as("Should load %s", table).isNotNull(); + + long cutoff = System.currentTimeMillis() + 1; + DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(cutoff).execute(); + assertThat(result.orphanFileLocations()).as("Should not touch any files").isEmpty(); + + assertThat(scalarSql("SELECT count(*) FROM %s", renamedTableName)) + .as("Table %s should remain unaffected by %s table cleanup", renamedTableName, tableName) + .isEqualTo(1L); + } +}