diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 3740f0abda88..006c072db06c 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -28,17 +28,16 @@
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
@@ -53,6 +52,8 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
   private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  public static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
+  private static final String[] DEFAULT_TRASH_SCHEMAS = {"hdfs", "viewfs"};
   private static final int DELETE_RETRY_ATTEMPTS = 3;
   private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
   private static volatile ExecutorService executorService;
@@ -107,7 +108,7 @@ public void deleteFile(String path) {
     try {
       deletePath(fs, toDelete, false);
     } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
+      throw new RuntimeIOException(e, "Failed to delete file: %s: %s", path, e.toString());
     }
   }
 
@@ -214,14 +215,60 @@ private ExecutorService executorService() {
     return executorService;
   }
 
+  /**
+   * Is the path in a filesystem schema where the Hadoop trash policy should be used to move it to
+   * trash, rather than deleting it directly?
+   *
+   * @param toDelete path to delete
+   * @return true if the path's schema is in the configured list of trash schemas
+   */
+  @VisibleForTesting
+  boolean isTrashSchema(Path toDelete) {
+    final String scheme = toDelete.toUri().getScheme();
+    for (String s : getConf().getTrimmedStrings(DELETE_TRASH_SCHEMAS, DEFAULT_TRASH_SCHEMAS)) {
+      if (s.equalsIgnoreCase(scheme)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Delete a path.
+   *
+   * <p>If the path is in one of the trash schemas, this will attempt to move it to trash. If the
+   * move to trash fails for any reason, the fallback is to delete the path. As a result: when this
+   * operation returns there will not be a file/dir at the target path.
+   *
+   * @param fs target filesystem.
+   * @param toDelete path to delete/move
+   * @param recursive should the delete operation be recursive?
+   * @throws IOException on a delete failure.
+   */
   private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
-    Trash trash = new Trash(fs, getConf());
-    if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
-        && trash.isEnabled()) {
-      trash.moveToTrash(toDelete);
-    } else {
-      fs.delete(toDelete, recursive);
+    if (isTrashSchema(toDelete)) {
+      try {
+        // Use moveToAppropriateTrash(), which not only resolves through mounted filesystems like
+        // viewfs, but also queries the resolved filesystem for its trash policy.
+        // The HDFS client will ask the remote server for its configuration, rather than
+        // what the client is configured with.
+        if (Trash.moveToAppropriateTrash(fs, toDelete, fs.getConf())) {
+          // trash enabled and operation successful.
+          return;
+        }
+      } catch (FileNotFoundException e) {
+        // the source file is missing. Nothing to do.
+        return;
+      } catch (IOException e) {
+        // other failure (failure to get remote server trash policy, rename, ...)
+        // log one line at info, full stack at debug, so a failure to move many files to trash
+        // doesn't flood the log
+        LOG.info("Failed to move {} to trash: {}", toDelete, e.toString());
+        LOG.debug("Trash.moveToAppropriateTrash failure", e);
+      }
     }
+    fs.delete(toDelete, recursive);
   }
 
   /**
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
index b459c7e062a0..fc736f20db61 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.hadoop;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.iceberg.hadoop.HadoopFileIO.DELETE_TRASH_SCHEMAS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
@@ -30,10 +32,14 @@
 import java.util.Random;
 import java.util.UUID;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.BulkDeletionFailureException;
@@ -43,26 +49,59 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestHadoopFileIO {
   private final Random random = new Random(1);
 
   private FileSystem fs;
   private HadoopFileIO hadoopFileIO;
+  private boolean trashEnabled;
 
   @TempDir private File tempDir;
 
   @BeforeEach
   public void before() throws Exception {
-    Configuration conf = new Configuration();
-    fs = FileSystem.getLocal(conf);
+    resetBinding(false);
+  }
 
-    hadoopFileIO = new HadoopFileIO(conf);
+  /**
+   * Purge trash as a cleanup operation if the test case created a FS with trash enabled; avoids
+   * accrual of many empty files in this path. Note: this affects the entire user account.
+   */
+  @AfterEach
+  public void purgeTrash() throws IOException {
+    if (trashEnabled) {
+      fs.delete(fs.getTrashRoot(new Path(tempDir.toURI())), true);
+    }
+  }
+
+  /**
+   * Resets fs and hadoopFileIO fields to a configuration built from the supplied settings.
+   *
+   * @param useTrash enable trash settings
+   * @throws UncheckedIOException on failures to create a new FS.
+   */
+  private void resetBinding(boolean useTrash) {
+    Configuration conf = new Configuration();
+    trashEnabled = useTrash;
+    if (useTrash) {
+      conf.set(FS_TRASH_INTERVAL_KEY, "60");
+      conf.set(DELETE_TRASH_SCHEMAS, " file , hdfs, viewfs");
+    }
+    try {
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+      fs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    hadoopFileIO = new HadoopFileIO(fs.getConf());
   }
 
   @Test
@@ -128,12 +167,8 @@ public void testDeletePrefix() {
   }
 
   @Test
-  public void testDeletePrefixWithTrashEnabled() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(FS_TRASH_INTERVAL_KEY, "60");
-    fs = FileSystem.getLocal(conf);
-
-    hadoopFileIO = new HadoopFileIO(conf);
+  public void testDeletePrefixWithTrashEnabled() {
+    resetBinding(true);
 
     Path parent = new Path(tempDir.toURI());
     List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
@@ -150,13 +185,17 @@ public void testDeletePrefixWithTrashEnabled() throws IOException {
           assertThatThrownBy(
                   () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
               .isInstanceOf(UncheckedIOException.class)
-              .hasMessageContaining("java.io.FileNotFoundException");
+              .hasMessageContaining("java.io.FileNotFoundException")
+              .cause()
+              .isInstanceOf(FileNotFoundException.class);
 
           filesCreated.forEach(
               file -> {
                 String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
                 String trashPath = fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
-                assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+                assertPathExists(trashPath);
+                // delete that path. As it is in trash, it gets deleted.
+                hadoopFileIO.deleteFile(trashPath);
               });
         });
 
@@ -173,17 +212,38 @@ public void testDeleteFiles() {
     List<Path> filesCreated = createRandomFiles(parent, 10);
     hadoopFileIO.deleteFiles(
         filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
-    filesCreated.forEach(
-        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    filesCreated.forEach(file -> assertPathDoesNotExist(file.toString()));
   }
 
   @Test
-  public void testDeleteFilesWithTrashEnabled() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(FS_TRASH_INTERVAL_KEY, "60");
-    fs = FileSystem.getLocal(conf);
+  public void testDefaultTrashSchemas() {
+    // hdfs and viewfs are default trash schemas; file isn't
+    assertThat(hadoopFileIO.isTrashSchema(new Path("hdfs:///")))
+        .describedAs("hdfs schema")
+        .isTrue();
+    assertThat(hadoopFileIO.isTrashSchema(new Path("viewfs:///")))
+        .describedAs("viewfs schema")
+        .isTrue();
+    assertThat(hadoopFileIO.isTrashSchema(new Path("file:///")))
+        .describedAs("file schema")
+        .isFalse();
+  }
 
+  @Test
+  public void testRemoveTrashSchemas() {
+    // set the schema list to "" and verify that the default values are gone
+    final Configuration conf = new Configuration(false);
+    conf.set(DELETE_TRASH_SCHEMAS, "");
     hadoopFileIO = new HadoopFileIO(conf);
+    assertThat(hadoopFileIO.isTrashSchema(new Path("hdfs:///"))).isFalse();
+  }
+
+  @Test
+  public void testDeleteFilesWithTrashEnabled() {
+    resetBinding(true);
+    hadoopFileIO = new HadoopFileIO(fs.getConf());
+    assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue();
+
     Path parent = new Path(tempDir.toURI());
     List<Path> filesCreated = createRandomFiles(parent, 10);
     hadoopFileIO.deleteFiles(
@@ -194,10 +254,65 @@ public void testDeleteFilesWithTrashEnabled() throws IOException {
         file -> {
           String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
           String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
-          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+          assertPathExists(trashPath);
+          // delete that path. As it is in trash, it gets deleted.
+          hadoopFileIO.deleteFile(trashPath);
         });
   }
 
+  /**
+   * Use a trash policy which raises an exception when moving a file to trash; verify that
+   * deleteFile() falls back to delete. Various counters are checked simply to verify that the
+   * failing trash policy was invoked.
+   */
+  @Test
+  public void testTrashFailureFallBack() throws Exception {
+    resetBinding(true);
+    // the filesystem config needs to be modified to use the test trash policy.
+    final Configuration conf = fs.getConf();
+    conf.set("fs.trash.classname", TestTrashPolicy.class.getName());
+    // check loading works.
+    final long instances = TestTrashPolicy.INSTANCES.get();
+    final long exceptions = TestTrashPolicy.EXCEPTIONS.get();
+    Trash trash = new Trash(conf);
+    assertThat(trash.isEnabled()).isTrue();
+    assertThat(TestTrashPolicy.INSTANCES.get()).isEqualTo(instances + 1);
+
+    // now create the file IO with the same conf.
+    hadoopFileIO = new HadoopFileIO(conf);
+    assertThat(hadoopFileIO.isTrashSchema(new Path("file:///"))).isTrue();
+
+    Path parent = new Path(tempDir.toURI());
+    Path path = new Path(parent, "child");
+    fs.createNewFile(path);
+    final String p = path.toUri().toString();
+    // this will delete the file, even with the simulated IOE on moveToTrash
+    hadoopFileIO.deleteFile(p);
+    assertPathDoesNotExist(p);
+    assertThat(TestTrashPolicy.INSTANCES.get())
+        .describedAs("TestTrashPolicy instantiations")
+        .isEqualTo(instances + 2);
+    assertThat(TestTrashPolicy.EXCEPTIONS.get())
+        .describedAs("TestTrashPolicy exceptions")
+        .isEqualTo(exceptions + 1);
+  }
+
+  /**
+   * Verify that the semantics of deleting a missing file are the same with and without trash: no
+   * reported error.
+   *
+   * @param useTrash use trash in the FS.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testDeleteMissingFileToTrash(boolean useTrash) {
+    resetBinding(useTrash);
+    Path path = new Path(new Path(tempDir.toURI()), "missing");
+    final String missing = path.toUri().toString();
+    hadoopFileIO.deleteFile(missing);
+    assertPathDoesNotExist(missing);
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List filesCreated =
@@ -224,15 +339,16 @@ public void testHadoopFileIOSerialization(
 
   @Test
   public void testResolvingFileIOLoad() {
-    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
-    resolvingFileIO.setConf(new Configuration());
-    resolvingFileIO.initialize(ImmutableMap.of());
-    FileIO result =
-        DynMethods.builder("io")
-            .hiddenImpl(ResolvingFileIO.class, String.class)
-            .build(resolvingFileIO)
-            .invoke("hdfs://foo/bar");
-    assertThat(result).isInstanceOf(HadoopFileIO.class);
+    try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
+      resolvingFileIO.setConf(new Configuration());
+      resolvingFileIO.initialize(ImmutableMap.of());
+      FileIO result =
+          DynMethods.builder("io")
+              .hiddenImpl(ResolvingFileIO.class, String.class)
+              .build(resolvingFileIO)
+              .invoke("hdfs://foo/bar");
+      assertThat(result).isInstanceOf(HadoopFileIO.class);
+    }
   }
 
   @Test
@@ -297,4 +413,81 @@ private List<Path> createRandomFiles(Path parent, int count) {
         });
     return paths;
   }
+
+  /**
+   * Assert a path exists.
+   *
+   * @param path URI to file/dir.
+   */
+  private void assertPathExists(String path) {
+    assertThat(hadoopFileIO.newInputFile(path).exists())
+        .describedAs("File %s must exist", path)
+        .isTrue();
+  }
+
+  /**
+   * Assert a path does not exist.
+   *
+   * @param path URI to file/dir.
+   */
+  private void assertPathDoesNotExist(String path) {
+    assertThat(hadoopFileIO.newInputFile(path).exists())
+        .describedAs("File %s must not exist", path)
+        .isFalse();
+  }
+
+  /**
+   * Test TrashPolicy. Increments the counter {@link #INSTANCES} on every instantiation. On a call
+   * to {@link #moveToTrash(Path)} it increments the counter {@link #EXCEPTIONS} and throws an
+   * exception.
+   */
+  private static final class TestTrashPolicy extends TrashPolicy {
+    private static final AtomicLong INSTANCES = new AtomicLong();
+    private static final AtomicLong EXCEPTIONS = new AtomicLong();
+
+    private TestTrashPolicy() {
+      INSTANCES.incrementAndGet();
+    }
+
+    @Override
+    public void initialize(Configuration conf, FileSystem filesystem, Path home) {}
+
+    @Override
+    public void initialize(Configuration conf, FileSystem filesystem) {}
+
+    @Override
+    public boolean isEnabled() {
+      return true;
+    }
+
+    @Override
+    public boolean moveToTrash(Path path) throws IOException {
+      EXCEPTIONS.incrementAndGet();
+      throw new IOException("Simulated failure");
+    }
+
+    @Override
+    public void createCheckpoint() {}
+
+    @Override
+    public void deleteCheckpoint() {}
+
+    @Override
+    public void deleteCheckpointsImmediately() {}
+
+    @Override
+    public Path getCurrentTrashDir() {
+      return null;
+    }
+
+    @Override
+    public Path getCurrentTrashDir(Path path) {
+      return null;
+    }
+
+    @Override
+    public Runnable getEmptier() {
+      return null;
+    }
+  }
 }
diff --git a/docs/docs/fileio.md b/docs/docs/fileio.md
index 0062219a49bd..bb5aa4343995 100644
--- a/docs/docs/fileio.md
+++ b/docs/docs/fileio.md
@@ -34,7 +34,20 @@ The responsibility of reading and writing data files lies with the processing en
 Different FileIO implementations are used depending on the type of storage. Iceberg comes with a set of FileIO implementations for popular storage providers.
 
 - Amazon S3
+- Azure Data Lake Storage Gen2
 - Google Cloud Storage
 - Object Service Storage (including https)
 - Dell Enterprise Cloud Storage
 - Hadoop (adapts any Hadoop FileSystem implementation)
+
+## HadoopFileIO
+
+The HadoopFileIO implementation can connect to any filesystem for which there is a Hadoop filesystem client on the classpath.
+
+Configuration options for these filesystems are generally supplied through Hadoop properties, set in the file `core-site.xml` or, for Spark, in `spark-defaults.conf` with the prefix `spark.hadoop.`.
+
+### Specific options for HadoopFileIO
+
+| Property                            | Default        | Description                                                                   |
+|-------------------------------------|----------------|-------------------------------------------------------------------------------|
+| iceberg.hadoop.delete-trash-schemas | "hdfs, viewfs" | Filesystem schemas where the trash feature is to be used when deleting files  |
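
As a usage illustration of the change above, here is a minimal sketch assuming a Hadoop cluster with trash enabled; the class name, namenode URI, data file path, and property values are illustrative assumptions, not part of this patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class TrashDeleteExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hadoop trash is only active when fs.trash.interval (minutes a checkpoint is kept) is > 0.
    conf.set("fs.trash.interval", "60");
    // Schemas whose deletes should go through trash; "hdfs, viewfs" is the default.
    conf.set("iceberg.hadoop.delete-trash-schemas", "hdfs, viewfs");

    HadoopFileIO fileIO = new HadoopFileIO(conf);
    // For a path on a trash schema this moves the file to the user's trash directory;
    // if the move to trash fails, or trash is disabled, it falls back to a plain delete.
    fileIO.deleteFile("hdfs://namenode:8020/warehouse/db/table/data/00000-0-data.parquet");
  }
}
```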