@@ -41,6 +41,8 @@
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import scala.Option;
+import scala.Tuple2;
+import scala.collection.Seq;
 
 import io.delta.tables.DeltaTable;
 
@@ -191,18 +193,58 @@ public CommitsBacklog<Long> getCommitsBacklog(
   }
 
   /*
-   * In Delta Lake, each commit is a self-describing one i.e. it contains list of new files while
-   * also containing list of files that were deleted. So, vacuum has no special effect on the
-   * incremental sync. Hence, existence of commit is the only check required.
+   * The following checks are performed:
+   * 1. A commit exists at or before the provided instant.
+   * 2. The commit files needed for incremental sync are still accessible.
+   *
+   * Delta Lake's log retention cleanup (VACUUM itself removes only data files) deletes old
+   * JSON commit files from _delta_log/, which can break incremental sync even though commits
+   * are self-describing. This method probes the commit chain to confirm the files still exist.
   */
  @Override
  public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant =
-        deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true);
-    // There is a chance earliest commit of the table is returned if the instant is before the
-    // earliest commit of the table, hence the additional check.
-    Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp());
-    return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+    try {
+      DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant =
+          deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true);
+
+      // The earliest commit of the table is returned when the instant predates the table's
+      // history, hence the additional check below.
+      Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp());
+      if (deltaCommitInstant.isAfter(instant)) {
+        log.info(
+            "No commit found at or before instant {}. Earliest available commit is at {}",
+            instant,
+            deltaCommitInstant);
+        return false;
+      }
+
+      long versionAtInstant = deltaCommitAtOrBeforeInstant.version();
+
+      // Verify that the commit files from this version onward are still readable by starting
+      // to iterate over the changes; this fails if log cleanup has removed the files. Only
+      // the start of the iteration needs to be verified, not every change.
+      scala.collection.Iterator<Tuple2<Object, Seq<Action>>> changesIterator =
+          deltaLog.getChanges(versionAtInstant, true);
+
+      // Accessing the first commit forces its JSON file to be read; if the file is missing,
+      // this throws an exception (typically a FileNotFoundException or similar).
+      if (changesIterator.hasNext()) {
+        // Successfully verified that the commit files are accessible.
+        return true;
+      } else {
+        // No changes available from this version (should not happen for a valid commit).
+        log.warn(
+            "No changes available starting from version {} (instant: {})", versionAtInstant, instant);
+        return false;
+      }
+    } catch (Exception e) {
+      // The commit files have been cleaned up or are otherwise inaccessible.
+      log.info(
+          "Cannot perform incremental sync from instant {} due to missing or inaccessible commit files: {}",
+          instant,
+          e.getMessage());
+      return false;
+    }
   }
 
   @Override
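The check above intentionally probes only the first commit: `hasNext()` confirms the earliest required JSON file is readable, and since log cleanup deletes the oldest files first, that is usually the only file at risk. A gap later in the chain would still surface only during the actual sync. For illustration, a stricter variant that walks the whole chain might look like the sketch below; it assumes the same `deltaLog` and `log` fields as the method above, and `canReadAllCommitsFrom` is a hypothetical helper name, not part of this change.

```java
// Hypothetical stricter check (sketch, not part of this PR): walk the entire commit
// chain instead of probing only the first commit. Materializing every entry forces
// each commit's JSON file to be read, so a gap anywhere in _delta_log/ is detected
// up front, at the cost of reading every commit file.
private boolean canReadAllCommitsFrom(long startVersion) {
  try {
    scala.collection.Iterator<Tuple2<Object, Seq<Action>>> changes =
        deltaLog.getChanges(startVersion, true); // failOnDataLoss = true
    while (changes.hasNext()) {
      changes.next();
    }
    return true;
  } catch (Exception e) {
    log.info(
        "Commit chain not fully readable from version {}: {}", startVersion, e.getMessage());
    return false;
  }
}
```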
@@ -24,21 +24,27 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.Table;
+import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.DeltaLogActionUtils;
 import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.TableImpl;
 import io.delta.kernel.internal.actions.AddFile;
 import io.delta.kernel.internal.actions.RemoveFile;
 import io.delta.kernel.internal.actions.RowBackedAction;
 import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.utils.CloseableIterator;
 
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.CommitsBacklog;
@@ -192,9 +198,39 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
       // There is a chance earliest commit of the table is returned if the instant is before the
       // earliest commit of the table, hence the additional check.
       Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine));
-      return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+      if (deltaCommitInstant.isAfter(instant)) {
+        log.info(
+            "No commit found at or before instant {}. Earliest available commit is at {}",
+            instant,
+            deltaCommitInstant);
+        return false;
+      }
+
+      long versionAtInstant = snapshot.getVersion();
+      long currentVersion = table.getLatestSnapshot(engine).getVersion();
+
+      // Verify that the commit files from this version through the current version are still
+      // readable; this fails if log cleanup has removed them.
+      Set<DeltaLogActionUtils.DeltaAction> actionSet = new HashSet<>();
+      actionSet.add(DeltaLogActionUtils.DeltaAction.ADD);
+      actionSet.add(DeltaLogActionUtils.DeltaAction.REMOVE);
+
+      TableImpl tableImpl = (TableImpl) table;
+      // Attempt to read the changes; this throws if commit files are missing.
+      try (CloseableIterator<ColumnarBatch> changesIterator =
+          tableImpl.getChanges(engine, versionAtInstant, currentVersion, actionSet)) {
+        // Probing the first batch forces a commit file read. An empty iterator is only the
+        // benign edge case versionAtInstant == currentVersion, so reaching this point at all
+        // means the commit files are accessible.
+        changesIterator.hasNext();
+        return true;
+      }
     } catch (Exception e) {
-      log.error("Error checking if incremental sync is safe from " + instant, e);
+      // The commit files have been cleaned up or are otherwise inaccessible.
+      log.info(
+          "Cannot perform incremental sync from instant {} due to missing or inaccessible commit files: {}",
+          instant,
+          e.getMessage());
       return false;
     }
   }
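Both the Spark-based and Kernel-based implementations now share the same caller-facing contract: a `false` return means incremental reads must not be attempted from that instant. Below is a hedged sketch of the intended caller-side behavior; the `SyncPlanner`, `Mode`, and `chooseMode` names are hypothetical, not XTable's actual orchestration code.

```java
import java.time.Instant;

// Illustrative only: a hypothetical caller demonstrating the contract of
// isIncrementalSyncSafeFrom. Everything except that method is an assumption.
final class SyncPlanner {
  enum Mode {
    INCREMENTAL,
    FULL_SNAPSHOT
  }

  static Mode chooseMode(DeltaConversionSource source, Instant lastSyncedInstant) {
    // If any commit file needed since the last sync has been cleaned up, fall back to a
    // full snapshot sync instead of failing midway through an incremental read.
    return source.isIncrementalSyncSafeFrom(lastSyncedInstant)
        ? Mode.INCREMENTAL
        : Mode.FULL_SNAPSHOT;
  }
}
```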
@@ -21,8 +21,10 @@
 import static org.apache.xtable.testutil.ITTestUtils.validateTable;
 import static org.junit.jupiter.api.Assertions.*;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
@@ -422,7 +424,7 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
   }
 
   @Test
-  public void testsShowingVacuumHasNoEffectOnIncrementalSync() {
+  public void testVacuumAffectsIncrementalSyncSafety() {
     boolean isPartitioned = true;
     String tableName = GenericTable.getTableName();
     TestSparkDeltaTable testSparkDeltaTable =
@@ -472,6 +474,97 @@
     assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));
   }
 
+  @Test
+  public void testIncrementalSyncSafeWhenCommitFilesExist() {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    // Insert initial data.
+    testSparkDeltaTable.insertRows(50);
+    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+
+    // Add more commits.
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Should be safe: all commit files still exist.
+    assertTrue(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Incremental sync should be safe when commit files exist");
+  }
+
+  @Test
+  public void testIncrementalSyncUnsafeForInstantBeforeEarliestCommit() {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    // Insert data.
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Use an instant from 1 hour ago, before the table existed.
+    Instant instantHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
+    assertFalse(
+        conversionSource.isIncrementalSyncSafeFrom(instantHourAgo),
+        "Incremental sync should be unsafe for an instant before the earliest commit");
+  }
+
+  @Test
+  public void testIncrementalSyncUnsafeAfterManualCommitFileDeletion() throws IOException {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+
+    testSparkDeltaTable.insertRows(50);
+    Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+    testSparkDeltaTable.insertRows(50);
+    testSparkDeltaTable.insertRows(50);
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    assertTrue(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Should be safe before deleting commit files");
+
+    // Delete the first commit file to simulate log retention cleanup or corruption.
+    Path commitFile =
+        Paths.get(testSparkDeltaTable.getBasePath(), "_delta_log", "00000000000000000000.json");
+    assertTrue(Files.exists(commitFile), "Expected the first commit file to exist");
+    Files.delete(commitFile);
+
+    conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    assertFalse(
+        conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)),
+        "Should be unsafe after manually deleting commit files");
+  }
+
 @ParameterizedTest
 @MethodSource("testWithPartitionToggle")
 public void testVacuum(boolean isPartitioned) {
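The deletion test above removes a log file directly, which keeps the simulation deterministic. For reference, Delta's own metadata cleanup can also be provoked via table properties; the sketch below uses real Delta property names (`delta.logRetentionDuration`, `delta.checkpointInterval`), but because cleanup only runs when a checkpoint is written, the manual deletion in the test remains the more reliable approach.

```java
// Hedged alternative to manual deletion: configure aggressive log retention so that
// Delta's metadata cleanup (which runs alongside checkpoint writes) removes old
// _delta_log JSON files on its own. Cleanup timing inside a short-lived test is not
// guaranteed, so this is a sketch rather than a drop-in replacement.
sparkSession.sql(
    "ALTER TABLE delta.`" + testSparkDeltaTable.getBasePath() + "` SET TBLPROPERTIES ("
        + "'delta.logRetentionDuration' = 'interval 0 hours', "
        + "'delta.checkpointInterval' = '2')");
// Subsequent commits eventually write a checkpoint, triggering the cleanup.
testSparkDeltaTable.insertRows(50);
```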