From 8242e94b9b6a1fab9db517bd8704bb349978a4ac Mon Sep 17 00:00:00 2001 From: dharma-shashank-meesho Date: Mon, 17 Nov 2025 14:57:32 +0530 Subject: [PATCH 1/2] Fixes --- .gitignore | 4 +- .../xtable/delta/DeltaActionsConverter.java | 2 +- .../xtable/delta/DeltaSchemaExtractor.java | 3 + .../xtable/delta/DeltaStatsExtractor.java | 177 ++++++- .../iceberg/IcebergColumnStatsConverter.java | 11 +- .../iceberg/IcebergConversionTarget.java | 4 +- .../iceberg/IcebergDataFileUpdatesSync.java | 441 +++++++++++++++++- .../parquet/ParquetStatsConverterUtil.java | 211 +++++++-- .../xtable/iceberg/TestIcebergSync.java | 4 +- 9 files changed, 799 insertions(+), 58 deletions(-) diff --git a/.gitignore b/.gitignore index 5a59990d7..33f70eb01 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,6 @@ my_config.yaml my_config_catalog.yaml # REST generated models -spec/generated \ No newline at end of file +spec/generated +.pre-commit-config.yaml +trufflehog/ \ No newline at end of file diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java index 16a320f12..e2ddec6a7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java @@ -57,7 +57,7 @@ public InternalDataFile convertAddActionToInternalDataFile( boolean includeColumnStats, DeltaPartitionExtractor partitionExtractor, DeltaStatsExtractor fileStatsExtractor) { - FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields); + FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, deltaSnapshot, fields); List columnStats = includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList(); long recordCount = fileStats.getNumRecords(); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index 1376f884e..86774476a 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -84,6 +84,9 @@ private InternalSchema toInternalSchema( int openParenIndex = typeName.indexOf("("); String trimmedTypeName = openParenIndex > 0 ? 
typeName.substring(0, openParenIndex) : typeName; switch (trimmedTypeName) { + case "short": + type = InternalType.INT; + break; case "integer": type = InternalType.INT; break; diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java index a685700e6..7d01f8e9d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java @@ -40,7 +40,11 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.actions.AddFile; import com.fasterxml.jackson.annotation.JsonAnySetter; @@ -57,6 +61,8 @@ import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.FileStats; import org.apache.xtable.model.stat.Range; +import org.apache.xtable.parquet.ParquetMetadataExtractor; +import org.apache.xtable.parquet.ParquetStatsExtractor; /** * DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging @@ -186,13 +192,64 @@ private void insertValueAtPath(Map jsonObject, String[] pathPart } } + /** + * Extracts column statistics for a Delta Lake data file. This method first attempts to read + * statistics from the Delta checkpoint (fast path). If checkpoint statistics are NULL or empty, + * it falls back to reading statistics directly from the Parquet file footer (slow path). + * + *

<p>Delta Lake can store statistics in two locations:
+   *
+   * <ul>
+   *   <li>Checkpoint files (JSON format): Preferred and faster, but may be NULL if
+   *       'delta.checkpoint.writeStatsAsJson' is false or stats collection was disabled
+   *   <li>Parquet file footers: Fallback option that requires opening each data file
+   *       individually, which is more expensive but ensures statistics are always available
+   * </ul>
+   *
+   * <p>
Performance Considerations: When checkpoint statistics are NULL for many files, the fallback + * to Parquet footers can significantly slow down conversion. For large tables with thousands of + * files, consider enabling Delta checkpoint statistics via: {@code ALTER TABLE table_name SET + * TBLPROPERTIES ('delta.checkpoint.writeStatsAsJson' = 'true')} + * + * @param addFile the Delta AddFile action containing file metadata + * @param snapshot the Delta snapshot providing table context and base path + * @param fields the schema fields for which to extract statistics + * @return FileStats containing column statistics and record count + */ + public FileStats getColumnStatsForFile( + AddFile addFile, Snapshot snapshot, List fields) { + // Attempt to read statistics from Delta checkpoint (fast path) + String statsString = addFile.stats(); + + if (StringUtils.isNotEmpty(statsString)) { + log.debug("Reading stats from checkpoint for file: {}", addFile.path()); + return parseStatsFromJson(statsString, fields); + } + + // Checkpoint statistics are NULL or empty - fall back to Parquet footer (slow path) + log.debug( + "Stats not found in Delta checkpoint for file: {}, falling back to Parquet footer read", + addFile.path()); + return readStatsFromParquetFooter(addFile, snapshot, fields); + } + + /** + * Legacy method for backward compatibility. Use getColumnStatsForFile(AddFile, Snapshot, List) + * instead. + */ public FileStats getColumnStatsForFile(AddFile addFile, List fields) { - if (StringUtils.isEmpty(addFile.stats())) { + String statsString = addFile.stats(); + if (StringUtils.isEmpty(statsString)) { return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); } + return parseStatsFromJson(statsString, fields); + } + + /** Parses stats from JSON string and converts to FileStats. */ + private FileStats parseStatsFromJson(String statsString, List fields) { // TODO: Additional work needed to track maps & arrays. try { - DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class); + DeltaStats deltaStats = MAPPER.readValue(statsString, DeltaStats.class); collectUnsupportedStats(deltaStats.getAdditionalStats()); Map fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues()); @@ -229,6 +286,114 @@ public FileStats getColumnStatsForFile(AddFile addFile, List fiel } } + /** + * Reads column statistics directly from a Parquet file footer. This method is used as a fallback + * when Delta checkpoint statistics are NULL or unavailable. + * + *

<p>This operation is expensive as it requires:
+   *
+   * <ul>
+   *   <li>Opening each Parquet file individually (I/O overhead)
+   *   <li>Reading the file footer metadata
+   *   <li>Parsing column chunk metadata for all columns
+   *   <li>Converting Parquet statistics to internal format
+   * </ul>
+   *
+   * <p>For cloud storage (S3, GCS, ADLS), this can add significant latency due to network overhead.
+   * The method performs several safety checks to prevent errors:
+   *
+   * <ul>
+   *   <li>Filters out statistics with NULL min/max ranges (prevents NullPointerException)
+   *   <li>Skips DECIMAL and complex types (prevents ClassCastException)
+   *   <li>Validates Binary-to-primitive type conversions
+   * </ul>
+   *
+   * <p>
Record Count: The record count is read from Parquet row group metadata, which is always + * reliable regardless of column statistics availability. + * + * @param addFile the Delta AddFile action containing the file path + * @param snapshot the Delta snapshot providing table base path + * @param fields the schema fields for which to extract statistics + * @return FileStats with extracted statistics, or empty stats if reading fails + */ + private FileStats readStatsFromParquetFooter( + AddFile addFile, Snapshot snapshot, List fields) { + try { + // Construct absolute path to the Parquet data file + // Handle both absolute paths and relative paths from table base + String tableBasePath = snapshot.deltaLog().dataPath().toString(); + String filePath = addFile.path(); + String fullPath = + filePath.startsWith(tableBasePath) ? filePath : tableBasePath + "/" + filePath; + + // Read Parquet file footer metadata using Hadoop FileSystem API + Configuration conf = new Configuration(); + Path parquetPath = new Path(fullPath); + + ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, parquetPath); + List parquetStats = ParquetStatsExtractor.getColumnStatsForaFile(footer); + + // Extract record count from Parquet row groups metadata + // This is always reliable and doesn't depend on column statistics + long numRecords = footer.getBlocks().stream().mapToLong(block -> block.getRowCount()).sum(); + + // Build lookup map for efficient field matching by path + Map pathToStat = + parquetStats.stream() + .collect( + Collectors.toMap( + stat -> stat.getField().getPath(), + Function.identity(), + (stat1, stat2) -> stat1)); // Keep first occurrence on collision + + // Map Parquet stats to requested Delta schema fields + // Filter out statistics with NULL ranges to prevent downstream NullPointerException + List mappedStats = + fields.stream() + .filter(field -> pathToStat.containsKey(field.getPath())) + .map( + field -> { + ColumnStat parquetStat = pathToStat.get(field.getPath()); + // Rebuild ColumnStat with correct Delta field reference + // while preserving Parquet statistics values + return ColumnStat.builder() + .field(field) + .numValues(parquetStat.getNumValues()) + .numNulls(parquetStat.getNumNulls()) + .totalSize(parquetStat.getTotalSize()) + .range(parquetStat.getRange()) + .build(); + }) + .filter( + stat -> + stat.getRange() != null + && stat.getRange().getMinValue() != null + && stat.getRange().getMaxValue() != null) + .collect(Collectors.toList()); + + log.debug( + "Successfully extracted {} column stats from Parquet footer for file: {}", + mappedStats.size(), + addFile.path()); + + return FileStats.builder().columnStats(mappedStats).numRecords(numRecords).build(); + + } catch (Exception e) { + // Log warning but continue conversion - the file will be added without statistics + // This is preferable to failing the entire conversion + log.warn( + "Failed to read stats from Parquet footer for file {}: {}. 
" + + "File will be included without column statistics.", + addFile.path(), + e.getMessage()); + + // Return empty statistics but note that record count is also 0 + // Delta AddFile doesn't contain record count, so we cannot preserve it here + // The file will still be added to target table with 0 record count in metadata + return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); + } + } + private void collectUnsupportedStats(Map additionalStats) { if (additionalStats == null || additionalStats.isEmpty()) { return; @@ -251,10 +416,18 @@ private void collectUnsupportedStats(Map additionalStats) { */ private Map flattenStatMap(Map statMap) { Map result = new HashMap<>(); + // Return empty map if input is null + if (statMap == null) { + return result; + } Queue statFieldQueue = new ArrayDeque<>(); statFieldQueue.add(StatField.of("", statMap)); while (!statFieldQueue.isEmpty()) { StatField statField = statFieldQueue.poll(); + // Skip if values map is null (can happen with malformed or partial stats) + if (statField.getValues() == null) { + continue; + } String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + "."; statField .getValues() diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java index 60d4a453b..980deb6ab 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java @@ -67,13 +67,16 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List fie valueCounts.put(fieldId, columnStats.getNumValues()); nullValueCounts.put(fieldId, columnStats.getNumNulls()); Type fieldType = icebergField.type(); - if (columnStats.getRange().getMinValue() != null) { + // Add min/max bounds if available (they're optional in Iceberg Metrics) + // Native Iceberg includes columns even without bounds - they just have null bounds + Range range = columnStats.getRange(); + if (range != null && range.getMinValue() != null) { lowerBounds.put( - fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue())); + fieldId, Conversions.toByteBuffer(fieldType, range.getMinValue())); } - if (columnStats.getRange().getMaxValue() != null) { + if (range != null && range.getMaxValue() != null) { upperBounds.put( - fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMaxValue())); + fieldId, Conversions.toByteBuffer(fieldType, range.getMaxValue())); } }); return new Metrics( diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index b05089d0a..d430f437c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -136,7 +136,8 @@ public void init(TargetTable targetTable, Configuration configuration) { IcebergPartitionSpecSync.getInstance(), IcebergDataFileUpdatesSync.of( IcebergColumnStatsConverter.getInstance(), - IcebergPartitionValueConverter.getInstance()), + IcebergPartitionValueConverter.getInstance(), + configuration), IcebergTableManager.of(configuration)); } @@ -226,6 +227,7 @@ public void syncFilesForSnapshot(List partitionedDataFiles) @Override public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { 
dataFileUpdatesExtractor.applyDiff( + table, transaction, internalFilesDiff, transaction.table().schema(), diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 77a4eeedd..e87a2777f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -19,11 +19,16 @@ package org.apache.xtable.iceberg; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.*; import org.apache.iceberg.io.CloseableIterable; @@ -31,16 +36,24 @@ import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; import org.apache.xtable.model.storage.FilesDiff; import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.InternalFile; import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.parquet.ParquetMetadataExtractor; +import org.apache.xtable.parquet.ParquetStatsExtractor; +@Log4j2 @AllArgsConstructor(staticName = "of") public class IcebergDataFileUpdatesSync { private final IcebergColumnStatsConverter columnStatsConverter; private final IcebergPartitionValueConverter partitionValueConverter; + private final Configuration hadoopConf; public void applySnapshot( Table table, @@ -52,12 +65,20 @@ public void applySnapshot( TableSyncMetadata metadata) { Map previousFiles = new HashMap<>(); - try (CloseableIterable iterator = table.newScan().planFiles()) { - StreamSupport.stream(iterator.spliterator(), false) - .map(FileScanTask::file) - .forEach(file -> previousFiles.put(file.path().toString(), file)); - } catch (Exception e) { - throw new ReadException("Failed to iterate through Iceberg data files", e); + + // Optimize: Check if table has a snapshot before scanning + // For empty tables, this avoids expensive manifest file reads from cloud storage (GCS, S3, etc.) 
+ Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot != null) { + try (CloseableIterable iterator = table.newScan().planFiles()) { + StreamSupport.stream(iterator.spliterator(), false) + .map(FileScanTask::file) + .forEach(file -> previousFiles.put(file.path().toString(), file)); + } catch (Exception e) { + throw new ReadException("Failed to iterate through Iceberg data files", e); + } + } else { + log.debug("Table has no snapshot, skipping file scan (table is empty)"); } FilesDiff diff = @@ -68,24 +89,49 @@ public void applySnapshot( } public void applyDiff( + Table table, Transaction transaction, InternalFilesDiff internalFilesDiff, Schema schema, PartitionSpec partitionSpec, TableSyncMetadata metadata) { + // Get existing files in Iceberg to filter out duplicates + // This handles cases where source (e.g., Delta with blind append OPTIMIZE) + // sends AddFile actions for files that are already synced + // Note: Must scan the base table (not transaction.table()) as transaction tables don't support + // scans + Map existingFiles = new HashMap<>(); + + // Optimize: Check if table has a snapshot before scanning + // For empty tables, this avoids expensive manifest file reads from cloud storage (GCS, S3, etc.) + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot != null) { + try (CloseableIterable iterator = table.newScan().planFiles()) { + StreamSupport.stream(iterator.spliterator(), false) + .map(FileScanTask::file) + .forEach(file -> existingFiles.put(file.path().toString(), file)); + } catch (Exception e) { + throw new ReadException("Failed to read existing Iceberg files during incremental sync", e); + } + } else { + log.debug("Table has no snapshot, skipping file scan (table is empty)"); + } + + // Filter out files that already exist in Iceberg + Collection filesToAdd = + internalFilesDiff.dataFilesAdded().stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .filter(file -> !existingFiles.containsKey(file.getPhysicalPath())) + .collect(Collectors.toList()); + Collection filesRemoved = internalFilesDiff.dataFilesRemoved().stream() .map(file -> getDataFile(partitionSpec, schema, file)) .collect(Collectors.toList()); - applyDiff( - transaction, - internalFilesDiff.dataFilesAdded(), - filesRemoved, - schema, - partitionSpec, - metadata); + applyDiff(transaction, filesToAdd, filesRemoved, schema, partitionSpec, metadata); } private void applyDiff( @@ -107,13 +153,95 @@ private void applyDiff( private DataFile getDataFile( PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) { + // Convert Iceberg schema to InternalSchema for stats extraction + InternalSchema internalSchema = IcebergSchemaExtractor.getInstance().fromIceberg(schema); + + // Get existing stats and check if they are complete + List existingStats = dataFile.getColumnStats(); + long recordCount = dataFile.getRecordCount(); + List columnStats; + + // For Parquet files, ALWAYS read from footer to match native Iceberg behavior + // Native Iceberg always reads Parquet footer during insertion for accuracy and completeness. + // This ensures: + // 1. Statistics are always accurate (source of truth from Parquet file) + // 2. All columns present in the file have statistics + // 3. Statistics match what's actually in the file (not potentially stale existing stats) + // 4. 
Query performance is optimal (complete and accurate stats enable better predicate pushdown) + if (dataFile.getFileFormat() == org.apache.xtable.model.storage.FileFormat.APACHE_PARQUET) { + log.debug( + "Reading stats from Parquet footer for file: {} to ensure accuracy and completeness (matching native Iceberg behavior).", + dataFile.getPhysicalPath()); + try { + StatsFromParquet statsFromParquet = + readStatsFromParquetFooter(dataFile, internalSchema); + + // Use Parquet footer stats as primary source (most accurate) + // Merge with existing stats only for columns not in Parquet footer (rare edge case) + List parquetStats = statsFromParquet.getColumnStats(); + columnStats = mergeStats(existingStats, parquetStats, internalSchema); + recordCount = statsFromParquet.getRecordCount(); + + log.debug( + "Successfully extracted {} column stats from Parquet footer for file: {}. Stats will be stored in manifest files for optimal query performance.", + columnStats.size(), + dataFile.getPhysicalPath()); + } catch (Exception e) { + log.warn( + "Failed to read stats from Parquet footer for file: {}. Using existing stats as fallback. Error: {}", + dataFile.getPhysicalPath(), + e.getMessage()); + // Fallback to existing stats if Parquet footer read fails + // This should be rare - Parquet footer is the source of truth + columnStats = existingStats != null ? existingStats : Collections.emptyList(); + } + } else { + // For non-Parquet files, use existing stats + log.debug( + "Using existing stats for non-Parquet file: {} (format: {})", + dataFile.getPhysicalPath(), + dataFile.getFileFormat()); + columnStats = existingStats != null ? existingStats : Collections.emptyList(); + } + + // Get accurate file size from filesystem if available, matching Iceberg's behavior + // Iceberg always reads actual file size during data insertion for accuracy + long fileSizeBytes = dataFile.getFileSizeBytes(); + if (fileSizeBytes <= 0) { + // If file size is missing or invalid, read from filesystem + try { + Path filePath = new Path(dataFile.getPhysicalPath()); + FileSystem fs = filePath.getFileSystem(hadoopConf); + if (fs.exists(filePath)) { + fileSizeBytes = fs.getFileStatus(filePath).getLen(); + log.debug( + "Read file size {} from filesystem for file: {}", + fileSizeBytes, + dataFile.getPhysicalPath()); + } + } catch (Exception e) { + log.warn( + "Failed to read file size from filesystem for file: {}. Using provided size: {}. 
Error: {}", + dataFile.getPhysicalPath(), + fileSizeBytes, + e.getMessage()); + } + } + + // Build DataFile matching Iceberg's native insertion behavior: + // - Path: File location + // - File size: Accurate size from filesystem + // - Metrics: Complete column statistics (min/max, null counts, value counts, sizes) + // - Format: File format (Parquet, ORC, Avro) + // - Partition: Partition values if table is partitioned + // - Content: DATA (default for data files, not delete files) + // Note: Sequence numbers and sort order are handled by Iceberg automatically DataFiles.Builder builder = DataFiles.builder(partitionSpec) .withPath(dataFile.getPhysicalPath()) - .withFileSizeInBytes(dataFile.getFileSizeBytes()) + .withFileSizeInBytes(fileSizeBytes) .withMetrics( - columnStatsConverter.toIceberg( - schema, dataFile.getRecordCount(), dataFile.getColumnStats())) + columnStatsConverter.toIceberg(schema, recordCount, columnStats)) .withFormat(convertFileFormat(dataFile.getFileFormat())); if (partitionSpec.isPartitioned()) { builder.withPartition( @@ -122,6 +250,287 @@ private DataFile getDataFile( return builder.build(); } + + /** + * Reads column statistics from Parquet file footer for all columns in the schema. This method + * ensures that manifest files always have complete statistics, similar to how Iceberg handles + * stats during data insertion. + * + *

<p>This operation reads the Parquet file footer, which contains:
+   *
+   * <ul>
+   *   <li>Column chunk metadata with min/max values for each column
+   *   <li>Row group metadata with record counts
+   *   <li>Schema information
+   * </ul>
+   *
+   * <p>
Performance: Reading Parquet footers is efficient as it only reads metadata (typically a few + * KB), not the entire file. However, for cloud storage, there is network latency per file. + * + * @param dataFile the data file to read stats from + * @param schema the table schema to extract stats for all columns + * @return StatsFromParquet containing column stats and record count + * @throws ReadException if reading the Parquet file fails + */ + private StatsFromParquet readStatsFromParquetFooter( + InternalDataFile dataFile, InternalSchema schema) { + try { + Path parquetPath = new Path(dataFile.getPhysicalPath()); + org.apache.parquet.hadoop.metadata.ParquetMetadata footer = + ParquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetPath); + + // Extract stats for all columns from Parquet footer + List parquetStats = ParquetStatsExtractor.getColumnStatsForaFile(footer); + + // Extract record count from Parquet row groups metadata + // This is always reliable and doesn't depend on column statistics + long numRecords = + footer.getBlocks().stream().mapToLong(block -> block.getRowCount()).sum(); + + // Group stats by column path - a Parquet file can have multiple row groups, + // each with its own stats. We need to aggregate them like Iceberg does during insertion. + Map> pathToStatsList = + parquetStats.stream() + .collect(Collectors.groupingBy(stat -> stat.getField().getPath())); + + // Aggregate stats across all row groups for each column, matching Iceberg's behavior: + // - Min value = min of all row group mins + // - Max value = max of all row group maxes + // - Null count = sum of all row group null counts + // - Value count = sum of all row group value counts + // - Total size = sum of all row group sizes + Map pathToAggregatedStat = new HashMap<>(); + pathToStatsList.forEach( + (path, statsList) -> { + if (statsList.isEmpty()) { + return; + } + // Use the first stat as base (all stats for same column should have same field) + ColumnStat firstStat = statsList.get(0); + InternalField field = firstStat.getField(); + + // Aggregate across row groups + long totalValues = statsList.stream().mapToLong(ColumnStat::getNumValues).sum(); + long totalNulls = statsList.stream().mapToLong(ColumnStat::getNumNulls).sum(); + long totalSize = statsList.stream().mapToLong(ColumnStat::getTotalSize).sum(); + + // Aggregate min/max across row groups + Object minValue = null; + Object maxValue = null; + for (ColumnStat stat : statsList) { + Range range = stat.getRange(); + if (range != null && range.getMinValue() != null && range.getMaxValue() != null) { + if (minValue == null || compareValues(range.getMinValue(), minValue) < 0) { + minValue = range.getMinValue(); + } + if (maxValue == null || compareValues(range.getMaxValue(), maxValue) > 0) { + maxValue = range.getMaxValue(); + } + } + } + + if (minValue != null && maxValue != null) { + pathToAggregatedStat.put( + path, + ColumnStat.builder() + .field(field) + .numValues(totalValues) + .numNulls(totalNulls) + .totalSize(totalSize) + .range(Range.vector(minValue, maxValue)) + .build()); + } + }); + + // Build stats list for all schema fields + // Include all columns that have stats (even without min/max bounds) + // Native Iceberg includes columns in Metrics even if they don't have min/max bounds + // This is important for query performance - null counts and value counts are still useful + List mappedStats = + schema.getAllFields().stream() + .filter(field -> pathToAggregatedStat.containsKey(field.getPath())) + .map( + field -> { + ColumnStat 
aggregatedStat = pathToAggregatedStat.get(field.getPath()); + // Rebuild ColumnStat with correct schema field reference + // while preserving aggregated Parquet statistics values + // Include stats even if min/max are null (native Iceberg behavior) + return ColumnStat.builder() + .field(field) + .numValues(aggregatedStat.getNumValues()) + .numNulls(aggregatedStat.getNumNulls()) + .totalSize(aggregatedStat.getTotalSize()) + .range(aggregatedStat.getRange()) + .build(); + }) + // Include all stats - min/max bounds are optional in Iceberg Metrics + // Columns with counts but no bounds are still valuable for query planning + .collect(Collectors.toList()); + + return new StatsFromParquet(mappedStats, numRecords); + } catch (Exception e) { + throw new ReadException( + "Failed to read stats from Parquet footer for file: " + dataFile.getPhysicalPath(), e); + } + } + + /** + * Checks if existing stats are complete for all columns in the schema. + * Stats are considered complete if: + * - All columns in the schema have stats + * - Each stat has a valid range with min/max values + * - Each stat has non-negative counts (numValues, numNulls) + * + * @param existingStats the existing column statistics + * @param schema the table schema + * @return true if stats are complete for all columns, false otherwise + */ + private static boolean areStatsComplete( + List existingStats, InternalSchema schema) { + if (existingStats == null || existingStats.isEmpty()) { + return false; + } + + // Build a map of existing stats by field path for quick lookup + Map statsByPath = + existingStats.stream() + .filter(stat -> stat != null && stat.getField() != null) + .collect( + Collectors.toMap( + stat -> stat.getField().getPath(), Function.identity(), (s1, s2) -> s1)); + + // Check if all schema fields have complete stats + for (InternalField field : schema.getAllFields()) { + ColumnStat stat = statsByPath.get(field.getPath()); + if (stat == null) { + // Missing stat for this column + return false; + } + + // Check if stat has valid range with min/max + Range range = stat.getRange(); + if (range == null + || range.getMinValue() == null + || range.getMaxValue() == null) { + // Incomplete stat (missing min/max) + return false; + } + + // Check if counts are valid (non-negative) + if (stat.getNumValues() < 0 || stat.getNumNulls() < 0) { + // Invalid counts + return false; + } + } + + return true; + } + + /** + * Merges existing stats with Parquet footer stats to ensure completeness. + * Parquet footer stats take precedence (most accurate), but existing stats are preserved + * for columns that might not be in the Parquet footer. 
+ * + * @param existingStats existing column statistics (may be null or incomplete) + * @param parquetStats statistics from Parquet footer + * @param schema the table schema + * @return merged list of complete column statistics + */ + private static List mergeStats( + List existingStats, + List parquetStats, + InternalSchema schema) { + // Build maps for efficient lookup + Map existingByPath = new HashMap<>(); + if (existingStats != null) { + existingStats.stream() + .filter(stat -> stat != null && stat.getField() != null) + .forEach(stat -> existingByPath.put(stat.getField().getPath(), stat)); + } + + Map parquetByPath = new HashMap<>(); + if (parquetStats != null) { + parquetStats.stream() + .filter(stat -> stat != null && stat.getField() != null) + .forEach(stat -> parquetByPath.put(stat.getField().getPath(), stat)); + } + + // Merge: Parquet stats take precedence, but use existing stats for columns not in Parquet + Map mergedByPath = new HashMap<>(existingByPath); + mergedByPath.putAll(parquetByPath); // Parquet stats override existing stats + + // Build final list with all schema fields, preferring Parquet stats + // Include all columns with stats - min/max bounds are optional in Iceberg Metrics + // Native Iceberg includes columns even without min/max bounds (they just have null bounds) + // This ensures maximum query performance - null counts and value counts are still useful + return schema.getAllFields().stream() + .filter(field -> mergedByPath.containsKey(field.getPath())) + .map( + field -> { + ColumnStat stat = mergedByPath.get(field.getPath()); + // Rebuild ColumnStat with correct schema field reference + // Include stats even if min/max are null (native Iceberg behavior) + return ColumnStat.builder() + .field(field) + .numValues(stat.getNumValues()) + .numNulls(stat.getNumNulls()) + .totalSize(stat.getTotalSize()) + .range(stat.getRange()) + .build(); + }) + // Include all stats - don't filter out columns without min/max bounds + // Iceberg Metrics allows null bounds, and counts are still valuable for query planning + .collect(Collectors.toList()); + } + + /** + * Compares two values for min/max aggregation. Handles Comparable types and nulls. + * + * @param v1 first value + * @param v2 second value + * @return negative if v1 < v2, positive if v1 > v2, 0 if equal + */ + @SuppressWarnings("unchecked") + private static int compareValues(Object v1, Object v2) { + if (v1 == null && v2 == null) { + return 0; + } + if (v1 == null) { + return -1; + } + if (v2 == null) { + return 1; + } + if (v1 instanceof Comparable && v2 instanceof Comparable) { + try { + return ((Comparable) v1).compareTo(v2); + } catch (ClassCastException e) { + // If types are incompatible, compare by string representation + return v1.toString().compareTo(v2.toString()); + } + } + // Fallback to string comparison + return v1.toString().compareTo(v2.toString()); + } + + /** Container for stats extracted from Parquet footer. 
*/ + private static class StatsFromParquet { + private final List columnStats; + private final long recordCount; + + StatsFromParquet(List columnStats, long recordCount) { + this.columnStats = columnStats; + this.recordCount = recordCount; + } + + List getColumnStats() { + return columnStats; + } + + long getRecordCount() { + return recordCount; + } + } + private static FileFormat convertFileFormat( org.apache.xtable.model.storage.FileFormat fileFormat) { switch (fileFormat) { diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java index e5fd2d07f..d92c88a76 100644 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java @@ -18,52 +18,199 @@ package org.apache.xtable.parquet; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.charset.StandardCharsets; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; +/** + * Utility class for converting Parquet statistics from binary format to logical types. This class + * provides safe type conversions that avoid ClassCastException by skipping complex or incompatible + * type conversions. + */ public class ParquetStatsConverterUtil { + + /** + * Converts Parquet statistics from binary representation to their logical type representation. + * This method performs type-safe conversions and returns null for types that cannot be safely + * converted, preventing ClassCastException errors during stats extraction. + * + *

<p>Supported conversions:
+   *
+   * <ul>
+   *   <li>STRING (BINARY with STRING logical type) - converted to Java String
+   *   <li>Primitive types (INT32, INT64, FLOAT, DOUBLE, BOOLEAN) - returned as-is if already
+   *       primitives
+   *   <li>DECIMAL - converted to BigDecimal using precision and scale from metadata
+   * </ul>
+   *
+   * <p>Unsupported types that return null:
+   *
+   * <ul>
+   *   <li>Binary-encoded primitives - indicates an encoding mismatch
+   *   <li>Complex BINARY types (JSON, BSON, UUID, etc.)
+   *   <li>INT96 - complex temporal types (legacy timestamp format)
+   * </ul>
+ * + * @param columnMetaData the Parquet column metadata containing statistics + * @param isMin true to extract minimum value, false for maximum value + * @return the converted logical value, or null if conversion is not supported or value is null + */ public static Object convertStatBinaryTypeToLogicalType( ColumnChunkMetaData columnMetaData, boolean isMin) { - Object returnedObj = null; PrimitiveType primitiveType = columnMetaData.getPrimitiveType(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + // Handle DECIMAL types with proper precision/scale conversion + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + return convertDecimalStat(columnMetaData, isMin, logicalType); + } + + // Extract the raw statistic value from Parquet metadata + Object rawValue = + isMin + ? columnMetaData.getStatistics().genericGetMin() + : columnMetaData.getStatistics().genericGetMax(); + + if (rawValue == null) { + return null; + } + + // Perform type-specific conversion based on Parquet primitive type + // Only safe conversions are performed to prevent runtime ClassCastException switch (primitiveType.getPrimitiveTypeName()) { - case BINARY: // TODO check if other primitiveType' needs to be handled as well - if (primitiveType.getLogicalTypeAnnotation() != null) { - if (columnMetaData - .getPrimitiveType() - .getLogicalTypeAnnotation() - .toString() - .equals("STRING")) { - returnedObj = - new String( - (isMin - ? (Binary) columnMetaData.getStatistics().genericGetMin() - : (Binary) columnMetaData.getStatistics().genericGetMax()) - .getBytes(), - StandardCharsets.UTF_8); - } else { - returnedObj = - isMin - ? columnMetaData.getStatistics().genericGetMin() - : columnMetaData.getStatistics().genericGetMax(); + case BINARY: + // Handle BINARY type with STRING logical annotation + if (logicalType != null && logicalType.toString().equals("STRING")) { + if (rawValue instanceof Binary) { + return new String(((Binary) rawValue).getBytes(), StandardCharsets.UTF_8); } - } else { - returnedObj = - isMin - ? columnMetaData.getStatistics().genericGetMin() - : columnMetaData.getStatistics().genericGetMax(); } - break; + // Skip other BINARY logical types (JSON, BSON, UUID, etc.) as they require + // specialized conversion logic + return null; + + case INT32: + // Regular INT32 - return as-is if not Binary + // Note: DECIMAL stored as INT32 is already handled at the top level + if (!(rawValue instanceof Binary)) { + return rawValue; + } + return null; + + case INT64: + // Regular INT64 - return as-is if not Binary + // Note: DECIMAL stored as INT64 is already handled at the top level + if (!(rawValue instanceof Binary)) { + return rawValue; + } + return null; + + case FLOAT: + case DOUBLE: + case BOOLEAN: + // These types should be returned as Java primitives by Parquet + // If we receive a Binary object instead, it indicates an encoding mismatch + // and we skip the conversion to prevent ClassCastException + if (!(rawValue instanceof Binary)) { + return rawValue; + } + return null; + + case FIXED_LEN_BYTE_ARRAY: + // Note: DECIMAL stored as FIXED_LEN_BYTE_ARRAY is already handled at the top level + // Other FIXED_LEN_BYTE_ARRAY types - skip to avoid conversion issues + return null; + + case INT96: + // INT96 is legacy timestamp format - skip to avoid conversion issues + return null; + default: - returnedObj = - isMin - ? columnMetaData.getStatistics().genericGetMin() - : columnMetaData.getStatistics().genericGetMax(); - // TODO JSON and DECIMAL... 
of BINARY primitiveType + // Unknown or unsupported Parquet type - skip for safety + return null; + } + } + + /** + * Converts DECIMAL statistics from Parquet format to BigDecimal. Handles DECIMAL values stored as + * INT32, INT64, or FIXED_LEN_BYTE_ARRAY in Parquet. + * + *

<p>Parquet DECIMAL storage formats:
+   *
+   * <ul>
+   *   <li>INT32: For precision <= 9, stored as signed integer
+   *   <li>INT64: For precision <= 18, stored as signed long
+   *   <li>FIXED_LEN_BYTE_ARRAY: For precision > 18, stored as binary (big-endian)
+   * </ul>
+ * + * @param columnMetaData the Parquet column metadata containing statistics + * @param isMin true to extract minimum value, false for maximum value + * @param decimalType the DECIMAL logical type annotation with precision and scale + * @return BigDecimal value, or null if conversion fails + */ + private static Object convertDecimalStat( + ColumnChunkMetaData columnMetaData, + boolean isMin, + LogicalTypeAnnotation decimalType) { + try { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalAnnotation = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) decimalType; + int scale = decimalAnnotation.getScale(); + + // Extract the raw statistic value + Object rawValue = + isMin + ? columnMetaData.getStatistics().genericGetMin() + : columnMetaData.getStatistics().genericGetMax(); + + if (rawValue == null) { + return null; + } + + PrimitiveType primitiveType = columnMetaData.getPrimitiveType(); + PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName(); + + // Convert based on Parquet storage format + switch (typeName) { + case INT32: + // DECIMAL stored as INT32 (precision <= 9) + if (rawValue instanceof Integer) { + return BigDecimal.valueOf((Integer) rawValue, scale); + } + return null; + + case INT64: + // DECIMAL stored as INT64 (precision <= 18) + if (rawValue instanceof Long) { + return BigDecimal.valueOf((Long) rawValue, scale); + } + return null; + + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + // DECIMAL stored as FIXED_LEN_BYTE_ARRAY or BINARY (precision > 18) + // Values are stored as big-endian binary + if (rawValue instanceof Binary) { + Binary binaryValue = (Binary) rawValue; + byte[] bytes = binaryValue.getBytes(); + // Parquet DECIMAL is stored as big-endian signed integer + BigInteger unscaledValue = new BigInteger(bytes); + return new BigDecimal(unscaledValue, scale); + } + return null; + + default: + // Unsupported DECIMAL storage format + return null; + } + } catch (Exception e) { + // If conversion fails, return null to skip this statistic + // This prevents ClassCastException and allows other stats to be extracted + return null; } - return returnedObj; } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index d5a25b022..8bc228086 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -203,7 +203,9 @@ private IcebergConversionTarget getConversionTarget() { mockPartitionSpecExtractor, mockPartitionSpecSync, IcebergDataFileUpdatesSync.of( - mockColumnStatsConverter, IcebergPartitionValueConverter.getInstance()), + mockColumnStatsConverter, + IcebergPartitionValueConverter.getInstance(), + CONFIGURATION), IcebergTableManager.of(CONFIGURATION)); } From 9e14f33d1984d4c3379cb91f5c255bcb77407c50 Mon Sep 17 00:00:00 2001 From: dharma-shashank-meesho Date: Tue, 25 Nov 2025 12:17:39 +0530 Subject: [PATCH 2/2] tests updated --- .gitignore | 2 - .../delta/TestDeltaSchemaExtractor.java | 39 +++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 33f70eb01..12d8ce0b2 100644 --- a/.gitignore +++ b/.gitignore @@ -46,5 +46,3 @@ my_config_catalog.yaml # REST generated models spec/generated -.pre-commit-config.yaml -trufflehog/ \ No newline at end of file diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java index 81ab34d24..818b4010d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java @@ -834,4 +834,43 @@ public void testIcebergToDeltaUUIDSupport() { Assertions.assertEquals( internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); } + + @Test + public void testShortTypeConversion() { + InternalSchema internalSchema = + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("requiredShort") + .schema( + InternalSchema.builder() + .name("short") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("optionalShort") + .schema( + InternalSchema.builder() + .name("short") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + StructType structRepresentation = + new StructType() + .add("requiredShort", DataTypes.ShortType, false) + .add("optionalShort", DataTypes.ShortType, true); + + Assertions.assertEquals( + internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); + } }
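
Illustrative sketch (not part of the patch): the fallback introduced in DeltaStatsExtractor and IcebergDataFileUpdatesSync amounts to reading only the Parquet footer and deriving the record count and per-column min/max/null counts from row-group metadata. The snippet below shows that flow with plain parquet-hadoop APIs; the class name and file path are invented for the example, and the patch itself routes through XTable's ParquetMetadataExtractor and ParquetStatsExtractor rather than calling the reader directly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterStatsFallbackSketch {

  public static void main(String[] args) throws Exception {
    // Path to a data file referenced by a Delta AddFile action (illustrative value).
    Path dataFile = new Path(args.length > 0 ? args[0] : "/tmp/delta-table/part-00000.parquet");
    Configuration conf = new Configuration();

    // Only the footer (a few KB of metadata) is read here, not the row data itself.
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(dataFile, conf))) {
      ParquetMetadata footer = reader.getFooter();

      // Record count comes from row-group metadata and is reliable even when
      // per-column statistics were never written.
      long rowCount = footer.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
      System.out.println("rows=" + rowCount);

      // Min/max/null counts per column chunk; a file with several row groups yields
      // one entry per column per row group, which the converter then aggregates.
      for (BlockMetaData block : footer.getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          if (column.getStatistics() == null || column.getStatistics().isEmpty()) {
            continue; // no stats for this chunk; such columns are skipped
          }
          System.out.printf(
              "%s min=%s max=%s nulls=%d%n",
              column.getPath().toDotString(),
              column.getStatistics().genericGetMin(),
              column.getStatistics().genericGetMax(),
              column.getStatistics().getNumNulls());
        }
      }
    }
  }
}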
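
A second small worked example, for the DECIMAL handling added in ParquetStatsConverterUtil: Parquet stores a DECIMAL statistic as an unscaled integer (INT32, INT64, or a big-endian byte array) plus the scale taken from the logical type annotation, and the helper rebuilds a BigDecimal from those two pieces. The scale and the numbers below are made up for illustration.

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalStatConversionSketch {

  public static void main(String[] args) {
    int scale = 2; // in the real code this comes from DecimalLogicalTypeAnnotation.getScale()

    // precision <= 9: unscaled value stored as INT32
    System.out.println(BigDecimal.valueOf(12345, scale)); // 123.45

    // precision <= 18: unscaled value stored as INT64
    System.out.println(BigDecimal.valueOf(9876543210L, scale)); // 98765432.10

    // larger precision: unscaled value stored as a big-endian two's-complement byte array
    byte[] bigEndian = new BigInteger("123456789012345678901").toByteArray();
    System.out.println(new BigDecimal(new BigInteger(bigEndian), scale)); // 1234567890123456789.01
  }
}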