diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index a4ac5e2ff67a..3740f0abda88 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -28,8 +28,11 @@
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
@@ -102,7 +105,7 @@ public void deleteFile(String path) {
     Path toDelete = new Path(path);
     FileSystem fs = Util.getFs(toDelete, getConf());
     try {
-      fs.delete(toDelete, false /* not recursive */);
+      deletePath(fs, toDelete, false);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to delete file: %s", path);
     }
@@ -167,7 +170,7 @@ public void deletePrefix(String prefix) {
     FileSystem fs = Util.getFs(prefixToDelete, getConf());
 
     try {
-      fs.delete(prefixToDelete, true /* recursive */);
+      deletePath(fs, prefixToDelete, true);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -211,6 +214,16 @@ private ExecutorService executorService() {
     return executorService;
   }
 
+  private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
+    Trash trash = new Trash(fs, getConf());
+    if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
+        && trash.isEnabled()) {
+      trash.moveToTrash(toDelete);
+    } else {
+      fs.delete(toDelete, recursive);
+    }
+  }
+
   /**
    * This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator.
    *
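For reviewers who want to try the change locally: a minimal sketch, not part of the patch, showing that the new deletePath() branch is toggled purely through Hadoop configuration. The file URI below is hypothetical; "fs.trash.interval" is the key behind FS_TRASH_INTERVAL_KEY.

// Hedged sketch: exercises HadoopFileIO.deleteFile() with Hadoop trash enabled.
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class TrashDeleteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Any interval > 0 makes Trash.isEnabled() return true, so deleteFile()
    // on a LocalFileSystem or DistributedFileSystem moves the file to the
    // user's trash root instead of removing it outright.
    conf.set("fs.trash.interval", "60");

    try (HadoopFileIO fileIO = new HadoopFileIO(conf)) {
      // Hypothetical path, for illustration only.
      fileIO.deleteFile("file:///tmp/iceberg-demo/data/00000-0-sample.parquet");
    }
  }
}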
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
index 554ed625b9e3..b459c7e062a0 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.hadoop;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -126,6 +127,46 @@ public void testDeletePrefix() {
         .hasMessageContaining("java.io.FileNotFoundException");
   }
 
+  @Test
+  public void testDeletePrefixWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream()
+        .forEach(
+            scale -> {
+              Path scalePath = new Path(parent, Integer.toString(scale));
+
+              List<Path> filesCreated = createRandomFiles(scalePath, scale);
+              hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+              // Hadoop filesystem will throw if the path does not exist
+              assertThatThrownBy(
+                      () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+                  .isInstanceOf(UncheckedIOException.class)
+                  .hasMessageContaining("java.io.FileNotFoundException");
+              filesCreated.forEach(
+                  file -> {
+                    String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+                    String trashPath =
+                        fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
+                    assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+                  });
+            });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThatThrownBy(() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator())
+        .isInstanceOf(UncheckedIOException.class)
+        .hasMessageContaining("java.io.FileNotFoundException");
+  }
+
   @Test
   public void testDeleteFiles() {
     Path parent = new Path(tempDir.toURI());
@@ -136,6 +177,27 @@ public void testDeleteFiles() {
         file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
   }
 
+  @Test
+  public void testDeleteFilesWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    List<Path> filesCreated = createRandomFiles(parent, 10);
+    hadoopFileIO.deleteFiles(
+        filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+    filesCreated.forEach(
+        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    filesCreated.forEach(
+        file -> {
+          String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+          String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
+          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+        });
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List<String> filesCreated =
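The trash assertions above lean on Hadoop's default trash layout: files land under the trash root plus "/Current" plus the scheme-less original path. A small standalone sketch of that convention, assuming only public Hadoop APIs; the helper name is illustrative and not part of the patch.

// Sketch only: mirrors how the tests compute the expected trash location.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TrashLocationSketch {
  static Path expectedTrashLocation(FileSystem fs, Path deleted) {
    // Hadoop's default trash policy files everything under
    // <trash-root>/Current, preserving the scheme-less original path.
    String suffix = Path.getPathWithoutSchemeAndAuthority(deleted).toString();
    return new Path(fs.getTrashRoot(deleted).toString() + "/Current" + suffix);
  }
}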