Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class GlueTestBase {
// iceberg
static GlueCatalog glueCatalog;
static GlueCatalog glueCatalogWithSkipNameValidation;
static GlueCatalog glueCatalogWithUniqueLocation;

static Schema schema =
new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
Expand Down Expand Up @@ -105,6 +106,16 @@ public static void beforeClass() {
GLUE,
null,
ImmutableMap.of());

glueCatalogWithUniqueLocation = new GlueCatalog();
glueCatalogWithUniqueLocation.initialize(
CATALOG_NAME,
TEST_BUCKET_PATH,
awsProperties,
s3FileIOProperties,
GLUE,
null,
true /* uniqTableLocation */);
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ public void testRenameTable() {
assertThat(renamedTable.currentSnapshot()).isEqualTo(table.currentSnapshot());
}

/**
 * Checks that a table re-created under a previously used name ends up at a location different
 * from the one held by the original table after it was renamed.
 */
@Test
public void testCreateTableInUniqueLocation() {
  String namespace = createNamespace();
  // NOTE(review): createTable(...) is a helper defined outside this view; confirm which catalog
  // instance it creates the table through.
  String tableName = createTable(namespace);
  TableIdentifier originalId = TableIdentifier.of(namespace, tableName);
  TableIdentifier renamedId = TableIdentifier.of(namespace, tableName + "_renamed");

  // Move the first table aside and capture it under its new identifier.
  glueCatalogWithUniqueLocation.renameTable(originalId, renamedId);
  Table renamedTable = glueCatalogWithUniqueLocation.loadTable(renamedId);

  // Re-use the original name for a second table and load it.
  createTable(namespace, tableName);
  Table recreatedTable = glueCatalogWithUniqueLocation.loadTable(originalId);

  assertThat(renamedTable.location()).isNotEqualTo(recreatedTable.location());
}

@Test
public void testRenameTableFailsToCreateNewTable() {
String namespace = createNamespace();
Expand Down Expand Up @@ -743,7 +759,8 @@ public void testTableLevelS3Tags() {
new AwsProperties(properties),
new S3FileIOProperties(properties),
GLUE,
null);
null,
false /* uniqTableLocation */);
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class DynamoDbCatalog extends BaseMetastoreCatalog
private FileIO fileIO;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;
private boolean uniqueTableLocation;

/**
 * Creates an uninitialized catalog. Fields such as the warehouse path, the DynamoDB client and
 * the FileIO are assigned later by {@code initialize}.
 */
public DynamoDbCatalog() {}

Expand All @@ -123,12 +125,21 @@ public void initialize(String name, Map<String, String> properties) {
properties.get(CatalogProperties.WAREHOUSE_LOCATION),
new AwsProperties(properties),
AwsClientFactories.from(properties).dynamo(),
initializeFileIO(properties));
initializeFileIO(properties),
PropertyUtil.propertyAsBoolean(
properties,
CatalogProperties.UNIQUE_TABLE_LOCATION,
CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT));
}

@VisibleForTesting
void initialize(
String name, String path, AwsProperties properties, DynamoDbClient client, FileIO io) {
String name,
String path,
AwsProperties properties,
DynamoDbClient client,
FileIO io,
boolean uniqTableLocation) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(path),
"Cannot initialize DynamoDbCatalog because warehousePath must not be null or empty");
Expand All @@ -138,6 +149,7 @@ void initialize(
this.warehousePath = LocationUtil.stripTrailingSlash(path);
this.dynamo = client;
this.fileIO = io;
this.uniqueTableLocation = uniqTableLocation;

this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(dynamo);
Expand Down Expand Up @@ -177,12 +189,12 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
}

String defaultLocationCol = toPropertyCol(PROPERTY_DEFAULT_LOCATION);
String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation);
if (response.item().containsKey(defaultLocationCol)) {
return String.format(
"%s/%s", response.item().get(defaultLocationCol).s(), tableIdentifier.name());
return String.format("%s/%s", response.item().get(defaultLocationCol).s(), tableLocation);
} else {
return String.format(
"%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableIdentifier.name());
"%s/%s.db/%s", warehousePath, tableIdentifier.namespace(), tableLocation);
}
}

Expand Down
28 changes: 23 additions & 5 deletions aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
private Object hadoopConf;
private String catalogName;
private String warehousePath;
private boolean uniqueTableLocation;
private AwsProperties awsProperties;
private S3FileIOProperties s3FileIOProperties;
private LockManager lockManager;
Expand Down Expand Up @@ -144,7 +145,11 @@ public void initialize(String name, Map<String, String> properties) {
new AwsProperties(properties),
new S3FileIOProperties(properties),
awsClientFactory.glue(),
initializeLockManager(properties));
initializeLockManager(properties),
PropertyUtil.propertyAsBoolean(
properties,
CatalogProperties.UNIQUE_TABLE_LOCATION,
CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT));
}

