Skip to content
65 changes: 56 additions & 9 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
Expand All @@ -53,6 +52,8 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
public static final String DELETE_TRASH_SCHEMAS = "iceberg.hadoop.delete-trash-schemas";
private static final String[] DEFAULT_TRASH_SCHEMAS = {"hdfs", "viewfs"};
private static final int DELETE_RETRY_ATTEMPTS = 3;
private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
private static volatile ExecutorService executorService;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void deleteFile(String path) {
try {
deletePath(fs, toDelete, false);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to delete file: %s", path);
throw new RuntimeIOException(e, "Failed to delete file: %s: %s", path, e.toString());
}
}

Expand Down Expand Up @@ -214,14 +215,60 @@ private ExecutorService executorService() {
return executorService;
}

/**
 * Determine whether a path's URI scheme is one for which a delete should be routed
 * through the Hadoop trash policy instead of removing the path directly.
 *
 * <p>The scheme list is read from the {@code iceberg.hadoop.delete-trash-schemas}
 * configuration option; it defaults to {@code hdfs} and {@code viewfs}. Matching is
 * case-insensitive.
 *
 * @param toDelete path to delete
 * @return true if the path's URI scheme is in the configured list of trash schemas
 */
@VisibleForTesting
boolean isTrashSchema(Path toDelete) {
  String pathScheme = toDelete.toUri().getScheme();
  String[] trashSchemas = getConf().getTrimmedStrings(DELETE_TRASH_SCHEMAS, DEFAULT_TRASH_SCHEMAS);
  for (String candidate : trashSchemas) {
    if (candidate.equalsIgnoreCase(pathScheme)) {
      return true;
    }
  }
  return false;
}

/**
* Delete a path.
*
* <p>If the filesystem is in the trash schemas, it will attempt to move it to trash. If there is
* any failure to move to trash then the fallback is to delete the path. As a result: when this
* operation returns there will not be a file/dir at the target path.
*
* @param fs target filesystem.
* @param toDelete path to delete/move
* @param recursive should the delete operation be recursive?
* @throws IOException on a delete failure.
*/
private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
Trash trash = new Trash(fs, getConf());
if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
&& trash.isEnabled()) {
trash.moveToTrash(toDelete);
} else {
fs.delete(toDelete, recursive);
if (isTrashSchema(toDelete)) {
try {
// use moveToAppropriateTrash() which not only resolves through mounted filesystems like
// viewfs,
// it will query the resolved filesystem for its trash policy.
// The HDFS client will ask the remote server for its configuration, rather than
// what the client is configured with.
if (Trash.moveToAppropriateTrash(fs, toDelete, fs.getConf())) {
// trash enabled and operation successful.
return;
}
} catch (FileNotFoundException e) {
// the source file is missing. Nothing to do.
return;
} catch (IOException e) {
// other failure (failure to get remote server trash policy, rename,...)
// log one line at info, full stack at debug, so a failure to move many files to trash
// doesn't flood the log
LOG.info("Failed to move {} to trash: {}", toDelete, e.toString());
LOG.debug("Trash.moveToAppropriateTrash failure", e);
}
}
fs.delete(toDelete, recursive);
}

/**
Expand Down
Loading