diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index b4656c9..1e51068 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -423,17 +423,20 @@ private static List processFile(
 
     PartitionKey partitionKey = null;
     if (partitionSpec.isPartitioned()) {
-      partitionKey = Partitioning.inferPartitionKey(metadata, partitionSpec);
-      if (partitionKey == null) {
+      var inferResult = Partitioning.inferPartitionKey(metadata, partitionSpec);
+      if (!inferResult.success()) {
         if (options.noCopy || options.s3CopyObject) {
           throw new BadRequestException(
               String.format(
-                  "Cannot infer partition key of %s from the metadata", inputFile.location()));
+                  "%s: %s. In no-copy mode, each file must contain data for only one partition value",
+                  inputFile.location(), inferResult.failureReason()));
         }
         logger.warn(
-            "{} does not appear to be partitioned. Falling back to full scan (slow)",
-            inputFile.location());
+            "{}: {}. Falling back to full scan (slow)",
+            inputFile.location(),
+            inferResult.failureReason());
       } else {
+        partitionKey = inferResult.partitionKey();
         logger.info("{}: using inferred partition key {}", file, partitionKey);
       }
     }
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
index 530bbf4..3a8e826 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
@@ -49,6 +49,13 @@ public final class Partitioning {
 
   private Partitioning() {}
 
+  public record InferPartitionKeyResult(
+      @Nullable PartitionKey partitionKey, @Nullable String failureReason) {
+    public boolean success() {
+      return partitionKey != null;
+    }
+  }
+
   public static PartitionSpec newPartitionSpec(Schema schema, List columns) {
     final PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
     if (!columns.isEmpty()) {
@@ -123,7 +130,7 @@ public static void apply(UpdatePartitionSpec op, List columns
   }
 
   // TODO: fall back to path when statistics is not available
-  public static @Nullable PartitionKey inferPartitionKey(
+  public static InferPartitionKeyResult inferPartitionKey(
       ParquetMetadata metadata, PartitionSpec spec) {
     Schema schema = spec.schema();
 
@@ -138,7 +145,7 @@ public static void apply(UpdatePartitionSpec op, List columns
 
       Object value = null;
       Object valueTransformed = null;
-      boolean same = true;
+      String failureReason = null;
 
       for (BlockMetaData block : blocks) {
         ColumnChunkMetaData columnMeta =
@@ -148,7 +155,7 @@ public static void apply(UpdatePartitionSpec op, List columns
                 .orElse(null);
 
         if (columnMeta == null) {
-          same = false;
+          failureReason = String.format("Column '%s' not found in file metadata", sourceName);
           break;
         }
 
@@ -158,7 +165,7 @@ public static void apply(UpdatePartitionSpec op, List columns
         if (stats == null
            || !stats.hasNonNullValue()
            || stats.genericGetMin() == null
            || stats.genericGetMax() == null) {
-          same = false;
+          failureReason = String.format("Column '%s' has no statistics", sourceName);
           break;
         }
@@ -176,7 +183,10 @@ public static void apply(UpdatePartitionSpec op, List columns
         Object maxTransformed = boundTransform.apply(max);
 
         if (!minTransformed.equals(maxTransformed)) {
-          same = false;
+          failureReason =
+              String.format(
+                  "File contains multiple partition values for '%s' (min: %s, max: %s)",
+                  sourceName, minTransformed, maxTransformed);
           break;
         }
 
@@ -184,21 +194,24 @@ public static void apply(UpdatePartitionSpec op, List columns
           valueTransformed = minTransformed;
           value = min;
         } else if (!valueTransformed.equals(minTransformed)) {
-          same = false;
+          failureReason =
+              String.format(
+                  "File contains multiple partition values for '%s' (e.g., %s and %s)",
+                  sourceName, valueTransformed, minTransformed);
           break;
         }
       }
 
-      if (same && value != null) {
+      if (failureReason == null && value != null) {
         partitionRecord.setField(sourceName, decodeStatValue(value, type));
       } else {
-        return null;
+        return new InferPartitionKeyResult(null, failureReason);
       }
     }
 
     PartitionKey partitionKey = new PartitionKey(spec, schema);
     partitionKey.wrap(partitionRecord);
-    return partitionKey;
+    return new InferPartitionKeyResult(partitionKey, null);
   }
 
   // Copied from org.apache.iceberg.parquet.ParquetConversions.
diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/iceberg/PartitioningTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/iceberg/PartitioningTest.java
index ac55542..a0de255 100644
--- a/ice/src/test/java/com/altinity/ice/cli/internal/iceberg/PartitioningTest.java
+++ b/ice/src/test/java/com/altinity/ice/cli/internal/iceberg/PartitioningTest.java
@@ -90,7 +90,7 @@ private PartitionKey partitionOf(
       throws IOException {
     Map> partition =
         Partitioning.partition(inputFile, partitionSpec.schema(), partitionSpec);
-    PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec);
+    PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec).partitionKey();
    if (result != null) {
       assertThat(partition.size()).isEqualTo(1);
       PartitionKey expected = partition.keySet().stream().findFirst().get();
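
Note (not part of the patch): a minimal sketch of how a caller is expected to consume the new InferPartitionKeyResult introduced above. Partitioning.inferPartitionKey, success(), partitionKey() and failureReason() come from the diff; the enclosing class and method names here are hypothetical.

import com.altinity.ice.cli.internal.iceberg.Partitioning;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical caller, shown only to illustrate the new result type.
final class InferPartitionKeyResultExample {

  // Returns the inferred partition key, or null after reporting why inference failed.
  static PartitionKey inferOrExplain(ParquetMetadata metadata, PartitionSpec spec) {
    var result = Partitioning.inferPartitionKey(metadata, spec);
    if (!result.success()) {
      // failureReason() explains the failure, e.g. a column missing from the file
      // metadata, absent statistics, or multiple partition values in a single file.
      System.err.println("cannot infer partition key: " + result.failureReason());
      return null; // callers decide: fail fast (no-copy mode) or fall back to a full scan
    }
    return result.partitionKey();
  }
}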