diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index d6a13bcd515d..8fff9ce41df2 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -72,6 +72,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
@@ -297,11 +298,22 @@ private Result rebuildMetadata() {
     rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());
 
     // rebuild position delete files
-    Set<DeleteFile> deleteFiles =
+    // Use DeleteFileSet to ensure proper equality comparison based on file location, content
+    // offset, and content size. This is particularly important for deletion vectors (DV files),
+    // where multiple DV entries can reference the same Puffin file but have different offsets
+    // and sizes.
+    List<ContentFile<?>> allDeleteFiles =
         rewriteManifestResult.toRewrite().stream()
             .filter(e -> e instanceof DeleteFile)
+            .collect(Collectors.toList());
+    Set<DeleteFile> deleteFiles =
+        allDeleteFiles.stream()
             .map(e -> (DeleteFile) e)
-            .collect(Collectors.toSet());
+            .collect(Collectors.toCollection(DeleteFileSet::create));
+    LOG.debug(
+        "Delete files before deduplication: {}, after deduplication with DeleteFileSet: {}",
+        allDeleteFiles.size(),
+        deleteFiles.size());
     rewritePositionDeletes(deleteFiles);
 
     ImmutableRewriteTablePath.Result.Builder builder =
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 0bcaf0af6581..3432a10fee23 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -565,6 +565,91 @@ public void testPositionDeletesAcrossFiles() throws Exception {
         .isEmpty();
   }
 
+  /**
+   * Test for https://github.com/apache/iceberg/issues/14814
+   *
+   * <p>This test verifies that rewrite_table_path correctly handles position delete files when
+   * multiple snapshots exist. The fix uses DeleteFileSet to properly deduplicate delete files based
+   * on file location, content offset, and content size, rather than relying on object identity.
+   *
+   * <p>Note: The original bug occurred when the same delete file appeared in multiple manifest
+   * entries (e.g., with ADDED and DELETED status), causing AlreadyExistsException. This test
+   * exercises the code path with DeleteFileSet deduplication but may not fully reproduce the edge
+   * case that occurs in production environments with complex manifest structures.
+   */
+  @TestTemplate
+  public void testPositionDeletesWithMultipleSnapshots() throws Exception {
+    // Format versions 3 and 4 use Deletion Vectors stored in Puffin files, which have different
+    // validation rules that prevent adding multiple position deletes for the same data file
+    assumeThat(formatVersion)
+        .as("Format versions 3+ use DVs with different validation rules")
+        .isEqualTo(2);
+
+    Table tableWithPosDeletes =
+        createTableWithSnapshots(
+            tableDir.toFile().toURI().toString().concat("tableWithMultipleSnapshots"),
+            2,
+            Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "parquet"));
+
+    // Get a data file to create position deletes for
+    DataFile dataFile =
+        tableWithPosDeletes
+            .currentSnapshot()
+            .addedDataFiles(tableWithPosDeletes.io())
+            .iterator()
+            .next();
+
+    // Create first position delete file
+    List<Pair<CharSequence, Long>> deletes1 = Lists.newArrayList(Pair.of(dataFile.location(), 0L));
+    File file1 =
+        new File(
+            removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes_1.parquet"));
+    DeleteFile positionDeletes1 =
+        FileHelpers.writeDeleteFile(
+                tableWithPosDeletes,
+                tableWithPosDeletes.io().newOutputFile(file1.toURI().toString()),
+                deletes1,
+                formatVersion)
+            .first();
+    tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes1).commit();
+    long snapshot1 = tableWithPosDeletes.currentSnapshot().snapshotId();
+
+    // Create second position delete file
+    List<Pair<CharSequence, Long>> deletes2 = Lists.newArrayList(Pair.of(dataFile.location(), 1L));
+    File file2 =
+        new File(
+            removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes_2.parquet"));
+    DeleteFile positionDeletes2 =
+        FileHelpers.writeDeleteFile(
+                tableWithPosDeletes,
+                tableWithPosDeletes.io().newOutputFile(file2.toURI().toString()),
+                deletes2,
+                formatVersion)
+            .first();
+    tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes2).commit();
+    long snapshot2 = tableWithPosDeletes.currentSnapshot().snapshotId();
+
+    // Create tags on different snapshots (simulating the production scenario)
+    tableWithPosDeletes.manageSnapshots().createTag("tag1", snapshot1).commit();
+    tableWithPosDeletes.manageSnapshots().createTag("tag2", snapshot2).commit();
+
+    // This should NOT throw AlreadyExistsException
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(tableWithPosDeletes)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
+            .execute();
+
+    // Verify the rewrite completed successfully - should have rewritten 2 delete files
+    assertThat(result.rewrittenDeleteFilePathsCount())
+        .as("Should have rewritten exactly 2 delete files")
+        .isEqualTo(2);
+
+    // Copy the metadata files and data files
+    copyTableFiles(result);
+  }
+
   @TestTemplate
   public void testEqualityDeletes() throws Exception {
     Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);