From 5456b3b551cd28ae4a835b321d0c8b6aaf910fb1 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 19 Jan 2026 20:16:27 +0000
Subject: [PATCH 1/9] New configuration option iceberg.hadoop.delete-trash-schemas

* declares whether to look at the trash policy before delete
* defaults are currently "hdfs" and "viewfs", though I'm tempted to make the default "".
---
 .../apache/iceberg/hadoop/HadoopFileIO.java   | 39 ++++++++++++++++---
 1 file changed, 33 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 3740f0abda88..0fd072ee18e9 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -28,17 +28,16 @@
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
@@ -53,6 +52,7 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
   private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
   private static final int DELETE_RETRY_ATTEMPTS = 3;
   private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
   private static volatile ExecutorService executorService;
@@ -214,11 +214,38 @@ private ExecutorService executorService() {
     return executorService;
   }
 
+  /**
+   * Is a path in a schema of a filesystem where the hadoop trash policy should be used to move it to trash?
+   * @param toDelete path to delete
+   * @return true if the path is in the list of schemas for which trash should be used
+   */
+  @VisibleForTesting
+  boolean isTrashSchema(Path toDelete) {
+    String[] schemas = getConf().getTrimmedStrings(DELETE_TRASH_SCHEMAS, "hdfs", "viewfs");
+    final String scheme = toDelete.toUri().getScheme();
+    for (String s : schemas) {
+      if (s.equalsIgnoreCase(scheme)) {
+      return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Delete a path;
+   * @param fs target filesystem.
+   * @param toDelete path to delete/move
+   * @param recursive should the delete operation be recursive?
+   * @throws IOException on a failure
+   */
   private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
-    Trash trash = new Trash(fs, getConf());
-    if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
-        && trash.isEnabled()) {
-      trash.moveToTrash(toDelete);
+    if (isTrashSchema(toDelete)) {
+      Trash trash = new Trash(fs, getConf());
+      if (!trash.isEnabled() || !trash.moveToTrash(toDelete)) {
+        // either trash is disabled or the move operation failed; fallback
+        // to delete.
+        fs.delete(toDelete, recursive);
+      }
     } else {
       fs.delete(toDelete, recursive);
     }

From fe62659e6c75566b04e114e672e6fb4c48eeae61 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 21 Jan 2026 14:39:37 +0000
Subject: [PATCH 2/9] Core: HadoopFileIO to take list of filesystem schemas to enable trash for

Addresses #15093
---
 .../apache/iceberg/hadoop/HadoopFileIO.java | 22 ++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 0fd072ee18e9..f4277a75ec25 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -53,6 +53,7 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
   private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
   private static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
+  public static final String[] DEFAULT_TRASH_SCHEMAS = {"hdfs", "viewfs"};
   private static final int DELETE_RETRY_ATTEMPTS = 3;
   private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
   private static volatile ExecutorService executorService;
@@ -215,17 +216,18 @@ private ExecutorService executorService() {
   }
 
   /**
-   * Is a path in a schema of a filesystem where the hadoop trash policy should be used to move it to trash?
+   * Is a path in a schema of a filesystem where the hadoop trash policy should be used to move
+   * it to trash?
+   *
    * @param toDelete path to delete
    * @return true if the path is in the list of schemas for which trash should be used
    */
   @VisibleForTesting
   boolean isTrashSchema(Path toDelete) {
-    String[] schemas = getConf().getTrimmedStrings(DELETE_TRASH_SCHEMAS, "hdfs", "viewfs");
     final String scheme = toDelete.toUri().getScheme();
-    for (String s : schemas) {
+    for (String s : getConf().getTrimmedStrings(DELETE_TRASH_SCHEMAS, DEFAULT_TRASH_SCHEMAS)) {
       if (s.equalsIgnoreCase(scheme)) {
-      return true;
+        return true;
       }
     }
     return false;
@@ -233,6 +235,7 @@ boolean isTrashSchema(Path toDelete) {
 
   /**
    * Delete a path;
+   *
    * @param fs target filesystem.
    * @param toDelete path to delete/move
   * @param recursive should the delete operation be recursive?
@@ -241,14 +244,13 @@ boolean isTrashSchema(Path toDelete) {
   private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
     if (isTrashSchema(toDelete)) {
       Trash trash = new Trash(fs, getConf());
-      if (!trash.isEnabled() || !trash.moveToTrash(toDelete)) {
-        // either trash is disabled or the move operation failed; fallback
-        // to delete.
-        fs.delete(toDelete, recursive);
+      if (trash.isEnabled() && trash.moveToTrash(toDelete)) {
+        return;
       }
-    } else {
-      fs.delete(toDelete, recursive);
+      // either trash is disabled or the move operation failed; fallback
+      // to delete.
     }
+    fs.delete(toDelete, recursive);
   }
 
   /**

From efec561947effb5e3cea3381866b63bdae9b2936 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 21 Jan 2026 21:26:03 +0000
Subject: [PATCH 3/9] Core: HadoopFileIO to take list of filesystem schemas to enable trash for

Addresses #15093

* Default Filesystems are hdfs: and viewfs: schemas
* Tests conditionally enable file: schema as trash
* Tests set up FS with/without trash enablement.
* Also address issue that delete(missing-path) must always succeed; trash policy fails here.
---
 .../apache/iceberg/hadoop/HadoopFileIO.java   | 18 +++-
 .../iceberg/hadoop/TestHadoopFileIO.java      | 97 ++++++++++++++++---
 2 files changed, 97 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index f4277a75ec25..c2e24515d1b8 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -52,8 +52,8 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
   private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
-  private static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
-  public static final String[] DEFAULT_TRASH_SCHEMAS = {"hdfs", "viewfs"};
+  public static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
+  private static final String[] DEFAULT_TRASH_SCHEMAS = {"hdfs", "viewfs"};
   private static final int DELETE_RETRY_ATTEMPTS = 3;
   private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
   private static volatile ExecutorService executorService;
@@ -108,7 +108,7 @@ public void deleteFile(String path) {
     try {
       deletePath(fs, toDelete, false);
     } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
+      throw new RuntimeIOException(e, "Failed to delete file: %s: %s", path, e.toString());
     }
   }
 
@@ -244,8 +244,16 @@ boolean isTrashSchema(Path toDelete) {
   private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
     if (isTrashSchema(toDelete)) {
       Trash trash = new Trash(fs, getConf());
-      if (trash.isEnabled() && trash.moveToTrash(toDelete)) {
-        return;
+      if (trash.isEnabled()) {
+        try {
+          if (trash.moveToTrash(toDelete)) {
+            // delete enabled and successful.
+            return;
+          }
+        } catch (FileNotFoundException e) {
+          // the source file is missing.
+
+        }
       }
       // either trash is disabled or the move operation failed; fallback
       // to delete.
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
index b459c7e062a0..a1a574ae2d24 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.hadoop;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.iceberg.hadoop.HadoopFileIO.DELETE_TRASH_SCHEMAS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.BulkDeletionFailureException;
@@ -48,20 +51,26 @@
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestHadoopFileIO {
 
   private final Random random = new Random(1);
 
+  /** A filesystem. */
   private FileSystem fs;
+
   private HadoopFileIO hadoopFileIO;
 
   @TempDir private File tempDir;
 
+  private UserGroupInformation currentUser;
+
   @BeforeEach
   public void before() throws Exception {
     Configuration conf = new Configuration();
-    fs = FileSystem.getLocal(conf);
-
+    // remove all filesystems for the current user from cache.
+    // The next call to FileSystem.get("file://") will get one with the supplied config
+    currentUser = UserGroupInformation.getCurrentUser();
+    resetLocalFS(false);
     hadoopFileIO = new HadoopFileIO(conf);
   }
 
@@ -129,11 +138,9 @@ public void testDeletePrefix() {
 
   @Test
   public void testDeletePrefixWithTrashEnabled() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(FS_TRASH_INTERVAL_KEY, "60");
-    fs = FileSystem.getLocal(conf);
+    resetLocalFS(true);
 
-    hadoopFileIO = new HadoopFileIO(conf);
+    hadoopFileIO = new HadoopFileIO(fs.getConf());
 
     Path parent = new Path(tempDir.toURI());
     List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
@@ -150,13 +157,15 @@
           assertThatThrownBy(
                   () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
               .isInstanceOf(UncheckedIOException.class)
-              .hasMessageContaining("java.io.FileNotFoundException");
+              .hasMessageContaining("java.io.FileNotFoundException")
+              .cause()
+              .isInstanceOf(FileNotFoundException.class);
 
           filesCreated.forEach(
               file -> {
                 String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
                 String trashPath = fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
-                assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+                assertPathExists(trashPath);
              });
        });
 
@@ -167,6 +176,24 @@
        .hasMessageContaining("java.io.FileNotFoundException");
   }
 
+  /**
+   * Closes active filesystems then creates a new filesystem with the chosen trash policy and
+   * updates the {@code fs} field with it. This guarantees that HadoopFileIO's getFileSystem() calls
+   * will get the same filesystem and its configuration settings.
+   *
+   * @param useTrash enable trash settings
+   * @throws IOException failure to create.
+   */
+  private void resetLocalFS(boolean useTrash) throws IOException {
+    Configuration conf = new Configuration();
+    if (useTrash) {
+      conf.set(FS_TRASH_INTERVAL_KEY, "60");
+      conf.set(DELETE_TRASH_SCHEMAS, "file");
+    }
+    FileSystem.closeAllForUGI(currentUser);
+    fs = FileSystem.getLocal(conf);
+  }
+
   @Test
   public void testDeleteFiles() {
     Path parent = new Path(tempDir.toURI());
@@ -177,13 +204,18 @@ public void testDeleteFiles() {
         file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
   }
 
+  @Test
+  public void testFileIsNotTrashSchema() throws Throwable {
+    assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isFalse();
+  }
+
   @Test
   public void testDeleteFilesWithTrashEnabled() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(FS_TRASH_INTERVAL_KEY, "60");
-    fs = FileSystem.getLocal(conf);
+    resetLocalFS(true);
+
+    hadoopFileIO = new HadoopFileIO(fs.getConf());
+    assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue();
 
-    hadoopFileIO = new HadoopFileIO(conf);
     Path parent = new Path(tempDir.toURI());
     List<Path> filesCreated = createRandomFiles(parent, 10);
     hadoopFileIO.deleteFiles(
@@ -194,10 +226,27 @@
         file -> {
           String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
           String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
-          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+          assertPathExists(trashPath);
        });
   }
 
+  /**
+   * Verify semantics of a missing file delete are the same with and without trash: no reported
+   * error.
+   *
+   * @param useTrash use trash in the FS.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDeleteMissingFileToTrash(boolean useTrash) throws IOException {
+    resetLocalFS(useTrash);
+    hadoopFileIO = new HadoopFileIO(fs.getConf());
+    Path path = new Path(new Path(tempDir.toURI()), "missing");
+    final String missing = path.toUri().toString();
+    hadoopFileIO.deleteFile(missing);
+    assertPathDoesNotExist(missing);
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List<String> filesCreated =
@@ -297,4 +346,26 @@ private List<Path> createRandomFiles(Path parent, int count) {
        });
     return paths;
   }
+
+  /**
+   * Assert a path exists.
+   *
+   * @param path URI to file/dir.
+   */
+  private void assertPathExists(String path) {
+    assertThat(hadoopFileIO.newInputFile(path).exists())
+        .describedAs("File %s must exist", path)
+        .isTrue();
+  }
+
+  /**
+   * Assert a path does not exist.
+   *
+   * @param path URI to file/dir.
+   */
+  private void assertPathDoesNotExist(String path) {
+    assertThat(hadoopFileIO.newInputFile(path).exists())
+        .describedAs("File %s must not exist", path)
+        .isFalse();
+  }
 }

From b018f7debc2357c93dadaf68b12ec03bc7b3748d Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 22 Jan 2026 15:28:44 +0000
Subject: [PATCH 4/9] use Trash.moveToAppropriateTrash() to resolve + docs.

+ test sets in a list of schemas when enabling local trash, verifying that the option splitting works.
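As a rough sketch only (simplified names, no FileNotFoundException handling,
and it assumes trash is configured for the resolved filesystem), the delete
path this gives is:

    boolean moved = false;
    try {
      // resolves viewfs mount points and uses the resolved filesystem's
      // trash policy; for HDFS that is the server-side configuration
      moved = Trash.moveToAppropriateTrash(fs, path, conf);
    } catch (IOException e) {
      moved = false; // e.g. the trash policy could not be queried
    }
    if (!moved) {
      // trash disabled, or the move failed: fall back to a plain delete
      fs.delete(path, recursive);
    }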
---
 .../apache/iceberg/hadoop/HadoopFileIO.java   | 38 ++++++++++++-------
 .../iceberg/hadoop/TestHadoopFileIO.java      |  2 +-
 docs/docs/fileio.md                           | 13 +++++++
 3 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index c2e24515d1b8..006c072db06c 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -234,29 +234,39 @@ boolean isTrashSchema(Path toDelete) {
   }
 
   /**
-   * Delete a path;
+   * Delete a path.
+   *
+   * <p>If the filesystem is in the trash schemas, it will attempt to move it to trash. If there is
+   * any failure to move to trash then the fallback is to delete the path. As a result, when this
+   * operation returns there will not be a file/dir at the target path.
    *
    * @param fs target filesystem.
   * @param toDelete path to delete/move
   * @param recursive should the delete operation be recursive?
-   * @throws IOException on a failure
+   * @throws IOException on a delete failure.
    */
   private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
     if (isTrashSchema(toDelete)) {
-      Trash trash = new Trash(fs, getConf());
-      if (trash.isEnabled()) {
-        try {
-          if (trash.moveToTrash(toDelete)) {
-            // delete enabled and successful.
-            return;
-          }
-        } catch (FileNotFoundException e) {
-          // the source file is missing.
-
+      try {
+        // use moveToAppropriateTrash(), which not only resolves through mounted filesystems like
+        // viewfs, but also queries the resolved filesystem for its trash policy.
+        // The HDFS client will ask the remote server for its configuration, rather than
+        // what the client is configured with.
+        if (Trash.moveToAppropriateTrash(fs, toDelete, fs.getConf())) {
+          // trash enabled and operation successful.
+          return;
        }
+      } catch (FileNotFoundException e) {
+        // the source file is missing. Nothing to do.
+        return;
+      } catch (IOException e) {
+        // other failure (failure to get remote server trash policy, rename, ...);
+        // log one line at info, full stack at debug, so a failure to move many files to trash
+        // doesn't flood the log
+        LOG.info("Failed to move {} to trash: {}", toDelete, e.toString());
+        LOG.debug("Trash.moveToAppropriateTrash failure", e);
      }
-      // either trash is disabled or the move operation failed; fallback
-      // to delete.
     }
     fs.delete(toDelete, recursive);
   }

diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
index a1a574ae2d24..545819a53b49 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -188,7 +188,7 @@ private void resetLocalFS(boolean useTrash) throws IOException {
     Configuration conf = new Configuration();
     if (useTrash) {
       conf.set(FS_TRASH_INTERVAL_KEY, "60");
-      conf.set(DELETE_TRASH_SCHEMAS, "file");
+      conf.set(DELETE_TRASH_SCHEMAS, " file , hdfs, viewfs");
     }
     FileSystem.closeAllForUGI(currentUser);
     fs = FileSystem.getLocal(conf);

diff --git a/docs/docs/fileio.md b/docs/docs/fileio.md
index 0062219a49bd..c1ceb1c23d50 100644
--- a/docs/docs/fileio.md
+++ b/docs/docs/fileio.md
@@ -34,7 +34,20 @@ The responsibility of reading and writing data files lies with the processing en
 Different FileIO implementations are used depending on the type of storage. Iceberg comes with a set of FileIO implementations for popular storage providers.
 
 - Amazon S3
+- Azure Data Lake Storage Gen2
 - Google Cloud Storage
 - Object Service Storage (including https)
 - Dell Enterprise Cloud Storage
 - Hadoop (adapts any Hadoop FileSystem implementation)
+
+## HadoopFileIO
+
+The HadoopFileIO implementation can connect to any filesystem for which there is a Hadoop filesystem client on the classpath.
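+
+As an illustration (the `file` entry is an example addition, not a default),
+trash-backed deletes could be enabled in `core-site.xml` with something like:
+
+```xml
+<configuration>
+  <property>
+    <!-- Hadoop trash retention in minutes; 0, the default, disables trash -->
+    <name>fs.trash.interval</name>
+    <value>60</value>
+  </property>
+  <property>
+    <!-- filesystem schemas whose deletes should go through the trash policy -->
+    <name>iceberg.hadoop.delete-trash-schemas</name>
+    <value>hdfs, viewfs, file</value>
+  </property>
+</configuration>
+```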
+ +Configuration options for these filesystems are generally through properties set in the file `core-site.xml` or in `spark-defaults.conf` prefixed with `spark.hadoop.` + +### Specific options for HadoopFileIO + +| Property | Default | Description | +|-------------------------------------|----------------|------------------------------------------------------------------------------| +| iceberg.hadoop.delete-trash-schemas | "hdfs, viewfs" | Filesystem schemas where the trash feature is to be used when deleting files | From d39925aea2191ad2568ac5d9bf4cffe1be0395e9 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 23 Jan 2026 17:22:20 +0000 Subject: [PATCH 5/9] TestHadoopFileIO tests 1. verify setting schemas to "" means default hdfs value goes 2. verify that a failure in moveToTrash() is caught and downgraded. Test case #2 is the key one, as it shows that delete works even if trash move somehow failed. + fix doc trailing space failure. --- .../iceberg/hadoop/TestHadoopFileIO.java | 114 +++++++++++++++++- docs/docs/fileio.md | 2 +- 2 files changed, 112 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 545819a53b49..fc036c15f855 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -32,10 +32,13 @@ import java.util.Random; import java.util.UUID; import java.util.Vector; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.fs.TrashPolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.common.DynMethods; @@ -205,14 +208,31 @@ public void testDeleteFiles() { } @Test - public void testFileIsNotTrashSchema() throws Throwable { - assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isFalse(); + public void testDefaultTrashSchemas() { + // hdfs is a default trash schema; viewfs is, file isn't + assertThat(hadoopFileIO.isTrashSchema(new Path("hdfs:///"))) + .describedAs("hdfs schema") + .isTrue(); + assertThat(hadoopFileIO.isTrashSchema(new Path("viewfs:///"))) + .describedAs("viewfs schema") + .isTrue(); + assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))) + .describedAs("file schema") + .isFalse(); + } + + @Test + public void testRemoveTrashSchemas() { + // set the schema list to "" and verify that the default values are gone + final Configuration conf = new Configuration(false); + conf.set(DELETE_TRASH_SCHEMAS, ""); + hadoopFileIO = new HadoopFileIO(conf); + assertThat(hadoopFileIO.isTrashSchema(new Path("hdfs:///"))).isFalse(); } @Test public void testDeleteFilesWithTrashEnabled() throws IOException { resetLocalFS(true); - hadoopFileIO = new HadoopFileIO(fs.getConf()); assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue(); @@ -230,6 +250,38 @@ public void testDeleteFilesWithTrashEnabled() throws IOException { }); } + @Test + public void testTrashFailureFallBack() throws Exception { + resetLocalFS(true); + // the filesystem config needs to be modified to use the test trash policy. + final Configuration conf = fs.getConf(); + conf.set("fs.trash.classname", TestTrashPolicy.class.getName()); + // check loading works. 
+ final long instances = TestTrashPolicy.INSTANCES.get(); + final long exceptions = TestTrashPolicy.EXCEPTIONS.get(); + Trash trash = new Trash(conf); + assertThat(trash.isEnabled()).isTrue(); + assertThat(TestTrashPolicy.INSTANCES.get()).isEqualTo(instances + 1); + + // now create the file IO with the same conf. + hadoopFileIO = new HadoopFileIO(conf); + assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue(); + + Path parent = new Path(tempDir.toURI()); + Path path = new Path(parent, "child"); + fs.createNewFile(path); + final String p = path.toUri().toString(); + // this will delete the file, even with the simulated IOE on moveToTrash + hadoopFileIO.deleteFile(p); + assertPathDoesNotExist(p); + assertThat(TestTrashPolicy.INSTANCES.get()) + .describedAs("TestTrashPolicy instantiations") + .isEqualTo(instances + 2); + assertThat(TestTrashPolicy.EXCEPTIONS.get()) + .describedAs("TestTrashPolicy exceptions") + .isEqualTo(exceptions + 1); + } + /** * Verify semantics of a missing file delete are the same with and without trash: no reported * error. @@ -368,4 +420,60 @@ private void assertPathDoesNotExist(String path) { .describedAs("File %s must exist", path) .isFalse(); } + + /** + * Test TrashPolicy. + * Increments the counter {@link #INSTANCES} on every instantiation. + * On a call to {@link #moveToTrash(Path)} it increments the counter + * {@link #EXCEPTIONS} and throws an exception. + */ + private static final class TestTrashPolicy extends TrashPolicy { + private static final AtomicLong INSTANCES = new AtomicLong(); + private static final AtomicLong EXCEPTIONS = new AtomicLong(); + + public TestTrashPolicy() { + INSTANCES.incrementAndGet(); + } + + @Override + public void initialize(Configuration conf, FileSystem fs, Path home) {} + + @Override + public void initialize(Configuration conf, FileSystem fs) {} + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public boolean moveToTrash(Path path) throws IOException { + EXCEPTIONS.incrementAndGet(); + throw new IOException("Simulated failure"); + } + + @Override + public void createCheckpoint() throws IOException {} + + @Override + public void deleteCheckpoint() throws IOException {} + + @Override + public void deleteCheckpointsImmediately() throws IOException {} + + @Override + public Path getCurrentTrashDir() { + return null; + } + + @Override + public Path getCurrentTrashDir(Path path) throws IOException { + return null; + } + + @Override + public Runnable getEmptier() throws IOException { + return null; + } + } } diff --git a/docs/docs/fileio.md b/docs/docs/fileio.md index c1ceb1c23d50..bb5aa4343995 100644 --- a/docs/docs/fileio.md +++ b/docs/docs/fileio.md @@ -46,7 +46,7 @@ The HadoopFileIO implementation can connect to any filesystem for which there is Configuration options for these filesystems are generally through properties set in the file `core-site.xml` or in `spark-defaults.conf` prefixed with `spark.hadoop.` -### Specific options for HadoopFileIO +### Specific options for HadoopFileIO | Property | Default | Description | |-------------------------------------|----------------|------------------------------------------------------------------------------| From ae098fc740ee97aa31ccd721bfc1d2616ecbe70d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 23 Jan 2026 17:39:14 +0000 Subject: [PATCH 6/9] checkstyle --- .../test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index fc036c15f855..eb64a569d9d9 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -431,7 +431,7 @@ private static final class TestTrashPolicy extends TrashPolicy { private static final AtomicLong INSTANCES = new AtomicLong(); private static final AtomicLong EXCEPTIONS = new AtomicLong(); - public TestTrashPolicy() { + private TestTrashPolicy() { INSTANCES.incrementAndGet(); } From 9340a7466ef7325c340bb50286fe27d0ac0cd595 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sat, 24 Jan 2026 14:57:17 +0000 Subject: [PATCH 7/9] spotlessness of javadocs. Need to understand the layout rules there. --- .../java/org/apache/iceberg/hadoop/TestHadoopFileIO.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index eb64a569d9d9..4d431af227b2 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -422,10 +422,9 @@ private void assertPathDoesNotExist(String path) { } /** - * Test TrashPolicy. - * Increments the counter {@link #INSTANCES} on every instantiation. - * On a call to {@link #moveToTrash(Path)} it increments the counter - * {@link #EXCEPTIONS} and throws an exception. + * Test TrashPolicy. Increments the counter {@link #INSTANCES} on every instantiation. On a call + * to {@link #moveToTrash(Path)} it increments the counter {@link #EXCEPTIONS} and throws an + * exception. */ private static final class TestTrashPolicy extends TrashPolicy { private static final AtomicLong INSTANCES = new AtomicLong(); From 382cef63973ba66003991a54389ce7b359a36d94 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 29 Jan 2026 14:37:50 +0000 Subject: [PATCH 8/9] Align with bulk delete. * the explicit trash tests clean up their trash paths; this verifies that attempts to delete under .Trash use delete() instead. * user trash root is cleaned up after tests. Side issue: the path is always /user/$USER/.Trash, even on Linux local filesystems. Does that work reliably? --- .../iceberg/hadoop/TestHadoopFileIO.java | 122 ++++++++++-------- 1 file changed, 69 insertions(+), 53 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 4d431af227b2..2e5360847689 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -59,22 +60,49 @@ public class TestHadoopFileIO { private final Random random = new Random(1); - /** A filesystem. 
*/ private FileSystem fs; - private HadoopFileIO hadoopFileIO; + private boolean trashEnabled; @TempDir private File tempDir; - private UserGroupInformation currentUser; @BeforeEach public void before() throws Exception { + resetBinding(false); + } + + /** + * Purge trash as a cleanup operation if the test case created a FS with trash enabled; + * avoids accrual of many empty files in this path. + * Note: this affects the entire user account. + */ + @AfterEach + public void purgeTrash() throws IOException { + if (trashEnabled) { + fs.delete(fs.getTrashRoot(new Path(tempDir.toURI())), true); + } + } + + /** + * Resets fs and hadoopFileIO fields to a configuration built from the supplied settings. + * + * @param useTrash enable trash settings + * @throws UncheckedIOException on failures to create a new FS. + */ + private void resetBinding(boolean useTrash) { Configuration conf = new Configuration(); - // remove all filesystems for the current user from cache. - // The next call to FileSystem.get("file://") will get one with the supplied config - currentUser = UserGroupInformation.getCurrentUser(); - resetLocalFS(false); - hadoopFileIO = new HadoopFileIO(conf); + trashEnabled = useTrash; + if (useTrash) { + conf.set(FS_TRASH_INTERVAL_KEY, "60"); + conf.set(DELETE_TRASH_SCHEMAS, " file , hdfs, viewfs"); + } + try { + FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); + fs = FileSystem.getLocal(conf); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + hadoopFileIO = new HadoopFileIO(fs.getConf()); } @Test @@ -140,10 +168,8 @@ public void testDeletePrefix() { } @Test - public void testDeletePrefixWithTrashEnabled() throws IOException { - resetLocalFS(true); - - hadoopFileIO = new HadoopFileIO(fs.getConf()); + public void testDeletePrefixWithTrashEnabled() { + resetBinding(true); Path parent = new Path(tempDir.toURI()); List scaleSizes = Lists.newArrayList(1, 1000, 2500); @@ -169,6 +195,8 @@ public void testDeletePrefixWithTrashEnabled() throws IOException { String trashPath = fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix; assertPathExists(trashPath); + // delete that path. As it is in trash, it gets deleted. + hadoopFileIO.deleteFile(trashPath); }); }); @@ -179,24 +207,6 @@ public void testDeletePrefixWithTrashEnabled() throws IOException { .hasMessageContaining("java.io.FileNotFoundException"); } - /** - * Closes active filesystems then creates a new filesystem with the chosen trash policy and - * updates the {@code fs} field with it. This guarantees that HadoopFileIO's getFileSystem() calls - * will get the same filesystem and its configuration settings. - * - * @param useTrash enable trash settings - * @throws IOException failure to create. 
- */ - private void resetLocalFS(boolean useTrash) throws IOException { - Configuration conf = new Configuration(); - if (useTrash) { - conf.set(FS_TRASH_INTERVAL_KEY, "60"); - conf.set(DELETE_TRASH_SCHEMAS, " file , hdfs, viewfs"); - } - FileSystem.closeAllForUGI(currentUser); - fs = FileSystem.getLocal(conf); - } - @Test public void testDeleteFiles() { Path parent = new Path(tempDir.toURI()); @@ -204,7 +214,7 @@ public void testDeleteFiles() { hadoopFileIO.deleteFiles( filesCreated.stream().map(Path::toString).collect(Collectors.toList())); filesCreated.forEach( - file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse()); + file -> assertPathDoesNotExist(file.toString())); } @Test @@ -231,8 +241,8 @@ public void testRemoveTrashSchemas() { } @Test - public void testDeleteFilesWithTrashEnabled() throws IOException { - resetLocalFS(true); + public void testDeleteFilesWithTrashEnabled() { + resetBinding(true); hadoopFileIO = new HadoopFileIO(fs.getConf()); assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue(); @@ -247,12 +257,19 @@ public void testDeleteFilesWithTrashEnabled() throws IOException { String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString(); String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix; assertPathExists(trashPath); + // delete that path. As it is in trash, it gets deleted. + hadoopFileIO.deleteFile(trashPath); }); } + /** + * Use a trash policy which raises an exception when moving a file to trash; + * verify that deleteFile() falls back to delete. Various counters are + * checked simply to verify that the failing trash policy was invoked. + */ @Test public void testTrashFailureFallBack() throws Exception { - resetLocalFS(true); + resetBinding(true); // the filesystem config needs to be modified to use the test trash policy. 
final Configuration conf = fs.getConf(); conf.set("fs.trash.classname", TestTrashPolicy.class.getName()); @@ -290,9 +307,8 @@ public void testTrashFailureFallBack() throws Exception { */ @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testDeleteMissingFileToTrash(boolean useTrash) throws IOException { - resetLocalFS(useTrash); - hadoopFileIO = new HadoopFileIO(fs.getConf()); + public void testDeleteMissingFileToTrash(boolean useTrash) { + resetBinding(useTrash); Path path = new Path(new Path(tempDir.toURI()), "missing"); final String missing = path.toUri().toString(); hadoopFileIO.deleteFile(missing); @@ -325,15 +341,15 @@ public void testHadoopFileIOSerialization( @Test public void testResolvingFileIOLoad() { - ResolvingFileIO resolvingFileIO = new ResolvingFileIO(); - resolvingFileIO.setConf(new Configuration()); - resolvingFileIO.initialize(ImmutableMap.of()); - FileIO result = - DynMethods.builder("io") - .hiddenImpl(ResolvingFileIO.class, String.class) - .build(resolvingFileIO) - .invoke("hdfs://foo/bar"); - assertThat(result).isInstanceOf(HadoopFileIO.class); + try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) { + resolvingFileIO.setConf(new Configuration()); + resolvingFileIO.initialize(ImmutableMap.of()); + FileIO result = DynMethods.builder("io") + .hiddenImpl(ResolvingFileIO.class, String.class) + .build(resolvingFileIO) + .invoke("hdfs://foo/bar"); + assertThat(result).isInstanceOf(HadoopFileIO.class); + } } @Test @@ -435,10 +451,10 @@ private TestTrashPolicy() { } @Override - public void initialize(Configuration conf, FileSystem fs, Path home) {} + public void initialize(Configuration conf, FileSystem filesystem, Path home) {} @Override - public void initialize(Configuration conf, FileSystem fs) {} + public void initialize(Configuration conf, FileSystem filesystem) {} @Override public boolean isEnabled() { @@ -452,13 +468,13 @@ public boolean moveToTrash(Path path) throws IOException { } @Override - public void createCheckpoint() throws IOException {} + public void createCheckpoint() {} @Override - public void deleteCheckpoint() throws IOException {} + public void deleteCheckpoint() {} @Override - public void deleteCheckpointsImmediately() throws IOException {} + public void deleteCheckpointsImmediately() {} @Override public Path getCurrentTrashDir() { @@ -466,12 +482,12 @@ public Path getCurrentTrashDir() { } @Override - public Path getCurrentTrashDir(Path path) throws IOException { + public Path getCurrentTrashDir(Path path) { return null; } @Override - public Runnable getEmptier() throws IOException { + public Runnable getEmptier() { return null; } } From fa6d4833f26cc40fbccfc1fac66970babafdeb97 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 30 Jan 2026 18:17:44 +0000 Subject: [PATCH 9/9] spotless on comment layout. --- .../iceberg/hadoop/TestHadoopFileIO.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java index 2e5360847689..fc736f20db61 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java @@ -72,15 +72,14 @@ public void before() throws Exception { } /** - * Purge trash as a cleanup operation if the test case created a FS with trash enabled; - * avoids accrual of many empty files in this path. - * Note: this affects the entire user account. 
+ * Purge trash as a cleanup operation if the test case created a FS with trash enabled; avoids + * accrual of many empty files in this path. Note: this affects the entire user account. */ @AfterEach public void purgeTrash() throws IOException { - if (trashEnabled) { - fs.delete(fs.getTrashRoot(new Path(tempDir.toURI())), true); - } + if (trashEnabled) { + fs.delete(fs.getTrashRoot(new Path(tempDir.toURI())), true); + } } /** @@ -213,8 +212,7 @@ public void testDeleteFiles() { List filesCreated = createRandomFiles(parent, 10); hadoopFileIO.deleteFiles( filesCreated.stream().map(Path::toString).collect(Collectors.toList())); - filesCreated.forEach( - file -> assertPathDoesNotExist(file.toString())); + filesCreated.forEach(file -> assertPathDoesNotExist(file.toString())); } @Test @@ -263,9 +261,9 @@ public void testDeleteFilesWithTrashEnabled() { } /** - * Use a trash policy which raises an exception when moving a file to trash; - * verify that deleteFile() falls back to delete. Various counters are - * checked simply to verify that the failing trash policy was invoked. + * Use a trash policy which raises an exception when moving a file to trash; verify that + * deleteFile() falls back to delete. Various counters are checked simply to verify that the + * failing trash policy was invoked. */ @Test public void testTrashFailureFallBack() throws Exception { @@ -341,15 +339,16 @@ public void testHadoopFileIOSerialization( @Test public void testResolvingFileIOLoad() { - try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) { - resolvingFileIO.setConf(new Configuration()); - resolvingFileIO.initialize(ImmutableMap.of()); - FileIO result = DynMethods.builder("io") - .hiddenImpl(ResolvingFileIO.class, String.class) - .build(resolvingFileIO) - .invoke("hdfs://foo/bar"); - assertThat(result).isInstanceOf(HadoopFileIO.class); - } + try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) { + resolvingFileIO.setConf(new Configuration()); + resolvingFileIO.initialize(ImmutableMap.of()); + FileIO result = + DynMethods.builder("io") + .hiddenImpl(ResolvingFileIO.class, String.class) + .build(resolvingFileIO) + .invoke("hdfs://foo/bar"); + assertThat(result).isInstanceOf(HadoopFileIO.class); + } } @Test