private LockManager initializeLockManager(Map<String, String> properties) {
Expand Down Expand Up @@ -172,7 +177,17 @@ void initialize(
LockManager lock,
Map<String, String> catalogProps) {
this.catalogProperties = catalogProps;
initialize(name, path, properties, s3Properties, client, lock);
initialize(
name,
path,
properties,
s3Properties,
client,
lock,
PropertyUtil.propertyAsBoolean(
catalogProps,
CatalogProperties.UNIQUE_TABLE_LOCATION,
CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT));
}

@VisibleForTesting
Expand All @@ -182,13 +197,15 @@ void initialize(
AwsProperties properties,
S3FileIOProperties s3Properties,
GlueClient client,
LockManager lock) {
LockManager lock,
boolean uniqTableLocation) {
this.catalogName = name;
this.awsProperties = properties;
this.s3FileIOProperties = s3Properties;
this.warehousePath = Strings.isNullOrEmpty(path) ? null : LocationUtil.stripTrailingSlash(path);
this.glue = client;
this.lockManager = lock;
this.uniqueTableLocation = uniqTableLocation;

this.closeableGroup = new CloseableGroup();
this.fileIOTracker = new FileIOTracker();
Expand Down Expand Up @@ -278,9 +295,10 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
tableIdentifier, awsProperties.glueCatalogSkipNameValidation()))
.build());
String dbLocationUri = response.database().locationUri();
String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation);
if (dbLocationUri != null) {
dbLocationUri = LocationUtil.stripTrailingSlash(dbLocationUri);
return String.format("%s/%s", dbLocationUri, tableIdentifier.name());
return String.format("%s/%s", dbLocationUri, tableLocation);
}

ValidationException.check(
Expand All @@ -292,7 +310,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
warehousePath,
IcebergToGlueConverter.getDatabaseName(
tableIdentifier, awsProperties.glueCatalogSkipNameValidation()),
tableIdentifier.name());
tableLocation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,25 @@ public class TestDynamoDbCatalog {
public void before() {
dynamo = Mockito.mock(DynamoDbClient.class);
dynamoCatalog = new DynamoDbCatalog();
dynamoCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), dynamo, null);
dynamoCatalog.initialize(
CATALOG_NAME,
WAREHOUSE_PATH,
new AwsProperties(),
dynamo,
null,
false /* uniqTableLocation */);
}

@Test
public void testConstructorWarehousePathWithEndSlash() {
DynamoDbCatalog catalogWithSlash = new DynamoDbCatalog();
catalogWithSlash.initialize(
CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), dynamo, null);
CATALOG_NAME,
WAREHOUSE_PATH + "/",
new AwsProperties(),
dynamo,
null,
false /* uniqTableLocation */);
Mockito.doReturn(GetItemResponse.builder().item(Maps.newHashMap()).build())
.when(dynamo)
.getItem(any(GetItemRequest.class));
Expand Down Expand Up @@ -103,4 +114,49 @@ public void testDefaultWarehouseLocationNoNamespace() {
.isInstanceOf(NoSuchNamespaceException.class)
.hasMessageContaining("Cannot find default warehouse location:");
}

/**
 * With unique table locations enabled and no default-location column on the namespace item, the
 * default location must be the warehouse path, the {@code db.db} folder and the table name
 * followed by a 32-character lowercase alphanumeric suffix.
 */
@Test
public void testDefaultWarehouseLocationUniqueWithoutDbUri() throws Exception {
  try (DynamoDbCatalog uniqueCatalog = new DynamoDbCatalog()) {
    boolean uniqTableLocation = true;
    uniqueCatalog.initialize(
        CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), dynamo, null, uniqTableLocation);

    // Namespace lookup answers with an item that carries no properties at all.
    GetItemResponse emptyItem = GetItemResponse.builder().item(Maps.newHashMap()).build();
    Mockito.doReturn(emptyItem).when(dynamo).getItem(any(GetItemRequest.class));

    String expectedPattern = WAREHOUSE_PATH + "/db.db/table-[a-z0-9]{32}";
    String actualLocation = uniqueCatalog.defaultWarehouseLocation(TABLE_IDENTIFIER);
    assertThat(actualLocation).matches(expectedPattern);
  }
}

/**
 * With unique table locations enabled and a default-location column present on the namespace
 * item, the default location must be that URI plus the table name followed by a 32-character
 * lowercase alphanumeric suffix.
 */
