From bbfa13eb61accf9ad6b3aebd5d8b89a0a5053f7f Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Mon, 12 Jan 2026 08:09:19 -0800 Subject: [PATCH] Add fallback for log truncation issue in Delta source --- .../xtable/delta/DeltaConversionSource.java | 60 ++++++++++-- .../kernel/DeltaKernelConversionSource.java | 43 ++++++++- .../xtable/delta/ITDeltaConversionSource.java | 95 ++++++++++++++++++- 3 files changed, 186 insertions(+), 12 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index bb40a3158..3cc7f0917 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -41,6 +41,8 @@ import org.apache.spark.sql.delta.actions.RemoveFile; import scala.Option; +import scala.Tuple2; +import scala.collection.Seq; import io.delta.tables.DeltaTable; @@ -191,18 +193,58 @@ public CommitsBacklog getCommitsBacklog( } /* - * In Delta Lake, each commit is a self-describing one i.e. it contains list of new files while - * also containing list of files that were deleted. So, vacuum has no special effect on the - * incremental sync. Hence, existence of commit is the only check required. + * Following checks are performed: + * 1. Check if a commit exists at or before the provided instant. + * 2. Verify that commit files needed for incremental sync are still accessible. + * + * Delta Lake's VACUUM operation removes old JSON commit files from _delta_log/, which can + * break incremental sync even though commits are self-describing. This method attempts to + * access the commit chain to ensure files haven't been vacuumed. 
*/ @Override public boolean isIncrementalSyncSafeFrom(Instant instant) { - DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant = - deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true); - // There is a chance earliest commit of the table is returned if the instant is before the - // earliest commit of the table, hence the additional check. - Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp()); - return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant); + try { + DeltaHistoryManager.Commit deltaCommitAtOrBeforeInstant = + deltaLog.history().getActiveCommitAtTime(Timestamp.from(instant), true, false, true); + + // There is a chance earliest commit of the table is returned if the instant is before the + // earliest commit of the table, hence the additional check. + Instant deltaCommitInstant = Instant.ofEpochMilli(deltaCommitAtOrBeforeInstant.getTimestamp()); + if (deltaCommitInstant.isAfter(instant)) { + log.info( + "No commit found at or before instant {}. Earliest available commit is at {}", + instant, + deltaCommitInstant); + return false; + } + + long versionAtInstant = deltaCommitAtOrBeforeInstant.version(); + + // Verify that we can actually access commit files from this version onward by attempting + // to read the changes. This will fail if VACUUM has removed the necessary commit files. + // We only need to verify we can start iterating - we don't need to consume all changes. + scala.collection.Iterator<Tuple2<Object, Seq<Action>>> changesIterator = + deltaLog.getChanges(versionAtInstant, true); + + // Test if we can access at least the first commit. If commit files are missing due to + // VACUUM, this will throw an exception (typically FileNotFoundException or similar). 
+ if (changesIterator.hasNext()) { + // Successfully verified we can access commit files + return true; + } else { + // No changes available from this version (shouldn't happen for valid commits) + log.warn( + "No changes available starting from version {} (instant: {})", versionAtInstant, instant); + return false; + } + } catch (Exception e) { + // Commit files have been vacuumed or are otherwise inaccessible + log.info( + "Cannot perform incremental sync from instant {} due to missing or inaccessible commit files: {}", + instant, + e.getMessage()); + return false; + } } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java index fa088f087..631c69e51 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java @@ -24,21 +24,27 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.Builder; import lombok.extern.slf4j.Slf4j; import io.delta.kernel.Snapshot; import io.delta.kernel.Table; +import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.TableImpl; import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.actions.RowBackedAction; import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.utils.CloseableIterator; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.CommitsBacklog; @@ -192,9 +198,42 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) { // 
There is a chance earliest commit of the table is returned if the instant is before the // earliest commit of the table, hence the additional check. Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine)); - return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant); + if (deltaCommitInstant.isAfter(instant)) { + log.info( + "No commit found at or before instant {}. Earliest available commit is at {}", + instant, + deltaCommitInstant); + return false; + } + + long versionAtInstant = snapshot.getVersion(); + long currentVersion = table.getLatestSnapshot(engine).getVersion(); + + // Verify that we can actually access commit files from this version to current version. + // This will fail if VACUUM has removed the necessary commit files. + Set<DeltaLogActionUtils.DeltaAction> actionSet = new HashSet<>(); + actionSet.add(DeltaLogActionUtils.DeltaAction.ADD); + actionSet.add(DeltaLogActionUtils.DeltaAction.REMOVE); + + TableImpl tableImpl = (TableImpl) table; + // Attempt to get changes - this will throw if commit files are missing + try (CloseableIterator<ColumnarBatch> changesIterator = + tableImpl.getChanges(engine, versionAtInstant, currentVersion, actionSet)) { + // Test if we can access at least the first batch + if (changesIterator.hasNext()) { + // Successfully verified we can access commit files + return true; + } else { + // No changes available (edge case: versionAtInstant == currentVersion) + return true; + } + } } catch (Exception e) { - log.error("Error checking if incremental sync is safe from " + instant, e); + // Commit files have been vacuumed or are otherwise inaccessible + log.info( + "Cannot perform incremental sync from instant {} due to missing or inaccessible commit files: {}", + instant, + e.getMessage()); return false; } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java index 3a754e278..fc855a593 100644 --- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java @@ -21,8 +21,10 @@ import static org.apache.xtable.testutil.ITTestUtils.validateTable; import static org.junit.jupiter.api.Assertions.*; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; @@ -422,7 +424,7 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) { } @Test - public void testsShowingVacuumHasNoEffectOnIncrementalSync() { + public void testVacuumAffectsIncrementalSyncSafety() { boolean isPartitioned = true; String tableName = GenericTable.getTableName(); TestSparkDeltaTable testSparkDeltaTable = @@ -472,6 +474,97 @@ public void testsShowingVacuumHasNoEffectOnIncrementalSync() { assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo)); } + @Test + public void testIncrementalSyncSafeWhenCommitFilesExist() { + String tableName = GenericTable.getTableName(); + TestSparkDeltaTable testSparkDeltaTable = + new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); + + // Insert initial data + testSparkDeltaTable.insertRows(50); + Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); + + // Add more commits + testSparkDeltaTable.insertRows(50); + + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) + .build(); + DeltaConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + + // Should be safe - commit files still exist + assertTrue( + conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)), + "Incremental sync should be safe when commit files exist"); + } + + @Test + public void testIncrementalSyncUnsafeForInstantBeforeEarliestCommit() { 
+ String tableName = GenericTable.getTableName(); + TestSparkDeltaTable testSparkDeltaTable = + new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); + + // Insert data + testSparkDeltaTable.insertRows(50); + + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) + .build(); + DeltaConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + + // Try instant from 1 hour ago (before table existed) + Instant instantHourAgo = Instant.now().minus(1, ChronoUnit.HOURS); + assertFalse( + conversionSource.isIncrementalSyncSafeFrom(instantHourAgo), + "Incremental sync should be unsafe for instant before earliest commit"); + } + + @Test + public void testIncrementalSyncUnsafeAfterManualCommitFileDeletion() throws IOException { + String tableName = GenericTable.getTableName(); + TestSparkDeltaTable testSparkDeltaTable = + new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); + + testSparkDeltaTable.insertRows(50); + Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); + testSparkDeltaTable.insertRows(50); + testSparkDeltaTable.insertRows(50); + + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) + .build(); + + DeltaConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + + assertTrue( + conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)), + "Should be safe before deleting commit files"); + + // Delete commit file to simulate VACUUM or filesystem corruption + Path commitFile = Paths.get(testSparkDeltaTable.getBasePath(), "_delta_log", "00000000000000000000.json"); + if (Files.exists(commitFile)) { + Files.delete(commitFile); + } + + conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); + 
+ assertFalse( + conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)), + "Should be unsafe after manually deleting commit files"); + } + @ParameterizedTest @MethodSource("testWithPartitionToggle") public void testVacuum(boolean isPartitioned) {