From 416c041fe2d0f6b8b57c0432a5b27b2e677a5fc6 Mon Sep 17 00:00:00 2001
From: Jordan Epstein
Date: Tue, 4 Nov 2025 10:04:35 -0600
Subject: [PATCH] Move deleted files to Hadoop trash if configured

As of now, HadoopFileIO deletes paths through Hadoop's FileSystem#delete
API, which always bypasses any configured trash directory. If the
table's Hadoop configuration has trash enabled, we should use it. We
should also only do this for file system implementations where trashing
files is acceptable; in our case, these are LocalFileSystem and
DistributedFileSystem.
---
 .../apache/iceberg/hadoop/HadoopFileIO.java   | 17 ++++-
 .../iceberg/hadoop/TestHadoopFileIO.java      | 62 +++++++++++++++++++
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index a4ac5e2ff67a..3740f0abda88 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -28,8 +28,11 @@
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
@@ -102,7 +105,7 @@ public void deleteFile(String path) {
     Path toDelete = new Path(path);
     FileSystem fs = Util.getFs(toDelete, getConf());
     try {
-      fs.delete(toDelete, false /* not recursive */);
+      deletePath(fs, toDelete, false);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to delete file: %s", path);
     }
@@ -167,7 +170,7 @@ public void deletePrefix(String prefix) {
     FileSystem fs = Util.getFs(prefixToDelete, getConf());

     try {
-      fs.delete(prefixToDelete, true /* recursive */);
+      deletePath(fs, prefixToDelete, true);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -211,6 +214,16 @@ private ExecutorService executorService() {
     return executorService;
   }

+  private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
+    Trash trash = new Trash(fs, getConf());
+    if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
+        && trash.isEnabled()) {
+      trash.moveToTrash(toDelete);
+    } else {
+      fs.delete(toDelete, recursive);
+    }
+  }
+
   /**
    * This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator.
    *
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
index 554ed625b9e3..b459c7e062a0 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.hadoop;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -126,6 +127,46 @@ public void testDeletePrefix() {
         .hasMessageContaining("java.io.FileNotFoundException");
   }

+  @Test
+  public void testDeletePrefixWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream()
+        .forEach(
+            scale -> {
+              Path scalePath = new Path(parent, Integer.toString(scale));
+
+              List<Path> filesCreated = createRandomFiles(scalePath, scale);
+              hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+              // Hadoop filesystem will throw if the path does not exist
+              assertThatThrownBy(
+                      () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+                  .isInstanceOf(UncheckedIOException.class)
+                  .hasMessageContaining("java.io.FileNotFoundException");
+              filesCreated.forEach(
+                  file -> {
+                    String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+                    String trashPath =
+                        fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
+                    assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+                  });
+            });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThatThrownBy(() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator())
+        .isInstanceOf(UncheckedIOException.class)
+        .hasMessageContaining("java.io.FileNotFoundException");
+  }
+
   @Test
   public void testDeleteFiles() {
     Path parent = new Path(tempDir.toURI());
@@ -136,6 +177,27 @@ public void testDeleteFiles() {
         file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
   }

+  @Test
+  public void testDeleteFilesWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    List<Path> filesCreated = createRandomFiles(parent, 10);
+    hadoopFileIO.deleteFiles(
+        filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+    filesCreated.forEach(
+        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    filesCreated.forEach(
+        file -> {
+          String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+          String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
+          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+        });
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List<Path> filesCreated =
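
Note for reviewers (not part of the patch): below is a minimal sketch of the
caller-visible behavior, assuming the deletePath change above is applied. The
class name and the /tmp/warehouse path are hypothetical; fs.trash.interval
(minutes, > 0 enables the trash) and the <trash root>/Current layout are
standard Hadoop trash behavior, with the local trash root typically ~/.Trash.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class TrashDeleteSketch {
  public static void main(String[] args) throws Exception {
    // fs.trash.interval is in minutes; any value > 0 enables the trash.
    Configuration conf = new Configuration();
    conf.set("fs.trash.interval", "60");

    // Create a throwaway local file to delete (hypothetical table layout).
    FileSystem local = FileSystem.getLocal(conf);
    Path file = new Path("file:///tmp/warehouse/db/t/data/part-00000.parquet");
    local.create(file, true /* overwrite */).close();

    // With trash enabled on a LocalFileSystem, the patched deleteFile moves
    // the file into the trash instead of unlinking it.
    HadoopFileIO io = new HadoopFileIO(conf);
    io.deleteFile(file.toString());
    System.out.println(io.newInputFile(file.toString()).exists()); // false

    // The file remains recoverable from <trash root>/Current<original path>
    // until the trash checkpoint expires.
    String trashed =
        local.getTrashRoot(file) + "/Current" + Path.getPathWithoutSchemeAndAuthority(file);
    System.out.println(io.newInputFile(trashed).exists()); // true
  }
}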
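
A note on the Hadoop APIs the new deletePath relies on: Trash.isEnabled()
simply reflects whether fs.trash.interval is greater than zero for the given
file system's configuration, and Trash.moveToTrash(Path) moves files and
directories alike, which is why the recursive flag only matters on the
fs.delete fallback path. Also worth keeping in mind when reasoning about
failure modes: moveToTrash returns a boolean rather than throwing in some
cases where the move cannot be performed.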