From 6ad19cb740e14140a6ab9cbee20b4387fba55611 Mon Sep 17 00:00:00 2001
From: Karthic Rao
Date: Tue, 20 Jan 2026 12:36:34 +0530
Subject: [PATCH 1/2] fix: support INT32/INT64 physical types for decimal
 columns

Parquet decimals can be stored using multiple physical types depending on
precision:

- INT32 for precision <= 9
- INT64 for precision <= 18
- FIXED_LEN_BYTE_ARRAY for any precision
- BYTE_ARRAY for any precision

The previous implementation only accepted FIXED_LEN_BYTE_ARRAY for all
decimals and rejected valid Parquet files with the error:

    unexpected physical type INT32 for decimal(7, 2), expected FIXED_LEN_BYTE_ARRAY

This caused AddFiles to fail when importing datasets (like TPC-DS) that use
INT32/INT64 for small-precision decimals, which is valid per the Parquet
specification.

This change:

- Refactors createStatsAgg to switch on the Iceberg logical type first,
  then handle physical representations (matches iceberg-java's
  ParquetConversions.java approach)
- For DecimalType, accepts all valid Parquet physical types
- Updates DataFileStatsFromMeta to handle INT32/INT64 decimal statistics
- Adds wrappedDecByteArrayStats for BYTE_ARRAY encoded decimals
---
 table/internal/parquet_files.go | 121 +++++++++++++++++++++-----------
 1 file changed, 80 insertions(+), 41 deletions(-)

diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 7ed2f3094..8e7cf252b 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -101,59 +101,66 @@ func (parquetFormat) PathToIDMapping(sc *iceberg.Schema) (map[string]int, error)
 }
 
 func (p parquetFormat) createStatsAgg(typ iceberg.PrimitiveType, physicalTypeStr string, truncLen int) (StatsAgg, error) {
-	expectedPhysical := p.PrimitiveTypeToPhysicalType(typ)
-	if physicalTypeStr != expectedPhysical {
-		switch {
-		case physicalTypeStr == "INT32" && expectedPhysical == "INT64":
-		case physicalTypeStr == "FLOAT" && expectedPhysical == "DOUBLE":
-		default:
-			return nil, fmt.Errorf("unexpected physical type %s for %s, expected %s",
-				physicalTypeStr, typ, expectedPhysical)
-		}
-	}
-
-	switch physicalTypeStr {
-	case "BOOLEAN":
+	// Switch on Iceberg logical type first, then handle physical representations.
+	// This matches iceberg-java's approach in ParquetConversions.java.
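+	//
+	// For example, decimal(7, 2) may arrive as INT32 (precision 7 <= 9),
+	// decimal(15, 2) as INT64 (precision 15 <= 18), while decimal(30, 4)
+	// can only use FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY.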
+
+	switch typ.(type) {
+	case iceberg.BooleanType:
 		return newStatAgg[bool](typ, truncLen), nil
-	case "INT32":
-		switch typ.(type) {
-		case iceberg.DecimalType:
-			return &decAsIntAgg[int32]{
-				newStatAgg[int32](typ, truncLen).(*statsAggregator[int32]),
-			}, nil
-		}
+	case iceberg.Int32Type, iceberg.DateType:
 		return newStatAgg[int32](typ, truncLen), nil
-	case "INT64":
-		switch typ.(type) {
-		case iceberg.DecimalType:
-			return &decAsIntAgg[int64]{
-				newStatAgg[int64](typ, truncLen).(*statsAggregator[int64]),
-			}, nil
+
+	case iceberg.Int64Type, iceberg.TimeType, iceberg.TimestampType, iceberg.TimestampTzType:
+		// Allow INT32 physical for INT64 logical (promotion)
+		if physicalTypeStr == "INT32" {
+			return newStatAgg[int32](typ, truncLen), nil
 		}
 
 		return newStatAgg[int64](typ, truncLen), nil
-	case "FLOAT":
+
+	case iceberg.Float32Type:
 		return newStatAgg[float32](typ, truncLen), nil
-	case "DOUBLE":
+
+	case iceberg.Float64Type:
+		// Allow FLOAT physical for DOUBLE logical (promotion)
+		if physicalTypeStr == "FLOAT" {
+			return newStatAgg[float32](typ, truncLen), nil
+		}
+
 		return newStatAgg[float64](typ, truncLen), nil
-	case "FIXED_LEN_BYTE_ARRAY":
-		switch typ.(type) {
-		case iceberg.UUIDType:
-			return newStatAgg[uuid.UUID](typ, truncLen), nil
-		case iceberg.DecimalType:
+
+	case iceberg.StringType:
+		return newStatAgg[string](typ, truncLen), nil
+
+	case iceberg.BinaryType:
+		return newStatAgg[[]byte](typ, truncLen), nil
+
+	case iceberg.UUIDType:
+		return newStatAgg[uuid.UUID](typ, truncLen), nil
+
+	case iceberg.FixedType:
+		return newStatAgg[[]byte](typ, truncLen), nil
+
+	case iceberg.DecimalType:
+		// Decimals can be stored as INT32 (precision <= 9), INT64 (precision <= 18),
+		// FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY per Parquet spec.
+		switch physicalTypeStr {
+		case "INT32":
+			return &decAsIntAgg[int32]{
+				newStatAgg[int32](typ, truncLen).(*statsAggregator[int32]),
+			}, nil
+		case "INT64":
+			return &decAsIntAgg[int64]{
+				newStatAgg[int64](typ, truncLen).(*statsAggregator[int64]),
+			}, nil
+		case "FIXED_LEN_BYTE_ARRAY", "BYTE_ARRAY":
 			return newStatAgg[iceberg.Decimal](typ, truncLen), nil
 		default:
-			return newStatAgg[[]byte](typ, truncLen), nil
-		}
-	case "BYTE_ARRAY":
-		if typ.Equals(iceberg.PrimitiveTypes.String) {
-			return newStatAgg[string](typ, truncLen), nil
+			return nil, fmt.Errorf("unsupported physical type %s for decimal", physicalTypeStr)
 		}
-
-		return newStatAgg[[]byte](typ, truncLen), nil
 	default:
-		return nil, fmt.Errorf("unsupported physical type: %s", physicalTypeStr)
+		return nil, fmt.Errorf("unsupported iceberg type: %s", typ)
 	}
 }
@@ -400,6 +407,29 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
 	return iceberg.Decimal{Val: dec, Scale: w.scale}
 }
 
+type wrappedDecByteArrayStats struct {
+	*metadata.ByteArrayStatistics
+	scale int
+}
+
+func (w wrappedDecByteArrayStats) Min() iceberg.Decimal {
+	dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Min())
+	if err != nil {
+		panic(err)
+	}
+
+	return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
+func (w wrappedDecByteArrayStats) Max() iceberg.Decimal {
+	dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Max())
+	if err != nil {
+		panic(err)
+	}
+
+	return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
 func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
 	pqmeta := meta.(*metadata.FileMetaData)
 	var (
@@ -487,7 +517,16 @@ func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]St
 		case iceberg.FixedType:
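+			// Iceberg fixed(L) always maps to FIXED_LEN_BYTE_ARRAY in Parquet,
+			// so a single statistics wrapper suffices here.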
 			stats = &wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
 		case iceberg.DecimalType:
-			stats = &wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
+			// Decimals can be stored as INT32/INT64 (small precision) or FIXED_LEN_BYTE_ARRAY/BYTE_ARRAY.
+			// Only wrap FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY statistics; INT32/INT64 stats
+			// are used directly by decAsIntAgg.
+			switch s := stats.(type) {
+			case *metadata.FixedLenByteArrayStatistics:
+				stats = &wrappedDecStats{s, t.Scale()}
+			case *metadata.ByteArrayStatistics:
+				stats = &wrappedDecByteArrayStats{s, t.Scale()}
+			}
 		}
 
 		agg.Update(stats)

From 6cc0a2b778d3cbed640c16d3cc6ccfb44f43cf15 Mon Sep 17 00:00:00 2001
From: Karthic Rao
Date: Wed, 21 Jan 2026 09:17:01 +0530
Subject: [PATCH 2/2] test: add test for decimal columns with INT32/INT64
 physical types

Adds TestDecimalPhysicalTypes which creates Parquet files with decimals
stored as INT32 (precision <= 9) and INT64 (precision <= 18) physical
types, then verifies that DataFileStatsFromMeta correctly extracts column
statistics without panicking.

This test validates the fix for decimal physical type handling.
---
 table/internal/parquet_files_test.go | 155 +++++++++++++++++++++++++++
 1 file changed, 155 insertions(+)

diff --git a/table/internal/parquet_files_test.go b/table/internal/parquet_files_test.go
index 47772cc8e..85480da08 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -20,6 +20,7 @@ package internal_test
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"math/big"
 	"strings"
 	"testing"
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/arrow-go/v18/parquet/file"
 	"github.com/apache/arrow-go/v18/parquet/metadata"
 	"github.com/apache/arrow-go/v18/parquet/pqarrow"
+	"github.com/apache/arrow-go/v18/parquet/schema"
 	"github.com/apache/iceberg-go"
 	internal2 "github.com/apache/iceberg-go/internal"
 	"github.com/apache/iceberg-go/table"
@@ -330,6 +332,159 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
 	})
 }
 
+// TestDecimalPhysicalTypes tests that decimals stored as INT32/INT64 physical types
+// are correctly handled. This is important because Parquet allows decimals with
+// precision <= 9 to be stored as INT32, and precision <= 18 as INT64.
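+//
+// The values below are unscaled integers: for decimal(7, 2), the logical value
+// 123.45 is written as the int32 12345, and the column statistics carry the
+// same unscaled representation.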
+func TestDecimalPhysicalTypes(t *testing.T) {
+	format := internal.GetFileFormat(iceberg.ParquetFile)
+
+	tests := []struct {
+		name         string
+		precision    int
+		scale        int
+		physicalType parquet.Type
+		values       []int64 // unscaled values
+		expectedMin  int64
+		expectedMax  int64
+	}{
+		{
+			name:         "decimal_as_int32",
+			precision:    7,
+			scale:        2,
+			physicalType: parquet.Types.Int32,
+			values:       []int64{12345, 67890}, // represents 123.45, 678.90
+			expectedMin:  12345,
+			expectedMax:  67890,
+		},
+		{
+			name:         "decimal_as_int64",
+			precision:    15,
+			scale:        2,
+			physicalType: parquet.Types.Int64,
+			values:       []int64{123456789012345, 987654321098765},
+			expectedMin:  123456789012345,
+			expectedMax:  987654321098765,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a Parquet file with decimal stored as INT32 or INT64
+			var buf bytes.Buffer
+
+			// Build a custom schema with decimal logical type
+			decType := schema.NewDecimalLogicalType(int32(tt.precision), int32(tt.scale))
+			var node schema.Node
+			var err error
+			if tt.physicalType == parquet.Types.Int32 {
+				node, err = schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+					decType, parquet.Types.Int32, 0, 1)
+			} else {
+				node, err = schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+					decType, parquet.Types.Int64, 0, 1)
+			}
+			require.NoError(t, err)
+
+			rootNode, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{node}, -1)
+			require.NoError(t, err)
+
+			// Write the parquet file
+			writer := file.NewParquetWriter(&buf,
+				rootNode,
+				file.WithWriterProps(parquet.NewWriterProperties(parquet.WithStats(true))))
+
+			rgw := writer.AppendRowGroup()
+			colWriter, err := rgw.NextColumn()
+			require.NoError(t, err)
+
+			if tt.physicalType == parquet.Types.Int32 {
+				int32Writer := colWriter.(*file.Int32ColumnChunkWriter)
+				vals := make([]int32, len(tt.values))
+				for i, v := range tt.values {
+					vals[i] = int32(v)
+				}
+				_, err = int32Writer.WriteBatch(vals, nil, nil)
+			} else {
+				int64Writer := colWriter.(*file.Int64ColumnChunkWriter)
+				_, err = int64Writer.WriteBatch(tt.values, nil, nil)
+			}
+			require.NoError(t, err)
+
+			require.NoError(t, colWriter.Close())
+			require.NoError(t, rgw.Close())
+			require.NoError(t, writer.Close())
+
+			// Read back and get metadata
+			rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+			require.NoError(t, err)
+			defer rdr.Close()
+
+			meta := rdr.MetaData()
+
+			// Create table metadata with decimal type
+			tableMeta, err := table.ParseMetadataString(fmt.Sprintf(`{
+				"format-version": 2,
+				"location": "s3://bucket/test/location",
+				"last-column-id": 1,
+				"current-schema-id": 0,
+				"schemas": [
+					{
+						"type": "struct",
+						"schema-id": 0,
+						"fields": [
+							{"id": 1, "name": "value", "required": true, "type": "decimal(%d, %d)"}
+						]
+					}
+				],
+				"last-partition-id": 0,
+				"last-updated-ms": -1,
+				"default-spec-id": 0,
+				"default-sort-order-id": 0,
+				"sort-orders": [{"order-id": 0, "fields": []}],
+				"partition-specs": [{"spec-id": 0, "fields": []}],
+				"properties": {}
+			}`, tt.precision, tt.scale))
+			require.NoError(t, err)
+
+			mapping, err := format.PathToIDMapping(tableMeta.CurrentSchema())
+			require.NoError(t, err)
+
+			collector := map[int]internal.StatisticsCollector{
+				1: {
+					FieldID:    1,
+					Mode:       internal.MetricsMode{Typ: internal.MetricModeFull},
+					ColName:    "value",
+					IcebergTyp: iceberg.DecimalTypeOf(tt.precision, tt.scale),
+				},
+			}
+
+			// This should not panic - the fix allows INT32/INT64 physical types for decimals
+			stats := format.DataFileStatsFromMeta(internal.Metadata(meta), collector, mapping)
+			require.NotNil(t, stats)
+
+			df := stats.ToDataFile(tableMeta.CurrentSchema(), tableMeta.PartitionSpec(), "test.parquet",
+				iceberg.ParquetFile, meta.GetSourceFileSize(), nil)
+
+			// Verify bounds are correctly extracted
+			require.Contains(t, df.LowerBoundValues(), 1)
+			require.Contains(t, df.UpperBoundValues(), 1)
+
+			// Verify the actual values
+			minLit, err := iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), df.LowerBoundValues()[1])
+			require.NoError(t, err)
+			minDec := minLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+			assert.Equal(t, uint64(tt.expectedMin), minDec.Val.LowBits())
+			assert.Equal(t, tt.scale, minDec.Scale)
+
+			maxLit, err := iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), df.UpperBoundValues()[1])
+			require.NoError(t, err)
+			maxDec := maxLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+			assert.Equal(t, uint64(tt.expectedMax), maxDec.Val.LowBits())
+			assert.Equal(t, tt.scale, maxDec.Scale)
+		})
+	}
+}
+
 func TestWriteDataFileErrOnClose(t *testing.T) {
 	ctx := context.Background()
 	fm := internal.GetFileFormat(iceberg.ParquetFile)
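
Reviewer note, not part of the patch series: a minimal sketch of the decimal
encoding rules both commits depend on. physicalTypeForDecimal is a hypothetical
helper named here only for illustration; decimal128 is arrow-go's real package,
which iceberg-go already depends on (iceberg.Decimal wraps a decimal128.Num).

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/decimal128"
)

// physicalTypeForDecimal restates the Parquet-spec thresholds the fix relies on:
// the narrowest physical type whose range covers an unscaled value of the given
// decimal precision.
func physicalTypeForDecimal(precision int) string {
	switch {
	case precision <= 9:
		return "INT32" // unscaled value fits in 32 bits
	case precision <= 18:
		return "INT64" // unscaled value fits in 64 bits
	default:
		return "FIXED_LEN_BYTE_ARRAY" // or BYTE_ARRAY; both carry big-endian two's-complement bytes
	}
}

func main() {
	fmt.Println(physicalTypeForDecimal(7))  // INT32, the decimal(7, 2) test case
	fmt.Println(physicalTypeForDecimal(15)) // INT64, the decimal(15, 2) test case
	fmt.Println(physicalTypeForDecimal(30)) // FIXED_LEN_BYTE_ARRAY

	// Applying the scale to the unscaled integer recovers the logical value;
	// this is the relationship decAsIntAgg depends on when it consumes
	// INT32/INT64 column statistics directly.
	fmt.Println(decimal128.FromI64(12345).ToString(2)) // 123.45
}
```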