23 changes: 16 additions & 7 deletions pom.xml
@@ -70,9 +70,9 @@
<lombok.version>1.18.36</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.1</hadoop.version>
-<hudi.version>0.14.0</hudi.version>
+<hudi.version>1.1.0</hudi.version>
<aws.version>2.29.40</aws.version>
-<hive.version>2.3.9</hive.version>
+<hive.version>3.1.3</hive.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
@@ -85,10 +85,10 @@
<scala13.version>2.13.15</scala13.version>
<scala.version>${scala12.version}</scala.version>
<scala.binary.version>2.12</scala.binary.version>
-<spark.version>3.4.2</spark.version>
-<spark.version.prefix>3.4</spark.version.prefix>
-<iceberg.version>1.4.2</iceberg.version>
-<delta.version>2.4.0</delta.version>
+<spark.version>3.5.2</spark.version>
+<spark.version.prefix>3.5</spark.version.prefix>
+<iceberg.version>1.5.2</iceberg.version>
+<delta.version>3.0.0</delta.version>
<paimon.version>1.2.0</paimon.version>
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
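Note: these version bumps are coupled. Delta 3.0.0 is the Spark 3.5 release line, and with Delta 3.0 the Maven artifact was renamed from delta-core to delta-spark (picked up in the dependency changes below); Iceberg 1.5.x likewise supports Spark 3.5.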
@@ -270,6 +270,12 @@
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
+<dependency>
+  <groupId>org.apache.hudi</groupId>
+  <artifactId>hudi-utilities_2.12</artifactId>
+  <version>${hudi.version}</version>
+  <scope>provided</scope>
+</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
@@ -321,7 +327,7 @@
<!-- Delta -->
<dependency>
<groupId>io.delta</groupId>
-<artifactId>delta-core_${scala.binary.version}</artifactId>
+<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>${delta.version}</version>
</dependency>
<dependency>
@@ -731,6 +737,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+<configuration>
+  <release>${maven.compiler.target}</release>
+</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
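Note: setting <release> makes javac check the code against the API of the target JDK release rather than only emitting older bytecode, so accidental use of newer JDK APIs now fails at compile time.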
2 changes: 1 addition & 1 deletion xtable-core/pom.xml
@@ -102,7 +102,7 @@
<!-- Delta dependencies -->
<dependency>
<groupId>io.delta</groupId>
-<artifactId>delta-core_${scala.binary.version}</artifactId>
+<artifactId>delta-spark_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.delta</groupId>
223 changes: 223 additions & 0 deletions XTableValueMetadata.java (new file)
@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.stats;

import static org.apache.xtable.model.schema.InternalSchema.MetadataKey.TIMESTAMP_PRECISION;
import static org.apache.xtable.model.schema.InternalSchema.MetadataValue.MICROS;

import java.lang.reflect.Constructor;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;

import org.apache.hudi.metadata.HoodieIndexVersion;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;

/**
* Utility class for creating and converting Hudi {@link ValueMetadata} instances from XTable's
* internal schema representation.
*
* <p>This class bridges XTable's {@link InternalSchema} types to Hudi's {@link ValueType} and
* {@link ValueMetadata} used for column statistics. It handles the conversion of various data types
* including timestamps, decimals, and dates.
*
* <p>Note: This class uses reflection to create {@link ValueMetadata} instances because XTable
* classes may be loaded by a different classloader than Hudi classes in Spark environments, making
* direct constructor access illegal.
*/
public class XTableValueMetadata {

/**
* Creates a {@link ValueMetadata} instance from a {@link ColumnStat} for the specified Hudi index
* version.
*
* @param columnStat the column statistics containing schema information
* @param indexVersion the Hudi index version to use for metadata creation
* @return the appropriate {@link ValueMetadata} for the column's data type
* @throws IllegalArgumentException if columnStat is null (for V2+ index), or if decimal metadata
* is missing required precision/scale
* @throws IllegalStateException if an unsupported internal type is encountered
*/
public static ValueMetadata getValueMetadata(
ColumnStat columnStat, HoodieIndexVersion indexVersion) {
if (indexVersion.lowerThan(HoodieIndexVersion.V2)) {
return ValueMetadata.V1EmptyMetadata.get();
}
if (columnStat == null) {
throw new IllegalArgumentException("ColumnStat cannot be null");
}
InternalSchema internalSchema = columnStat.getField().getSchema();
ValueType valueType = fromInternalSchema(internalSchema);
if (valueType == ValueType.V1) {
throw new IllegalStateException(
"InternalType V1 should not be returned from fromInternalSchema");
} else if (valueType == ValueType.DECIMAL) {
if (internalSchema.getMetadata() == null) {
throw new IllegalArgumentException("Decimal metadata is null");
} else if (!internalSchema
.getMetadata()
.containsKey(InternalSchema.MetadataKey.DECIMAL_SCALE)) {
throw new IllegalArgumentException("Decimal scale is null");
} else if (!internalSchema
.getMetadata()
.containsKey(InternalSchema.MetadataKey.DECIMAL_PRECISION)) {
throw new IllegalArgumentException("Decimal precision is null");
}
int scale = (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
int precision =
(int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
return ValueMetadata.DecimalMetadata.create(precision, scale);
} else {
return createValueMetadata(valueType);
}
}

/**
* Maps an XTable {@link InternalSchema} to the corresponding Hudi {@link ValueType}.
*
* @param internalSchema the internal schema to convert
* @return the corresponding Hudi value type
* @throws UnsupportedOperationException if the internal data type is not supported
*/
static ValueType fromInternalSchema(InternalSchema internalSchema) {
switch (internalSchema.getDataType()) {
case NULL:
return ValueType.NULL;
case BOOLEAN:
return ValueType.BOOLEAN;
case INT:
return ValueType.INT;
case LONG:
return ValueType.LONG;
case FLOAT:
return ValueType.FLOAT;
case DOUBLE:
return ValueType.DOUBLE;
case STRING:
return ValueType.STRING;
case BYTES:
return ValueType.BYTES;
case FIXED:
return ValueType.FIXED;
case DECIMAL:
return ValueType.DECIMAL;
case UUID:
return ValueType.UUID;
case DATE:
return ValueType.DATE;
case TIMESTAMP:
if (internalSchema.getMetadata() != null
&& MICROS == internalSchema.getMetadata().get(TIMESTAMP_PRECISION)) {
return ValueType.TIMESTAMP_MICROS;
} else {
return ValueType.TIMESTAMP_MILLIS;
}
case TIMESTAMP_NTZ:
if (internalSchema.getMetadata() != null
&& MICROS == internalSchema.getMetadata().get(TIMESTAMP_PRECISION)) {
return ValueType.LOCAL_TIMESTAMP_MICROS;
} else {
return ValueType.LOCAL_TIMESTAMP_MILLIS;
}
default:
throw new UnsupportedOperationException(
"InternalType " + internalSchema.getDataType() + " is not supported");
}
}

/**
* Creates a {@link ValueMetadata} instance from a {@link ValueType} for the specified Hudi index
* version. This method is primarily intended for testing purposes.
*
* @param valueType the Hudi value type
* @param indexVersion the Hudi index version to use for metadata creation
* @return the appropriate {@link ValueMetadata} for the value type
*/
public static ValueMetadata getValueMetadata(
ValueType valueType, HoodieIndexVersion indexVersion) {
if (indexVersion.lowerThan(HoodieIndexVersion.V2)) {
return ValueMetadata.V1EmptyMetadata.get();
}
return createValueMetadata(valueType);
}

/**
* Creates a ValueMetadata instance using reflection to access the protected constructor. This is
* necessary because XTable classes may be loaded by a different classloader than Hudi classes in
* Spark environments, making direct constructor access illegal.
*/
private static ValueMetadata createValueMetadata(ValueType valueType) {
try {
Constructor<ValueMetadata> constructor =
ValueMetadata.class.getDeclaredConstructor(ValueType.class);
constructor.setAccessible(true);
return constructor.newInstance(valueType);
} catch (Exception e) {
throw new RuntimeException(
"Failed to create ValueMetadata instance for type: " + valueType, e);
}
}

/**
* Converts a value from its XTable representation to the appropriate Hudi range type for column
* statistics.
*
* <p>This method handles the conversion of temporal types ({@link Instant}, {@link
* LocalDateTime}, {@link LocalDate}) to their corresponding Hudi representations based on the
* value metadata.
*
* @param val the value to convert
* @param valueMetadata the metadata describing the target value type
* @return the converted value suitable for Hudi range statistics
* @throws IllegalArgumentException if the value type doesn't match the expected metadata type
*/
public static Comparable<?> convertHoodieTypeToRangeType(
Comparable<?> val, ValueMetadata valueMetadata) {
if (val instanceof Instant) {
if (valueMetadata.getValueType().equals(ValueType.TIMESTAMP_MILLIS)) {
return ValueType.fromTimestampMillis(val, valueMetadata);
} else if (valueMetadata.getValueType().equals(ValueType.TIMESTAMP_MICROS)) {
return ValueType.fromTimestampMicros(val, valueMetadata);
} else {
throw new IllegalArgumentException(
"Unsupported value type: " + valueMetadata.getValueType());
}
} else if (val instanceof LocalDateTime) {
if (valueMetadata.getValueType().equals(ValueType.LOCAL_TIMESTAMP_MILLIS)) {
return ValueType.fromLocalTimestampMillis(val, valueMetadata);
} else if (valueMetadata.getValueType().equals(ValueType.LOCAL_TIMESTAMP_MICROS)) {
return ValueType.fromLocalTimestampMicros(val, valueMetadata);
} else {
throw new IllegalArgumentException(
"Unsupported value type: " + valueMetadata.getValueType());
}
} else if (val instanceof LocalDate) {
if (valueMetadata.getValueType().equals(ValueType.DATE)) {
return ValueType.fromDate(val, valueMetadata);
} else {
throw new IllegalArgumentException(
"Unsupported value type: " + valueMetadata.getValueType());
}
} else {
return val;
}
}
}
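As a usage sketch (not part of this PR): assuming XTable's ColumnStat exposes its min/max range via getRange()/getMaxValue(), a stats-conversion path might call this utility as below; HoodieIndexVersion.V2 and the helper name are illustrative choices, and the cast is an assumption about the model types.

import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.stats.ValueMetadata;
import org.apache.hudi.stats.XTableValueMetadata;
import org.apache.xtable.model.stat.ColumnStat;

final class XTableValueMetadataSketch {
  // Derive Hudi column-stat metadata for a column, then normalize its max
  // bound (e.g. an Instant) to the range type Hudi's stats index expects.
  static Comparable<?> toHudiMaxValue(ColumnStat columnStat) {
    ValueMetadata meta =
        XTableValueMetadata.getValueMetadata(columnStat, HoodieIndexVersion.V2);
    // getRange()/getMaxValue() and the cast are assumptions about the model.
    return XTableValueMetadata.convertHoodieTypeToRangeType(
        (Comparable<?>) columnStat.getRange().getMaxValue(), meta);
  }
}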
@@ -391,8 +391,9 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
return finalizeSchema(
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)), internalSchema);
case TIMESTAMP:
-if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
-    == InternalSchema.MetadataValue.MICROS) {
+if (internalSchema.getMetadata() != null
+    && internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+        == InternalSchema.MetadataValue.MICROS) {
return finalizeSchema(
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
internalSchema);
@@ -402,8 +403,9 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
internalSchema);
}
case TIMESTAMP_NTZ:
-if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
-    == InternalSchema.MetadataValue.MICROS) {
+if (internalSchema.getMetadata() != null
+    && internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+        == InternalSchema.MetadataValue.MICROS) {
return finalizeSchema(
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
internalSchema);
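Both hunks apply the same guard, so a timestamp schema that carries no metadata map now falls back to millisecond precision instead of throwing a NullPointerException. Stripped of the XTable types, the pattern reduces to this generic sketch (names and string keys are stand-ins, not the real API):

import java.util.Map;

final class PrecisionGuard {
  // Null-safe precision check: an absent metadata map means the default
  // (millisecond) precision rather than a NullPointerException.
  static boolean isMicros(Map<String, String> metadata) {
    return metadata != null && "MICROS".equals(metadata.get("TIMESTAMP_PRECISION"));
  }
}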
@@ -31,6 +31,7 @@
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.AddFile;

+import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@@ -122,7 +123,9 @@ private Stream<AddFile> createAddFileAction(
true,
getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()),
null,
-null));
+null,
+Option.empty(),
+Option.empty()));
}

private String getColumnStats(
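The two appended Option.empty() arguments correspond, by position, to optional fields that Delta 3.x added to the AddFile action. The mapping below is an assumption based on Delta 3.0's action model, not something this diff itself states:

// Assumed positional mapping for the AddFile constructor call above:
//   true           -> dataChange
//   getColumnStats -> stats (JSON column statistics)
//   null           -> tags
//   null           -> deletionVector (no deletion vectors written here)
//   Option.empty() -> baseRowId               (row tracking, new in Delta 3.x)
//   Option.empty() -> defaultRowCommitVersion (row tracking, new in Delta 3.x)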