17 changes: 15 additions & 2 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -28,8 +28,11 @@
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
@@ -102,7 +105,7 @@ public void deleteFile(String path) {
Path toDelete = new Path(path);
FileSystem fs = Util.getFs(toDelete, getConf());
try {
fs.delete(toDelete, false /* not recursive */);
deletePath(fs, toDelete, false);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to delete file: %s", path);
}
@@ -167,7 +170,7 @@ public void deletePrefix(String prefix) {
FileSystem fs = Util.getFs(prefixToDelete, getConf());

try {
fs.delete(prefixToDelete, true /* recursive */);
deletePath(fs, prefixToDelete, true);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -211,6 +214,16 @@ private ExecutorService executorService() {
return executorService;
}

private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
Trash trash = new Trash(fs, getConf());
Contributor:
I'm concerned about the number of Trash objects we create. Does the Hadoop API ensure we can reuse the Trash object for a given (fs, conf)?
I couldn't tell from https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Trash.html#Trash-org.apache.hadoop.fs.FileSystem-org.apache.hadoop.conf.Configuration-

Contributor Author:
It's a good call. I've added a new toggle that can be set in the Hadoop configuration to determine whether Iceberg should use the trash, following Russell Spitzer's example in other HadoopFileIO changes.

I've also looked into object reuse. The Trash object can vary with many configuration values (I'd have to build a cache keyed on 5+ configuration values that may change in the future), unlike the FileSystem, whose cache key doesn't depend on the configuration at all, only on the URI and the user group information. That said, checking the Hadoop configuration first means we don't create the Trash object unless the user specifically opts in, so an Iceberg user now has to opt into this change to experience any possible object churn.
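For illustration only, a rough sketch of the opt-in check described above, assuming it lives in HadoopFileIO next to the existing deletePath helper. The property name "iceberg.hadoop.use-trash" is hypothetical, not a property introduced by this PR; the actual change follows below.

  // Hypothetical sketch of the configuration toggle described in the comment above.
  // The property name "iceberg.hadoop.use-trash" is an assumption for illustration only.
  private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
    if (getConf().getBoolean("iceberg.hadoop.use-trash", false)) {
      // The Trash object is only constructed once the user has explicitly opted in.
      Trash trash = new Trash(fs, getConf());
      if (trash.isEnabled() && trash.moveToTrash(toDelete)) {
        return; // the path now lives under the trash directory
      }
    }
    // Default behaviour: permanent delete, matching the previous implementation.
    fs.delete(toDelete, recursive);
  }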

if ((fs instanceof LocalFileSystem || fs instanceof DistributedFileSystem)
Member:
How about ViewFileSystem?

&& trash.isEnabled()) {
trash.moveToTrash(toDelete);
} else {
fs.delete(toDelete, recursive);
}
}
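A minimal usage sketch of the behavior above, assuming this change is applied and the local filesystem is in use (so the LocalFileSystem branch applies); the file path is hypothetical. Setting a positive trash interval is all a caller needs for single-file deletes to be routed through the Hadoop trash:

  // Minimal usage sketch. Assumptions: this PR is applied, the local filesystem is used,
  // and the file path below is hypothetical.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
  import org.apache.iceberg.hadoop.HadoopFileIO;

  public class TrashDeleteExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Any value > 0 enables the Hadoop trash; the value is the retention interval in minutes.
      conf.set(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, "60");

      HadoopFileIO fileIO = new HadoopFileIO(conf);
      // With the trash enabled, the file is moved under the user's .Trash/Current directory
      // instead of being removed permanently.
      fileIO.deleteFile("file:/tmp/iceberg-warehouse/data/00000-0-data.parquet");
    }
  }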

/**
* This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator.
*
62 changes: 62 additions & 0 deletions core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.hadoop;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -126,6 +127,46 @@ public void testDeletePrefix() {
.hasMessageContaining("java.io.FileNotFoundException");
}

@Test
public void testDeletePrefixWithTrashEnabled() throws IOException {
Configuration conf = new Configuration();
conf.set(FS_TRASH_INTERVAL_KEY, "60");
fs = FileSystem.getLocal(conf);

hadoopFileIO = new HadoopFileIO(conf);
Path parent = new Path(tempDir.toURI());

List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);

scaleSizes.parallelStream()
.forEach(
scale -> {
Path scalePath = new Path(parent, Integer.toString(scale));

List<Path> filesCreated = createRandomFiles(scalePath, scale);
hadoopFileIO.deletePrefix(scalePath.toUri().toString());

// Hadoop filesystem will throw if the path does not exist
assertThatThrownBy(
() -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
.isInstanceOf(UncheckedIOException.class)
.hasMessageContaining("java.io.FileNotFoundException");
filesCreated.forEach(
file -> {
String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
String trashPath =
fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
});
});

hadoopFileIO.deletePrefix(parent.toUri().toString());
// Hadoop filesystem will throw if the path does not exist
assertThatThrownBy(() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator())
.isInstanceOf(UncheckedIOException.class)
.hasMessageContaining("java.io.FileNotFoundException");
}

@Test
public void testDeleteFiles() {
Path parent = new Path(tempDir.toURI());
@@ -136,6 +177,27 @@ public void testDeleteFiles() {
file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
}

@Test
public void testDeleteFilesWithTrashEnabled() throws IOException {
Configuration conf = new Configuration();
conf.set(FS_TRASH_INTERVAL_KEY, "60");
fs = FileSystem.getLocal(conf);

hadoopFileIO = new HadoopFileIO(conf);
Path parent = new Path(tempDir.toURI());
List<Path> filesCreated = createRandomFiles(parent, 10);
hadoopFileIO.deleteFiles(
filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
filesCreated.forEach(
file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
filesCreated.forEach(
file -> {
String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
});
}

@Test
public void testDeleteFilesErrorHandling() {
List<String> filesCreated =