diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java index 543e548529dd..be408816c0ba 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java @@ -76,6 +76,10 @@ public static Consumer tableCommitHandler() { return CommitErrorHandler.INSTANCE; } + public static Consumer createTableErrorHandler() { + return CreateTableErrorHandler.INSTANCE; + } + public static Consumer planErrorHandler() { return PlanErrorHandler.INSTANCE; } @@ -138,6 +142,23 @@ public void accept(ErrorResponse error) { } } + /** Table create error handler. */ + private static class CreateTableErrorHandler extends CommitErrorHandler { + private static final ErrorHandler INSTANCE = new CreateTableErrorHandler(); + + @Override + public void accept(ErrorResponse error) { + switch (error.code()) { + case 404: + throw new NoSuchNamespaceException("%s", error.message()); + case 409: + throw new AlreadyExistsException("%s", error.message()); + } + + super.accept(error); + } + } + /** Plan level error handler. */ private static class PlanErrorHandler extends DefaultErrorHandler { private static final ErrorHandler INSTANCE = new PlanErrorHandler(); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index d2a6ab618ca8..be763d30fef1 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -169,7 +169,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { .addAll(metadata.changes()) .build(); requirements = UpdateRequirements.forCreateTable(updates); - errorHandler = ErrorHandlers.tableErrorHandler(); // throws NoSuchTableException + errorHandler = ErrorHandlers.createTableErrorHandler(); break; case REPLACE: diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index d202680e5626..09543bd0f932 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -2543,6 +2543,77 @@ public void testNoCleanupForNonCleanableCreateTransaction() { }); } + @Test + public void testNoCleanupOnCreate503() { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + protected T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + var response = super.execute(request, responseType, errorHandler, responseHeaders); + if (request.method() == HTTPMethod.POST && request.path().contains(TABLE.name())) { + // Simulate a 503 Service Unavailable error + ErrorResponse error = + ErrorResponse.builder() + .responseCode(503) + .withMessage("Service unavailable") + .build(); + + errorHandler.accept(error); + throw new IllegalStateException("Error handler should have thrown"); + } + return response; + } + }); + + RESTCatalog catalog = catalog(adapter); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(TABLE.namespace()); + } + + Transaction createTableTransaction = catalog.newCreateTableTransaction(TABLE, SCHEMA); + createTableTransaction.newAppend().appendFile(FILE_A).commit(); + + // Verify that 503 is mapped to CommitStateUnknownException (not just ServiceFailureException) + assertThatThrownBy(createTableTransaction::commitTransaction) + .isInstanceOf(CommitStateUnknownException.class) + .cause() + .isInstanceOf(ServiceFailureException.class) + .hasMessageContaining("Service failed: 503"); + + // Verify files are NOT cleaned up (because commit state is unknown) + assertThat(allRequests(adapter)) + .anySatisfy( + req -> { + assertThat(req.method()).isEqualTo(HTTPMethod.POST); + assertThat(req.path()).isEqualTo(RESOURCE_PATHS.table(TABLE)); + assertThat(req.body()).isInstanceOf(UpdateTableRequest.class); + UpdateTableRequest body = (UpdateTableRequest) req.body(); + assertThat( + body.updates().stream() + .filter(MetadataUpdate.AddSnapshot.class::isInstance) + .map(MetadataUpdate.AddSnapshot.class::cast) + .findFirst()) + .hasValueSatisfying( + addSnapshot -> { + String manifestListLocation = addSnapshot.snapshot().manifestListLocation(); + // Files should still exist because we don't know if commit succeeded + assertThat( + catalog + .loadTable(TABLE) + .io() + .newInputFile(manifestListLocation) + .exists()) + .isTrue(); + }); + }); + } + @Test public void testCleanupCleanableExceptionsReplace() { RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));