@Test
public void testDefaultWarehouseLocationUniqueWithDbUri() throws Exception {
  try (DynamoDbCatalog uniqueCatalog = new DynamoDbCatalog()) {
    boolean uniqTableLocation = true;
    uniqueCatalog.initialize(
        CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), dynamo, null, uniqTableLocation);

    // Namespace lookup answers with an item whose default-location column points at dbUri.
    String dbUri = "s3://bucket2/db";
    AttributeValue locationValue = AttributeValue.builder().s(dbUri).build();
    GetItemResponse namespaceItem =
        GetItemResponse.builder()
            .item(
                ImmutableMap.of(
                    toPropertyCol(DynamoDbCatalog.defaultLocationProperty()), locationValue))
            .build();
    Mockito.doReturn(namespaceItem).when(dynamo).getItem(any(GetItemRequest.class));

    String actualLocation = uniqueCatalog.defaultWarehouseLocation(TABLE_IDENTIFIER);
    assertThat(actualLocation).matches("s3://bucket2/db/table-[a-z0-9]{32}");
  }
}
}
22 changes: 22 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,28 @@ public void testDefaultWarehouseLocationCustomCatalogId() {
Mockito.argThat((GetDatabaseRequest req) -> req.catalogId().equals(catalogId)));
}

/**
 * With unique table locations enabled and a database that reports a location URI, the default
 * location must be that URI plus the table name followed by a 32-character lowercase
 * alphanumeric suffix.
 */
@Test
public void testDefaultWarehouseLocationUnique() {
  // NOTE(review): this catalog instance is never closed; confirm whether it should be wrapped
  // in try-with-resources.
  boolean uniqTableLocation = true;
  GlueCatalog uniqueCatalog = new GlueCatalog();
  uniqueCatalog.initialize(
      CATALOG_NAME,
      WAREHOUSE_PATH,
      new AwsProperties(),
      new S3FileIOProperties(),
      glue,
      LockManagers.defaultLockManager(),
      uniqTableLocation);

  // Every database lookup answers with a database located at s3://bucket2/db.
  Database db = Database.builder().name("db").locationUri("s3://bucket2/db").build();
  GetDatabaseResponse dbResponse = GetDatabaseResponse.builder().database(db).build();
  Mockito.doReturn(dbResponse).when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));

  TableIdentifier tableId = TableIdentifier.of("db", "table");
  String actualLocation = uniqueCatalog.defaultWarehouseLocation(tableId);
  assertThat(actualLocation).matches("s3://bucket2/db/table-[a-z0-9]{32}");
}

@Test
public void testListTables() {
Mockito.doReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
Expand Down Expand Up @@ -169,6 +170,18 @@ public void testRenameTableMissingSourceTable() {
super.testRenameTableMissingSourceTable();
}

/**
 * Disabled because, per the annotation's reason, BigQuery Metastore does not support renaming
 * tables. When enabled it simply delegates to the superclass test of the same name.
 */
@Disabled("BigQuery Metastore does not support rename tables")
@Test
@Override
public void createTableInUniqueLocation() {
  super.createTableInUniqueLocation();
}

/**
 * Disabled because, per the annotation's reason, BigQuery Metastore does not support renaming
 * tables. When enabled it simply delegates to the superclass test of the same name.
 *
 * @throws IOException declared to match the delegated superclass call
 */
@Disabled("BigQuery Metastore does not support rename tables")
@Test
@Override
public void dropAfterRenameDoesntCorruptTable() throws IOException {
  super.dropAfterRenameDoesntCorruptTable();
}

@Test
public void testIsValidIdentifierWithValidSingleLevelNamespace() {
assertThat(catalog.isValidIdentifier(TableIdentifier.of("dataset1", "table1"))).isTrue();
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ private CatalogProperties() {}
public static final String APP_NAME = "app-name";
public static final String USER = "user";

/**
* Requests that the catalog provide unique locations for new tables.
*
* <p>Relevant only for catalogs which support unique table locations.
*/
public static final String UNIQUE_TABLE_LOCATION = "unique-table-location";

public static final boolean UNIQUE_TABLE_LOCATION_DEFAULT = false;

public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms";
public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewMetadata;
Expand All @@ -71,6 +73,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
private String catalogName;
private String warehouseLocation;
private CloseableGroup closeableGroup;
private boolean uniqueTableLocation;
private Map<String, String> catalogProperties;

public InMemoryCatalog() {
Expand All @@ -88,6 +91,11 @@ public String name() {
public void initialize(String name, Map<String, String> properties) {
this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName();
this.catalogProperties = ImmutableMap.copyOf(properties);
this.uniqueTableLocation =
PropertyUtil.propertyAsBoolean(
properties,
CatalogProperties.UNIQUE_TABLE_LOCATION,
CatalogProperties.UNIQUE_TABLE_LOCATION_DEFAULT);

String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
this.warehouseLocation = warehouse.replaceAll("/*$", "");
Expand All @@ -104,8 +112,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {

@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
return SLASH.join(
defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name());
String tableLocation = LocationUtil.tableLocation(tableIdentifier, uniqueTableLocation);
return SLASH.join(defaultNamespaceLocation(tableIdentifier.namespace()), tableLocation);
}

private String defaultNamespaceLocation(Namespace namespace) {
Expand Down
Loading