Conversation

Contributor

@joyhaldar joyhaldar commented Jan 27, 2026

This PR optimizes ExpireSnapshotsSparkAction by filtering at the manifest level first, then reading content files only from orphaned manifests. The approach is similar to ReachableFileCleanup but uses distributed Spark operations.

Changes:

  • Added early exits when no snapshots expired or no orphaned manifests
  • Path-level except to find orphaned manifests before reading content files
  • Compute manifest lists and stats separately to enable early exit without reading content files
  • Join to get orphaned manifest details, then read only those content files
  • Added contentFilesFromManifestDF() to read content files from a filtered manifest DataFrame (existing contentFileDS() only accepts snapshot IDs, not a filtered DataFrame)
  • Added emptyFileInfoDS() helper
  • Changed ReadManifest to protected in BaseSparkAction

Before

All Expired Files ------+
                        +--> Except --> Orphaned Files
All Live Files ---------+
     (reads all manifests)

After

                    +--> No expired snapshots? --> Return empty (Exit Early)
                    |
Expired Snapshots --+
                    |
                    +--> Find orphaned manifest paths via except
                              |
                              +--> No orphaned manifests? --> Return manifest lists + stats (Exit Early)
                              |
                              +--> Join to get orphaned manifest details
                                        |
                                        +--> Read content files only from orphaned manifests
                                                    |
                                                    +--> Except with live content files --> Orphaned Files
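
In Spark Dataset terms, the new flow looks roughly like the sketch below. This is a hedged outline rather than the exact patch: manifestDF() and readContentFiles() are illustrative stand-ins (only contentFilesFromManifestDF() and emptyFileInfoDS() are named in the change list above), and the surrounding datasets are assumed to exist.

// Sketch only; helper names are assumptions, not the PR's actual methods
Dataset<Row> expiredManifestDF = manifestDF(originalMetadata, expiredSnapshotIds);
Dataset<Row> liveManifestDF = manifestDF(updatedMetadata);

// 1. Path-level except: compares manifest paths only, never opens a manifest
Dataset<Row> orphanedManifestPaths =
    expiredManifestDF.select("path").except(liveManifestDF.select("path"));

// 2. Early exit: if every expired manifest is still referenced, only manifest
//    lists and statistics files can be orphaned
if (orphanedManifestPaths.isEmpty()) {
  return orphanedManifestLists.union(orphanedStats);
}

// 3. Join back to recover full manifest metadata for the orphaned paths
Dataset<Row> orphanedManifestDF = expiredManifestDF.join(orphanedManifestPaths, "path");

// 4. Read content files only from orphaned manifests, then except against live files
Dataset<FileInfo> orphanedContentFiles =
    readContentFiles(orphanedManifestDF).except(liveContentFileDS);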

Tests:

  • testEarlyExitWhenNoOrphanedManifests
  • testManifestReusedAcrossSnapshots
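
For reference, a minimal sketch of what the early-exit case might verify (the actual test bodies are in the patch; FILE_A/FILE_B and the table fixture follow the existing TestExpireSnapshotsAction conventions and are assumptions here):

@Test
public void earlyExitWhenNoOrphanedManifestsSketch() {
  // two appends: the second snapshot's manifest list still references the
  // first snapshot's manifest, so that manifest is not orphaned
  table.newAppend().appendFile(FILE_A).commit();
  long firstSnapshotId = table.currentSnapshot().snapshotId();
  table.newAppend().appendFile(FILE_B).commit();

  ExpireSnapshots.Result result =
      SparkActions.get().expireSnapshots(table).expireSnapshotId(firstSnapshotId).execute();

  // only the first snapshot's manifest list is orphaned; no manifests or
  // data files should be deleted
  assertThat(result.deletedManifestListsCount()).isEqualTo(1L);
  assertThat(result.deletedManifestsCount()).isEqualTo(0L);
  assertThat(result.deletedDataFilesCount()).isEqualTo(0L);
}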

References:

  • Early exit when no expired snapshots or no orphaned manifests, similar to ReachableFileCleanup.cleanFiles()
    if (!deletionCandidates.isEmpty()) {
      Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
      Set<ManifestFile> manifestsToDelete =
          pruneReferencedManifests(
              snapshotsAfterExpiration, deletionCandidates, currentManifests::add);
      if (!manifestsToDelete.isEmpty()) {
  • Finding orphaned manifests by removing current references, similar to ReachableFileCleanup.pruneReferencedManifests()
    private Set<ManifestFile> pruneReferencedManifests(
        Set<Snapshot> snapshots,
        Set<ManifestFile> deletionCandidates,
        Consumer<ManifestFile> currentManifestCallback) {
      Set<ManifestFile> candidateSet = ConcurrentHashMap.newKeySet();
      candidateSet.addAll(deletionCandidates);
      Tasks.foreach(snapshots)
          .retry(3)
          .stopOnFailure()
          .throwFailureWhenFinished()
          .executeWith(planExecutorService)
          .onFailure(
              (snapshot, exc) ->
                  LOG.warn(
                      "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
          .run(
              snapshot -> {
                try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
                  for (ManifestFile manifestFile : manifestFiles) {
                    candidateSet.remove(manifestFile);
                    if (candidateSet.isEmpty()) {
                      return;
                    }
                    currentManifestCallback.accept(manifestFile.copy());
                  }
                } catch (IOException e) {
                  throw new RuntimeIOException(
                      e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
                }
              });
      return candidateSet;
    }
  • Reading content files only from orphaned manifests, similar to ReachableFileCleanup.findFilesToDelete()
    private Set<String> findFilesToDelete(
        Set<ManifestFile> manifestFilesToDelete, Set<ManifestFile> currentManifestFiles) {
      Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
      Tasks.foreach(manifestFilesToDelete)
          .retry(3)
          .suppressFailureWhenFinished()
          .executeWith(planExecutorService)
          .onFailure(
              (item, exc) ->
                  LOG.warn(
                      "Failed to determine live files in manifest {}. Retrying", item.path(), exc))
          .run(
              manifest -> {
                try (CloseableIterable<String> paths = ManifestFiles.readPaths(manifest, fileIO)) {
                  paths.forEach(filesToDelete::add);
                } catch (IOException e) {
                  throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest);
                }
              });

@github-actions github-actions bot added the spark label Jan 27, 2026
@manuzhang manuzhang changed the title from "Spark: Optimize ExpireSnapshotsSparkAction with manifest-level filtering" to "Spark 4.1: Optimize ExpireSnapshotsSparkAction with manifest-level filtering" Jan 27, 2026
@manuzhang manuzhang requested a review from Copilot January 27, 2026 16:27

Copilot AI left a comment


Pull request overview

This PR optimizes ExpireSnapshotsSparkAction by replacing driver-side collection with distributed Spark operations for manifest filtering. Instead of reading content files from all manifests in expired snapshots, the implementation now filters at the manifest level first using join-based operations, then reads content files only from orphaned manifests.

Changes:

  • Added early exit paths when no snapshots are expired or no orphaned manifests exist
  • Implemented distributed join-based filtering to identify orphaned manifests before reading their content files
  • Refactored helper methods in BaseSparkAction to support the new distributed approach

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File                              Description
ExpireSnapshotsSparkAction.java   Replaced driver-side collection logic with distributed Spark operations for manifest-level filtering; added contentFilesFromManifestDF()
BaseSparkAction.java              Added emptyFileInfoDS() helper method; changed ReadManifest visibility to protected
TestExpireSnapshotsAction.java    Updated expected job count in testUseLocalIterator() from 4 to 12


      .as(
          "Expected total number of jobs with stream-results should match the expected number")
-     .isEqualTo(4L);
+     .isEqualTo(12L);
Copilot AI Jan 27, 2026


The expected job count increased from 4 to 12 due to the new distributed operations. Consider adding a comment explaining why this specific count is expected, or add a test case that validates the optimization logic (e.g., verifying early exits when no orphaned manifests exist).

Contributor Author


Added a comment explaining the job count.

Dataset<FileInfo> liveStats = statisticsFileDS(updatedTable, null);
Dataset<FileInfo> orphanedStats = expiredStats.except(liveStats);

if (orphanedManifestPaths.isEmpty()) {

Copilot AI Jan 27, 2026


Using isEmpty() on a Dataset triggers a Spark action that collects data to the driver. Consider using first() wrapped in a try-catch or take(1).length == 0 to avoid potentially expensive operations when checking if a dataset is empty.

Suggested change
- if (orphanedManifestPaths.isEmpty()) {
+ boolean hasOrphanedManifestPaths = orphanedManifestPaths.limit(1).toLocalIterator().hasNext();
+ if (!hasOrphanedManifestPaths) {

Contributor Author

@joyhaldar joyhaldar Jan 28, 2026


Thanks for the review. Dataset.isEmpty() uses limit(1) and executeTake(1); it only fetches a single row to check emptiness, not the full dataset.

Source: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala#L557-L560
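
In other words (paraphrased into the Java Dataset API; the linked source is Scala), the check behaves roughly like:

// Roughly what Dataset.isEmpty() does internally: limit to one row, take it,
// and test whether anything came back; only a single row is ever fetched
boolean empty = orphanedManifestPaths.limit(1).takeAsList(1).isEmpty();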

@joyhaldar joyhaldar marked this pull request as ready for review January 28, 2026 03:55
@@ -1200,10 +1200,12 @@ public void testUseLocalIterator() {

checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);
Contributor


Is it possible for you to write another test for this functionality?

Contributor Author


Thank you for your review Alex.

Sorry about that, I have added two tests for the optimization:

  • testEarlyExitWhenNoOrphanedManifests
  • testManifestReusedAcrossSnapshots

Let me know if I have misunderstood your comment and if you were looking for something different.

Contributor


Thanks a lot! I'll take a look at this first thing tomorrow. The tests make it easier for me and others to digest what exactly the code should be doing.

Contributor Author


Thank you Alex. I have also added a References section to the PR description linking to the patterns in ReachableFileCleanup that this is based on. Please let me know if it helps with the review.

Contributor

@rambleraptor rambleraptor left a comment


I'm not an expert on this area of the codebase, but the rough idea seems reasonable:

  • Find list of orphaned manifest lists / stats
  • Get list of files from there

}

- private static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
+ protected static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
Contributor


Just making this protected seems fine, but I'd love to get another opinion here.

Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);

- // fetch files referenced by expired snapshots
+ // find IDs of expired snapshots
Contributor


Can you add some comments to break up these code sections? I think it helps to understand the flow of the code

Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment


Thanks @joyhaldar, it's still a bit unclear to me why the new changes significantly improve execution. If we look at how fileDS works and how Spark would execute the anti-join, I think we'd be implicitly covered. Do we have any before/after numbers for this change, or any particular cases which are egregiously inefficient at the moment?

- // fetch files referenced by expired snapshots
+ // find IDs of expired snapshots
Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
Contributor


Hm, a lot of the cases called out in the PR description should be implicitly handled when you look at how fileDS works. For example, we're creating the set of files from the set of manifests, and if there are no manifests, it's already an empty set that we're doing the anti-join against.

Do we have any particular cases that we see improve after this change? Numbers would be helpful, if there are any.
