Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public static Consumer<ErrorResponse> tableCommitHandler() {
return CommitErrorHandler.INSTANCE;
}

public static Consumer<ErrorResponse> createTableErrorHandler() {
return CreateTableErrorHandler.INSTANCE;
}

public static Consumer<ErrorResponse> planErrorHandler() {
return PlanErrorHandler.INSTANCE;
}
Expand Down Expand Up @@ -138,6 +142,23 @@ public void accept(ErrorResponse error) {
}
}

/** Table create error handler. */
private static class CreateTableErrorHandler extends CommitErrorHandler {
private static final ErrorHandler INSTANCE = new CreateTableErrorHandler();

@Override
public void accept(ErrorResponse error) {
switch (error.code()) {
case 404:
throw new NoSuchNamespaceException("%s", error.message());
case 409:
throw new AlreadyExistsException("%s", error.message());
}

super.accept(error);
}
}

/** Plan level error handler. */
private static class PlanErrorHandler extends DefaultErrorHandler {
private static final ErrorHandler INSTANCE = new PlanErrorHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
.addAll(metadata.changes())
.build();
requirements = UpdateRequirements.forCreateTable(updates);
errorHandler = ErrorHandlers.tableErrorHandler(); // throws NoSuchTableException
errorHandler = ErrorHandlers.createTableErrorHandler();
break;

case REPLACE:
Expand Down
71 changes: 71 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,77 @@ public void testNoCleanupForNonCleanableCreateTransaction() {
});
}

@Test
public void testNoCleanupOnCreate503() {
RESTCatalogAdapter adapter =
Mockito.spy(
new RESTCatalogAdapter(backendCatalog) {
@Override
protected <T extends RESTResponse> T execute(
HTTPRequest request,
Class<T> responseType,
Consumer<ErrorResponse> errorHandler,
Consumer<Map<String, String>> responseHeaders) {
var response = super.execute(request, responseType, errorHandler, responseHeaders);
if (request.method() == HTTPMethod.POST && request.path().contains(TABLE.name())) {
// Simulate a 503 Service Unavailable error
ErrorResponse error =
ErrorResponse.builder()
.responseCode(503)
.withMessage("Service unavailable")
.build();

errorHandler.accept(error);
throw new IllegalStateException("Error handler should have thrown");
}
return response;
}
});

RESTCatalog catalog = catalog(adapter);

if (requiresNamespaceCreate()) {
catalog.createNamespace(TABLE.namespace());
}

Transaction createTableTransaction = catalog.newCreateTableTransaction(TABLE, SCHEMA);
createTableTransaction.newAppend().appendFile(FILE_A).commit();

// Verify that 503 is mapped to CommitStateUnknownException (not just ServiceFailureException)
assertThatThrownBy(createTableTransaction::commitTransaction)
.isInstanceOf(CommitStateUnknownException.class)
.cause()
.isInstanceOf(ServiceFailureException.class)
.hasMessageContaining("Service failed: 503");

// Verify files are NOT cleaned up (because commit state is unknown)
assertThat(allRequests(adapter))
.anySatisfy(
req -> {
assertThat(req.method()).isEqualTo(HTTPMethod.POST);
assertThat(req.path()).isEqualTo(RESOURCE_PATHS.table(TABLE));
assertThat(req.body()).isInstanceOf(UpdateTableRequest.class);
UpdateTableRequest body = (UpdateTableRequest) req.body();
assertThat(
body.updates().stream()
.filter(MetadataUpdate.AddSnapshot.class::isInstance)
.map(MetadataUpdate.AddSnapshot.class::cast)
.findFirst())
.hasValueSatisfying(
addSnapshot -> {
String manifestListLocation = addSnapshot.snapshot().manifestListLocation();
// Files should still exist because we don't know if commit succeeded
assertThat(
catalog
.loadTable(TABLE)
.io()
.newInputFile(manifestListLocation)
.exists())
.isTrue();
});
});
}

@Test
public void testCleanupCleanableExceptionsReplace() {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Expand Down