@@ -72,6 +72,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
+ import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
@@ -297,11 +298,22 @@ private Result rebuildMetadata() {
rewriteManifests(deltaSnapshots, endMetadata, rewriteManifestListResult.toRewrite());

// rebuild position delete files
- Set<DeleteFile> deleteFiles =
+ // Use DeleteFileSet to ensure proper equality comparison based on file location, content
+ // offset,

Contributor (inline review comment on the "// offset," line): nit: this line can go with the previous one

+ // and content size. This is particularly important for deletion vectors (DV files) where
+ // multiple DV entries can reference the same Puffin file but have different offsets and sizes.
+ List<ContentFile<?>> allDeleteFiles =
rewriteManifestResult.toRewrite().stream()
.filter(e -> e instanceof DeleteFile)
+ .collect(Collectors.toList());
+ Set<DeleteFile> deleteFiles =
+ allDeleteFiles.stream()
.map(e -> (DeleteFile) e)
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(DeleteFileSet::create));
+ LOG.debug(
+ "Delete files before deduplication: {}, after deduplication with DeleteFileSet: {}",
+ allDeleteFiles.size(),
+ deleteFiles.size());
rewritePositionDeletes(deleteFiles);

ImmutableRewriteTablePath.Result.Builder builder =
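A brief aside on the mechanics of the hunk above: the action now collects delete files into a DeleteFileSet instead of the identity-based set produced by Collectors.toSet(). Below is a minimal, self-contained sketch of that collection step in isolation; the DeleteFileDedupSketch class and the entries parameter are illustrative stand-ins rather than part of the PR, while the collector call mirrors the PR's own code.

// Sketch (not from the PR): deduplicating delete files with DeleteFileSet.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.util.DeleteFileSet;

class DeleteFileDedupSketch {
  static Set<DeleteFile> dedup(List<ContentFile<?>> entries) {
    // DeleteFileSet compares elements by file location plus content offset/size, so a delete file
    // that appears in several manifest entries collapses to a single element, while two DVs that
    // share one Puffin file but sit at different offsets remain distinct.
    return entries.stream()
        .filter(e -> e instanceof DeleteFile)
        .map(e -> (DeleteFile) e)
        .collect(Collectors.toCollection(DeleteFileSet::create));
  }
}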
@@ -565,6 +565,91 @@ public void testPositionDeletesAcrossFiles() throws Exception {
.isEmpty();
}

/**
* Test for https://github.com/apache/iceberg/issues/14814
*
* <p>This test verifies that rewrite_table_path correctly handles position delete files when
* multiple snapshots exist. The fix uses DeleteFileSet to properly deduplicate delete files based
* on file location, content offset, and content size, rather than relying on object identity.
*
* <p>Note: The original bug occurred when the same delete file appeared in multiple manifest
* entries (e.g., with ADDED and DELETED status), causing AlreadyExistsException. This test
* exercises the code path with DeleteFileSet deduplication but may not fully reproduce the edge
* case that occurs in production environments with complex manifest structures.
*/
@TestTemplate
public void testPositionDeletesWithMultipleSnapshots() throws Exception {
// Format versions 3 and 4 use Deletion Vectors stored in Puffin files, which have different
// validation rules that prevent adding multiple position deletes for the same data file
assumeThat(formatVersion)
.as("Format versions 3+ use DVs with different validation rules")
.isEqualTo(2);

Table tableWithPosDeletes =
createTableWithSnapshots(
tableDir.toFile().toURI().toString().concat("tableWithMultipleSnapshots"),
2,
Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "parquet"));

// Get a data file to create position deletes for
DataFile dataFile =
tableWithPosDeletes
.currentSnapshot()
.addedDataFiles(tableWithPosDeletes.io())
.iterator()
.next();

// Create first position delete file
List<Pair<CharSequence, Long>> deletes1 = Lists.newArrayList(Pair.of(dataFile.location(), 0L));
File file1 =
new File(
removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes_1.parquet"));
DeleteFile positionDeletes1 =
FileHelpers.writeDeleteFile(
tableWithPosDeletes,
tableWithPosDeletes.io().newOutputFile(file1.toURI().toString()),
deletes1,
formatVersion)
.first();
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes1).commit();
long snapshot1 = tableWithPosDeletes.currentSnapshot().snapshotId();

// Create second position delete file
List<Pair<CharSequence, Long>> deletes2 = Lists.newArrayList(Pair.of(dataFile.location(), 1L));
File file2 =
new File(
removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes_2.parquet"));
DeleteFile positionDeletes2 =
FileHelpers.writeDeleteFile(
tableWithPosDeletes,
tableWithPosDeletes.io().newOutputFile(file2.toURI().toString()),
deletes2,
formatVersion)
.first();
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes2).commit();
long snapshot2 = tableWithPosDeletes.currentSnapshot().snapshotId();

// Create tags on different snapshots (simulating the production scenario)
tableWithPosDeletes.manageSnapshots().createTag("tag1", snapshot1).commit();
tableWithPosDeletes.manageSnapshots().createTag("tag2", snapshot2).commit();

// This should NOT throw AlreadyExistsException
RewriteTablePath.Result result =
actions()
.rewriteTablePath(tableWithPosDeletes)
.stagingLocation(stagingLocation())
.rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
.execute();

// Verify the rewrite completed successfully - should have rewritten 2 delete files
assertThat(result.rewrittenDeleteFilePathsCount())
.as("Should have rewritten exactly 2 delete files")
.isEqualTo(2);

// Copy the metadata files and data files
copyTableFiles(result);
}
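For the scenario the test's Javadoc describes, where the same physical delete file is reachable through more than one manifest entry, the following self-contained sketch shows why an identity-based set double-counts while DeleteFileSet does not. Everything here (the class name, the paths, and the use of FileMetadata.deleteFileBuilder to construct standalone DeleteFile instances) is an assumption made for illustration and is not taken from the PR.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.util.DeleteFileSet;

class DeleteFileEqualitySketch {
  // Builds a standalone position-delete descriptor; builder usage is assumed for illustration only.
  private static DeleteFile positionDeletes(String path) {
    return FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
        .ofPositionDeletes()
        .withPath(path)
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(10L)
        .withRecordCount(1L)
        .build();
  }

  public static void main(String[] args) {
    // Two separately built entries describing the same physical delete file, as happens when the
    // file is referenced from more than one manifest entry across snapshots.
    DeleteFile fromManifestA = positionDeletes("file:/tmp/table/data/deletes_1.parquet");
    DeleteFile fromManifestB = positionDeletes("file:/tmp/table/data/deletes_1.parquet");

    // Identity-based set: the two wrappers are distinct objects, so both are kept (expected size 2).
    Set<DeleteFile> identityBased = new HashSet<>(List.of(fromManifestA, fromManifestB));

    // DeleteFileSet keys equality on location and content offset/size, so the duplicates collapse
    // (expected size 1) and the rewrite stages the file only once.
    DeleteFileSet locationBased = DeleteFileSet.create();
    locationBased.add(fromManifestA);
    locationBased.add(fromManifestB);

    System.out.println(identityBased.size() + " vs " + locationBased.size());
  }
}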

@TestTemplate
public void testEqualityDeletes() throws Exception {
Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);