diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java index 9d773cc..63af0bd 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java @@ -9,17 +9,24 @@ */ package com.altinity.ice.cli.internal.cmd; +import com.altinity.ice.cli.Main.PartitionFilter; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.rest.RESTCatalog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,30 +40,52 @@ private Delete() {} public static void run( RESTCatalog catalog, TableIdentifier tableId, - List partitions, + List partitions, boolean dryRun) throws IOException, URISyntaxException { Table table = catalog.loadTable(tableId); - TableScan scan = table.newScan(); - if (partitions != null && !partitions.isEmpty()) { - org.apache.iceberg.expressions.Expression expr = null; - for (com.altinity.ice.cli.Main.PartitionFilter pf : partitions) { - org.apache.iceberg.expressions.Expression e = null; + + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + logger.error("There are no snapshots in this table"); + return; + } + + FileIO io = table.io(); + Map specsById = table.specs(); + + List dataManifests = currentSnapshot.dataManifests(io); + List filesToDelete = new ArrayList<>(); + + Expression expression = null; + + if (partitions != null) { + for (PartitionFilter pf : partitions) { + String fieldName = pf.name(); + + Expression fieldExpr = null; for (Object value : pf.values()) { - org.apache.iceberg.expressions.Expression valueExpr = - org.apache.iceberg.expressions.Expressions.equal(pf.name(), value); - e = (e == null) ? valueExpr : org.apache.iceberg.expressions.Expressions.or(e, valueExpr); + Expression singleValueExpr = Expressions.equal(fieldName, value); + fieldExpr = + fieldExpr == null ? singleValueExpr : Expressions.or(fieldExpr, singleValueExpr); + } + if (fieldExpr == null) { + continue; } - expr = (expr == null) ? e : org.apache.iceberg.expressions.Expressions.and(expr, e); + expression = expression == null ? fieldExpr : Expressions.and(expression, fieldExpr); } - scan = scan.filter(expr); } - List filesToDelete = new ArrayList<>(); - try (CloseableIterable tasks = scan.planFiles()) { - for (FileScanTask task : tasks) { - filesToDelete.add(task.file()); + for (ManifestFile manifest : dataManifests) { + ManifestReader reader = ManifestFiles.read(manifest, io, specsById); + if (expression != null) { + reader.filterPartitions(expression); + } + try (reader) { + for (DataFile dataFile : reader) { + filesToDelete.add(dataFile); + } } }