Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
Original file line number Diff line number Diff line change
Expand Up @@ -423,17 +423,20 @@ private static List<DataFile> processFile(

PartitionKey partitionKey = null;
if (partitionSpec.isPartitioned()) {
partitionKey = Partitioning.inferPartitionKey(metadata, partitionSpec);
if (partitionKey == null) {
var inferResult = Partitioning.inferPartitionKey(metadata, partitionSpec);
if (!inferResult.success()) {
if (options.noCopy || options.s3CopyObject) {
throw new BadRequestException(
String.format(
"Cannot infer partition key of %s from the metadata", inputFile.location()));
"%s: %s. In no-copy mode, each file must contain data for only one partition value",
inputFile.location(), inferResult.failureReason()));
}
logger.warn(
"{} does not appear to be partitioned. Falling back to full scan (slow)",
inputFile.location());
"{}: {}. Falling back to full scan (slow)",
inputFile.location(),
inferResult.failureReason());
} else {
partitionKey = inferResult.partitionKey();
logger.info("{}: using inferred partition key {}", file, partitionKey);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public final class Partitioning {

private Partitioning() {}

/**
 * Result of {@code inferPartitionKey}: either an inferred {@link PartitionKey} (success) or a
 * human-readable reason why inference was not possible (failure).
 *
 * @param partitionKey the inferred partition key, or {@code null} when inference failed
 * @param failureReason explanation of the failure, or {@code null} on success
 */
public record InferPartitionKeyResult(
    @Nullable PartitionKey partitionKey, @Nullable String failureReason) {

  // Compact constructor: normalize the failure case. Callers interpolate failureReason()
  // directly into log/error messages, so guarantee it is non-null whenever success() is
  // false; otherwise a failure constructed without an explicit reason (e.g. both components
  // null) would render as the literal string "null".
  public InferPartitionKeyResult {
    if (partitionKey == null && failureReason == null) {
      failureReason = "partition key could not be inferred";
    }
  }

  /** Returns {@code true} iff a partition key was successfully inferred. */
  public boolean success() {
    return partitionKey != null;
  }
}

public static PartitionSpec newPartitionSpec(Schema schema, List<Main.IcePartition> columns) {
final PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
if (!columns.isEmpty()) {
Expand Down Expand Up @@ -123,7 +130,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
}

// TODO: fall back to path when statistics is not available
public static @Nullable PartitionKey inferPartitionKey(
public static InferPartitionKeyResult inferPartitionKey(
ParquetMetadata metadata, PartitionSpec spec) {
Schema schema = spec.schema();

Expand All @@ -138,7 +145,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns

Object value = null;
Object valueTransformed = null;
boolean same = true;
String failureReason = null;

for (BlockMetaData block : blocks) {
ColumnChunkMetaData columnMeta =
Expand All @@ -148,7 +155,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
.orElse(null);

if (columnMeta == null) {
same = false;
failureReason = String.format("Column '%s' not found in file metadata", sourceName);
break;
}

Expand All @@ -158,7 +165,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
|| !stats.hasNonNullValue()
|| stats.genericGetMin() == null
|| stats.genericGetMax() == null) {
same = false;
failureReason = String.format("Column '%s' has no statistics", sourceName);
break;
}

Expand All @@ -176,29 +183,35 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
Object maxTransformed = boundTransform.apply(max);

if (!minTransformed.equals(maxTransformed)) {
same = false;
failureReason =
String.format(
"File contains multiple partition values for '%s' (min: %s, max: %s)",
sourceName, minTransformed, maxTransformed);
break;
}

if (valueTransformed == null) {
valueTransformed = minTransformed;
value = min;
} else if (!valueTransformed.equals(minTransformed)) {
same = false;
failureReason =
String.format(
"File contains multiple partition values for '%s' (e.g., %s and %s)",
sourceName, valueTransformed, minTransformed);
break;
}
}

if (same && value != null) {
if (failureReason == null && value != null) {
partitionRecord.setField(sourceName, decodeStatValue(value, type));
} else {
return null;
return new InferPartitionKeyResult(null, failureReason);
}
}

PartitionKey partitionKey = new PartitionKey(spec, schema);
partitionKey.wrap(partitionRecord);
return partitionKey;
return new InferPartitionKeyResult(partitionKey, null);
}

// Copied from org.apache.iceberg.parquet.ParquetConversions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private PartitionKey partitionOf(
throws IOException {
Map<PartitionKey, List<Record>> partition =
Partitioning.partition(inputFile, partitionSpec.schema(), partitionSpec);
PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec);
PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec).partitionKey();
if (result != null) {
assertThat(partition.size()).isEqualTo(1);
PartitionKey expected = partition.keySet().stream().findFirst().get();
Expand Down