121 changes: 80 additions & 41 deletions table/internal/parquet_files.go
@@ -101,59 +101,66 @@ func (parquetFormat) PathToIDMapping(sc *iceberg.Schema) (map[string]int, error)
 }
 
 func (p parquetFormat) createStatsAgg(typ iceberg.PrimitiveType, physicalTypeStr string, truncLen int) (StatsAgg, error) {
 	expectedPhysical := p.PrimitiveTypeToPhysicalType(typ)
 	if physicalTypeStr != expectedPhysical {
 		switch {
 		case physicalTypeStr == "INT32" && expectedPhysical == "INT64":
 		case physicalTypeStr == "FLOAT" && expectedPhysical == "DOUBLE":
 		default:
 			return nil, fmt.Errorf("unexpected physical type %s for %s, expected %s",
 				physicalTypeStr, typ, expectedPhysical)
 		}
 	}
 
-	switch physicalTypeStr {
-	case "BOOLEAN":
+	// Switch on Iceberg logical type first, then handle physical representations.
+	// This matches iceberg-java's approach in ParquetConversions.java.
+	switch typ.(type) {
+	case iceberg.BooleanType:
 		return newStatAgg[bool](typ, truncLen), nil
-	case "INT32":
-		switch typ.(type) {
-		case iceberg.DecimalType:
-			return &decAsIntAgg[int32]{
-				newStatAgg[int32](typ, truncLen).(*statsAggregator[int32]),
-			}, nil
-		}
 
+	case iceberg.Int32Type, iceberg.DateType:
 		return newStatAgg[int32](typ, truncLen), nil
-	case "INT64":
-		switch typ.(type) {
-		case iceberg.DecimalType:
-			return &decAsIntAgg[int64]{
-				newStatAgg[int64](typ, truncLen).(*statsAggregator[int64]),
-			}, nil
-		}
 
+	case iceberg.Int64Type, iceberg.TimeType, iceberg.TimestampType, iceberg.TimestampTzType:
+		// Allow INT32 physical for INT64 logical (promotion)
+		if physicalTypeStr == "INT32" {
+			return newStatAgg[int32](typ, truncLen), nil
+		}
+
 		return newStatAgg[int64](typ, truncLen), nil
-	case "FLOAT":
+
+	case iceberg.Float32Type:
 		return newStatAgg[float32](typ, truncLen), nil
-	case "DOUBLE":
+
+	case iceberg.Float64Type:
+		// Allow FLOAT physical for DOUBLE logical (promotion)
+		if physicalTypeStr == "FLOAT" {
+			return newStatAgg[float32](typ, truncLen), nil
+		}
+
 		return newStatAgg[float64](typ, truncLen), nil
-	case "FIXED_LEN_BYTE_ARRAY":
-		switch typ.(type) {
-		case iceberg.UUIDType:
-			return newStatAgg[uuid.UUID](typ, truncLen), nil
-		case iceberg.DecimalType:
-			return newStatAgg[iceberg.Decimal](typ, truncLen), nil
-		default:
-			return newStatAgg[[]byte](typ, truncLen), nil
-		}
-	case "BYTE_ARRAY":
-		if typ.Equals(iceberg.PrimitiveTypes.String) {
-			return newStatAgg[string](typ, truncLen), nil
-		}
 
-		return newStatAgg[[]byte](typ, truncLen), nil
+	case iceberg.StringType:
+		return newStatAgg[string](typ, truncLen), nil
+
+	case iceberg.BinaryType:
+		return newStatAgg[[]byte](typ, truncLen), nil
+
+	case iceberg.UUIDType:
+		return newStatAgg[uuid.UUID](typ, truncLen), nil
+
+	case iceberg.FixedType:
+		return newStatAgg[[]byte](typ, truncLen), nil
+
+	case iceberg.DecimalType:
+		// Decimals can be stored as INT32 (precision <= 9), INT64 (precision <= 18),
+		// FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY per Parquet spec.
+		switch physicalTypeStr {
+		case "INT32":
+			return &decAsIntAgg[int32]{
+				newStatAgg[int32](typ, truncLen).(*statsAggregator[int32]),
+			}, nil
+		case "INT64":
+			return &decAsIntAgg[int64]{
+				newStatAgg[int64](typ, truncLen).(*statsAggregator[int64]),
+			}, nil
+		case "FIXED_LEN_BYTE_ARRAY", "BYTE_ARRAY":
+			return newStatAgg[iceberg.Decimal](typ, truncLen), nil
+		default:
+			return nil, fmt.Errorf("unsupported physical type %s for decimal", physicalTypeStr)
+		}
 	default:
-		return nil, fmt.Errorf("unsupported physical type: %s", physicalTypeStr)
+		return nil, fmt.Errorf("unsupported iceberg type: %s", typ)
 	}
 }
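Note on the precision thresholds cited in the decimal comment above: per the Parquet format spec, an unscaled decimal with precision <= 9 fits in an INT32 and one with precision <= 18 fits in an INT64. A minimal sketch of that selection rule (hypothetical helper, for illustration only; real writers may also always choose FIXED_LEN_BYTE_ARRAY):

	// physicalTypeForDecimal illustrates the Parquet spec rule for picking a
	// decimal column's physical type from its precision. Hypothetical helper,
	// not part of this PR.
	func physicalTypeForDecimal(precision int) string {
		switch {
		case precision <= 9: // max unscaled value 999,999,999 < 2^31 - 1
			return "INT32"
		case precision <= 18: // max unscaled value 10^18 - 1 < 2^63 - 1
			return "INT64"
		default:
			return "FIXED_LEN_BYTE_ARRAY" // or BYTE_ARRAY
		}
	}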

@@ -400,6 +407,29 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
 	return iceberg.Decimal{Val: dec, Scale: w.scale}
 }
 
+type wrappedDecByteArrayStats struct {
+	*metadata.ByteArrayStatistics
+	scale int
+}
+
+func (w wrappedDecByteArrayStats) Min() iceberg.Decimal {
+	dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Min())
+	if err != nil {
+		panic(err)
+	}
+
+	return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
+func (w wrappedDecByteArrayStats) Max() iceberg.Decimal {
+	dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Max())
+	if err != nil {
+		panic(err)
+	}
+
+	return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
 func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
 	pqmeta := meta.(*metadata.FileMetaData)
 	var (
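Background for the new wrapper above: Parquet serializes decimal min/max statistics as a big-endian two's-complement unscaled integer, which is what BigEndianToDecimal decodes. A rough standalone sketch of that decoding using math/big (illustrative only, not the package's actual implementation):

	// bigEndianToUnscaled interprets b as a big-endian two's-complement integer.
	func bigEndianToUnscaled(b []byte) *big.Int {
		n := new(big.Int).SetBytes(b)
		if len(b) > 0 && b[0]&0x80 != 0 {
			// Sign bit set: subtract 2^(8*len(b)) to recover the negative value.
			n.Sub(n, new(big.Int).Lsh(big.NewInt(1), uint(len(b)*8)))
		}
		return n
	}

	// bigEndianToUnscaled([]byte{0x30, 0x39}) == 12345
	// bigEndianToUnscaled([]byte{0xcf, 0xc7}) == -12345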
@@ -487,7 +517,16 @@ func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
 		case iceberg.FixedType:
 			stats = &wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
 		case iceberg.DecimalType:
-			stats = &wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
+			// Decimals can be stored as INT32/INT64 (small precision) or FIXED_LEN_BYTE_ARRAY/BYTE_ARRAY.
+			// Only wrap FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY statistics; INT32/INT64 stats
+			// are used directly by decAsIntAgg.
+			switch s := stats.(type) {
+			case *metadata.FixedLenByteArrayStatistics:
+				stats = &wrappedDecStats{s, t.Scale()}
+			case *metadata.ByteArrayStatistics:
+				stats = &wrappedDecByteArrayStats{s, t.Scale()}
+			// INT32/INT64 statistics are used directly by decAsIntAgg
+			}
 		}
 
 		agg.Update(stats)
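The line removed above asserted the statistics to *metadata.FixedLenByteArrayStatistics unconditionally, which panics for a decimal stored as INT32 or INT64. A tiny sketch of why the type switch is the safe shape, using hypothetical stand-in types rather than the real metadata types:

	type stats interface{ isStats() }
	type flbaStats struct{}  // stands in for *metadata.FixedLenByteArrayStatistics
	type int32Stats struct{} // stands in for *metadata.Int32Statistics

	func (flbaStats) isStats()  {}
	func (int32Stats) isStats() {}

	func wrapUnsafe(s stats) {
		_ = s.(flbaStats) // pre-fix behavior: panics when s is int32Stats
	}

	func wrapSafe(s stats) {
		switch s.(type) { // post-fix behavior: unmatched types pass through
		case flbaStats:
			// wrap in a decimal adapter
		case int32Stats:
			// leave alone; the int-based aggregator consumes these directly
		}
	}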
155 changes: 155 additions & 0 deletions table/internal/parquet_files_test.go
@@ -20,6 +20,7 @@ package internal_test
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"math/big"
 	"strings"
 	"testing"
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/arrow-go/v18/parquet/file"
 	"github.com/apache/arrow-go/v18/parquet/metadata"
 	"github.com/apache/arrow-go/v18/parquet/pqarrow"
+	"github.com/apache/arrow-go/v18/parquet/schema"
 	"github.com/apache/iceberg-go"
 	internal2 "github.com/apache/iceberg-go/internal"
 	"github.com/apache/iceberg-go/table"
@@ -330,6 +332,159 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
 	})
 }
 
+// TestDecimalPhysicalTypes tests that decimals stored as INT32/INT64 physical types
+// are correctly handled. This is important because Parquet allows decimals with
+// precision <= 9 to be stored as INT32, and precision <= 18 as INT64.
+func TestDecimalPhysicalTypes(t *testing.T) {
+	format := internal.GetFileFormat(iceberg.ParquetFile)
+
+	tests := []struct {
+		name         string
+		precision    int
+		scale        int
+		physicalType parquet.Type
+		values       []int64 // unscaled values
+		expectedMin  int64
+		expectedMax  int64
+	}{
+		{
+			name:         "decimal_as_int32",
+			precision:    7,
+			scale:        2,
+			physicalType: parquet.Types.Int32,
+			values:       []int64{12345, 67890}, // represents 123.45, 678.90
+			expectedMin:  12345,
+			expectedMax:  67890,
+		},
+		{
+			name:         "decimal_as_int64",
+			precision:    15,
+			scale:        2,
+			physicalType: parquet.Types.Int64,
+			values:       []int64{123456789012345, 987654321098765},
+			expectedMin:  123456789012345,
+			expectedMax:  987654321098765,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a Parquet file with decimal stored as INT32 or INT64
+			var buf bytes.Buffer
+
+			// Build a custom schema with decimal logical type
+			decType := schema.NewDecimalLogicalType(int32(tt.precision), int32(tt.scale))
+			var node schema.Node
+			var err error
+			if tt.physicalType == parquet.Types.Int32 {
+				node, err = schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+					decType, parquet.Types.Int32, 0, 1)
+			} else {
+				node, err = schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+					decType, parquet.Types.Int64, 0, 1)
+			}
+			require.NoError(t, err)
+
+			rootNode, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{node}, -1)
+			require.NoError(t, err)
+
+			// Write the parquet file
+			writer := file.NewParquetWriter(&buf,
+				rootNode,
+				file.WithWriterProps(parquet.NewWriterProperties(parquet.WithStats(true))))
+
+			rgw := writer.AppendRowGroup()
+			colWriter, err := rgw.NextColumn()
+			require.NoError(t, err)
+
+			if tt.physicalType == parquet.Types.Int32 {
+				int32Writer := colWriter.(*file.Int32ColumnChunkWriter)
+				vals := make([]int32, len(tt.values))
+				for i, v := range tt.values {
+					vals[i] = int32(v)
+				}
+				_, err = int32Writer.WriteBatch(vals, nil, nil)
+			} else {
+				int64Writer := colWriter.(*file.Int64ColumnChunkWriter)
+				_, err = int64Writer.WriteBatch(tt.values, nil, nil)
+			}
+			require.NoError(t, err)
+
+			require.NoError(t, colWriter.Close())
+			require.NoError(t, rgw.Close())
+			require.NoError(t, writer.Close())
+
+			// Read back and get metadata
+			rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+			require.NoError(t, err)
+			defer rdr.Close()
+
+			meta := rdr.MetaData()
+
+			// Create table metadata with decimal type
+			tableMeta, err := table.ParseMetadataString(fmt.Sprintf(`{
+				"format-version": 2,
+				"location": "s3://bucket/test/location",
+				"last-column-id": 1,
+				"current-schema-id": 0,
+				"schemas": [
+					{
+						"type": "struct",
+						"schema-id": 0,
+						"fields": [
+							{"id": 1, "name": "value", "required": true, "type": "decimal(%d, %d)"}
+						]
+					}
+				],
+				"last-partition-id": 0,
+				"last-updated-ms": -1,
+				"default-spec-id": 0,
+				"default-sort-order-id": 0,
+				"sort-orders": [{"order-id": 0, "fields": []}],
+				"partition-specs": [{"spec-id": 0, "fields": []}],
+				"properties": {}
+			}`, tt.precision, tt.scale))
+			require.NoError(t, err)
+
+			mapping, err := format.PathToIDMapping(tableMeta.CurrentSchema())
+			require.NoError(t, err)
+
+			collector := map[int]internal.StatisticsCollector{
+				1: {
+					FieldID:    1,
+					Mode:       internal.MetricsMode{Typ: internal.MetricModeFull},
+					ColName:    "value",
+					IcebergTyp: iceberg.DecimalTypeOf(tt.precision, tt.scale),
+				},
+			}
+
+			// This should not panic - the fix allows INT32/INT64 physical types for decimals
+			stats := format.DataFileStatsFromMeta(internal.Metadata(meta), collector, mapping)
+			require.NotNil(t, stats)
+
+			df := stats.ToDataFile(tableMeta.CurrentSchema(), tableMeta.PartitionSpec(), "test.parquet",
+				iceberg.ParquetFile, meta.GetSourceFileSize(), nil)
+
+			// Verify bounds are correctly extracted
+			require.Contains(t, df.LowerBoundValues(), 1)
+			require.Contains(t, df.UpperBoundValues(), 1)
+
+			// Verify the actual values
+			minLit, err := iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), df.LowerBoundValues()[1])
+			require.NoError(t, err)
+			minDec := minLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+			assert.Equal(t, uint64(tt.expectedMin), minDec.Val.LowBits())
+			assert.Equal(t, tt.scale, minDec.Scale)
+
+			maxLit, err := iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), df.UpperBoundValues()[1])
+			require.NoError(t, err)
+			maxDec := maxLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+			assert.Equal(t, uint64(tt.expectedMax), maxDec.Val.LowBits())
+			assert.Equal(t, tt.scale, maxDec.Scale)
+		})
+	}
+}
+
 func TestWriteDataFileErrOnClose(t *testing.T) {
 	ctx := context.Background()
 	fm := internal.GetFileFormat(iceberg.ParquetFile)
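One more note on the test fixtures: the comment "represents 123.45, 678.90" follows the decimal rule value = unscaled * 10^(-scale). A quick illustration with math/big (an assumption for illustration: this mirrors how iceberg.Decimal pairs Val with Scale in the wrappers above):

	// scaledString renders an unscaled decimal value at the given scale,
	// i.e. value = unscaled * 10^(-scale).
	func scaledString(unscaled int64, scale int) string {
		den := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
		return new(big.Rat).SetFrac(big.NewInt(unscaled), den).FloatString(scale)
	}

	// scaledString(12345, 2)           == "123.45"
	// scaledString(123456789012345, 2) == "1234567890123.45"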