Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions literals.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func LiteralFromBytes(typ Type, data []byte) (Literal, error) {
var v UUIDLiteral
err := v.UnmarshalBinary(data)

return v, err
case GeometryType, GeographyType:
// Geometry and Geography are stored as WKB (binary format)
var v BinaryLiteral
err := v.UnmarshalBinary(data)

return v, err
}

Expand Down
6 changes: 6 additions & 0 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ type SchemaVisitorPerPrimitiveType[T any] interface {
VisitBinary() T
VisitUUID() T
VisitUnknown() T
VisitGeometry(GeometryType) T
VisitGeography(GeographyType) T
}

// Visit accepts a visitor and performs a post-order traversal of the given schema.
Expand Down Expand Up @@ -640,6 +642,10 @@ func visitField[T any](f NestedField, visitor SchemaVisitor[T]) T {
return perPrimitive.VisitFixed(t)
case UnknownType:
return perPrimitive.VisitUnknown()
case GeometryType:
return perPrimitive.VisitGeometry(t)
case GeographyType:
return perPrimitive.VisitGeography(t)
}
}

Expand Down
12 changes: 12 additions & 0 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,18 @@ func TestSerializeSchema(t *testing.T) {
"schema-id": 1,
"identifier-field-ids": [2]
}`, string(data))

// Test schema with geometry and geography
schemaWithGeo := iceberg.NewSchema(2,
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
iceberg.NestedField{ID: 2, Name: "location", Type: iceberg.GeometryTypeOf("srid:3857"), Required: false},
iceberg.NestedField{ID: 3, Name: "gps", Type: iceberg.GeographyTypeOf("srid:4326", iceberg.EdgeAlgorithmKarney), Required: false},
)
data2, err := json.Marshal(schemaWithGeo)
require.NoError(t, err)
var schemaFromJSON iceberg.Schema
require.NoError(t, json.Unmarshal(data2, &schemaFromJSON))
assert.True(t, schemaWithGeo.Equals(&schemaFromJSON))
}

func TestUnmarshalSchema(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,22 @@ func (c convertToArrow) VisitUnknown() arrow.Field {
}
}

// VisitGeometry produces the Arrow field used for an Iceberg geometry column.
// The field is typed as LargeBinary when c.useLargeTypes is set and as Binary
// otherwise; the parameters of the geometry type itself are not consulted.
func (c convertToArrow) VisitGeometry(iceberg.GeometryType) arrow.Field {
	var storage arrow.DataType = arrow.BinaryTypes.Binary
	if c.useLargeTypes {
		storage = arrow.BinaryTypes.LargeBinary
	}

	return arrow.Field{Type: storage}
}

// VisitGeography produces the Arrow field used for an Iceberg geography
// column. It starts from a Binary-typed field and switches the type to
// LargeBinary when c.useLargeTypes is set; the parameters of the geography
// type itself are not consulted.
func (c convertToArrow) VisitGeography(iceberg.GeographyType) arrow.Field {
	field := arrow.Field{Type: arrow.BinaryTypes.Binary}
	if c.useLargeTypes {
		field.Type = arrow.BinaryTypes.LargeBinary
	}

	return field
}

var _ iceberg.SchemaVisitorPerPrimitiveType[arrow.Field] = convertToArrow{}

// SchemaToArrowSchema converts an Iceberg schema to an Arrow schema. If the metadata parameter
Expand Down Expand Up @@ -777,6 +793,12 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar

panic(fmt.Errorf("unsupported schema projection from %s to %s",
vals.DataType(), targetType))
case iceberg.GeometryType, iceberg.GeographyType:
if arrow.TypeEqual(vals.DataType(), arrow.BinaryTypes.Binary) ||
arrow.TypeEqual(vals.DataType(), arrow.BinaryTypes.LargeBinary) {
vals.Retain()
return vals
}
default:
return retOrPanic(compute.CastArray(a.ctx, vals,
compute.SafeCastOptions(targetType)))
Expand Down
114 changes: 113 additions & 1 deletion table/arrow_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func TestArrowToIceberg(t *testing.T) {
{arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, false, ""},
{arrow.BinaryTypes.BinaryView, nil, false, "unsupported arrow type for conversion - binary_view"},
{extensions.NewUUIDType(), iceberg.PrimitiveTypes.UUID, true, ""},
// Note: Arrow binary types map to Iceberg BinaryType, not Geometry/Geography
// Geometry/Geography must be explicitly specified in Iceberg schema
{arrow.StructOf(arrow.Field{
Name: "foo",
Type: arrow.BinaryTypes.String,
Expand Down Expand Up @@ -342,9 +344,16 @@ var (
)

func TestArrowSchemaRoundTripConversion(t *testing.T) {
schemaWithGeo := iceberg.NewSchema(3,
iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
iceberg.NestedField{ID: 2, Name: "location", Type: iceberg.GeometryTypeOf("srid:3857"), Required: false},
iceberg.NestedField{ID: 3, Name: "gps", Type: iceberg.GeographyTypeOf("srid:4326", iceberg.EdgeAlgorithmKarney), Required: false},
)

schemas := []*iceberg.Schema{
icebergSchemaSimple,
icebergSchemaNested,
schemaWithGeo,
}

for _, tt := range schemas {
Expand All @@ -354,7 +363,14 @@ func TestArrowSchemaRoundTripConversion(t *testing.T) {
ice, err := table.ArrowSchemaToIceberg(sc, false, nil)
require.NoError(t, err)

assert.True(t, tt.Equals(ice), tt.String(), ice.String())
if tt == schemaWithGeo {
geomField := sc.Field(1)
assert.Equal(t, arrow.BinaryTypes.Binary, geomField.Type)
geogField := sc.Field(2)
assert.Equal(t, arrow.BinaryTypes.Binary, geogField.Type)
} else {
assert.True(t, tt.Equals(ice), tt.String(), ice.String())
}
}
}

Expand Down Expand Up @@ -585,3 +601,99 @@ func TestToRequestedSchema(t *testing.T) {

assert.True(t, array.RecordEqual(rec, rec2))
}

// TestToRequestedSchemaGeometryGeography checks that a record whose spatial
// columns are plain Arrow binary can be projected onto an Iceberg schema that
// declares those columns as geometry/geography, and that the resulting schema,
// row/column counts, nulls and bytes all survive the projection.
func TestToRequestedSchemaGeometryGeography(t *testing.T) {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer alloc.AssertSize(t, 0)

	// Arrow schema standing in for what a Parquet file would yield: the
	// geometry/geography columns are plain binary.
	fileSchema := arrow.NewSchema([]arrow.Field{
		{
			Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false,
			Metadata: fieldIDMeta("1"),
		},
		{
			Name: "location", Type: arrow.BinaryTypes.Binary, Nullable: true,
			Metadata: fieldIDMeta("2"),
		},
		{
			Name: "gps", Type: arrow.BinaryTypes.Binary, Nullable: true,
			Metadata: fieldIDMeta("3"),
		},
	}, nil)

	// The schema the reader asks for uses the dedicated spatial types.
	requestedSchema := iceberg.NewSchema(1,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
		iceberg.NestedField{ID: 2, Name: "location", Type: iceberg.GeometryTypeOf("srid:3857"), Required: false},
		iceberg.NestedField{ID: 3, Name: "gps", Type: iceberg.GeographyTypeOf("srid:4326", iceberg.EdgeAlgorithmKarney), Required: false},
	)

	// Fixture rows; the byte slices are arbitrary stand-ins for WKB payloads
	// and a nil slice marks a null cell.
	rows := []struct {
		id            int32
		location, gps []byte
	}{
		{id: 1, location: []byte{0x01, 0x02, 0x03, 0x04}, gps: []byte{0x05, 0x06, 0x07, 0x08}},
		{id: 2},
		{id: 3, location: []byte{0x09, 0x0A, 0x0B, 0x0C}, gps: []byte{0x0D, 0x0E, 0x0F, 0x10}},
	}

	recBldr := array.NewRecordBuilder(alloc, fileSchema)
	defer recBldr.Release()

	ids := recBldr.Field(0).(*array.Int32Builder)
	locations := recBldr.Field(1).(*array.BinaryBuilder)
	gpsPoints := recBldr.Field(2).(*array.BinaryBuilder)

	appendCell := func(b *array.BinaryBuilder, payload []byte) {
		if payload == nil {
			b.AppendNull()

			return
		}
		b.Append(payload)
	}

	for _, row := range rows {
		ids.Append(row.id)
		appendCell(locations, row.location)
		appendCell(gpsPoints, row.gps)
	}

	rec := recBldr.NewRecordBatch()
	defer rec.Release()

	// Iceberg view of the file schema, and Arrow view of the requested one.
	fileIcebergSchema, err := table.ArrowSchemaToIceberg(fileSchema, false, nil)
	require.NoError(t, err)

	requestedArrowSchema, err := table.SchemaToArrowSchema(requestedSchema, nil, true, false)
	require.NoError(t, err)

	// Run the projection under test.
	projectedRec, err := table.ToRequestedSchema(context.Background(), requestedSchema, fileIcebergSchema, rec, true, true, false)
	require.NoError(t, err, "ToRequestedSchema should succeed for BinaryType -> GeometryType/GeographyType")
	defer projectedRec.Release()

	// Schema and dimensions must match what was requested / supplied.
	assert.Equal(t, requestedArrowSchema, projectedRec.Schema())
	assert.Equal(t, rec.NumRows(), projectedRec.NumRows())
	assert.Equal(t, rec.NumCols(), projectedRec.NumCols())

	// Cell-by-cell comparison of the two spatial columns against the fixture.
	gotLocation := projectedRec.Column(1).(*array.Binary)
	gotGps := projectedRec.Column(2).(*array.Binary)

	for i, want := range rows {
		if want.location == nil {
			assert.True(t, gotLocation.IsNull(i))
			assert.True(t, gotGps.IsNull(i))

			continue
		}

		assert.False(t, gotLocation.IsNull(i))
		assert.False(t, gotGps.IsNull(i))
		assert.Equal(t, want.location, gotLocation.Value(i))
		assert.Equal(t, want.gps, gotGps.Value(i))
	}
}
6 changes: 6 additions & 0 deletions table/substrait/substrait.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,13 @@
// Unknown types cannot be stored in data files and have no Substrait equivalent
// Returning nil indicates this type cannot be converted to Substrait
return nil
}

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

File is not properly formatted (gofumpt)

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.24.9

File is not properly formatted (gofumpt)

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

File is not properly formatted (gofumpt)

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.24.9

File is not properly formatted (gofumpt)

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.24.9

File is not properly formatted (gofumpt)

Check failure on line 171 in table/substrait/substrait.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

File is not properly formatted (gofumpt)
// VisitGeometry converts an Iceberg geometry type to its Substrait
// representation. The result is always Substrait's binary type; the
// parameters of the geometry type are not consulted.
func (convertToSubstrait) VisitGeometry(iceberg.GeometryType) types.Type {
	return &types.BinaryType{}
}

// VisitGeography converts an Iceberg geography type to its Substrait
// representation. The result is always Substrait's binary type; the
// parameters of the geography type are not consulted.
func (convertToSubstrait) VisitGeography(iceberg.GeographyType) types.Type {
	return &types.BinaryType{}
}

var _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] = (*convertToSubstrait)(nil)

Expand Down
5 changes: 5 additions & 0 deletions transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func (t IdentityTransform) MarshalText() ([]byte, error) {
func (IdentityTransform) String() string { return "identity" }

func (IdentityTransform) CanTransform(t Type) bool {
// Exclude geometry, geography, and variant per spec
switch t.(type) {
case GeometryType, GeographyType:
return false
}
_, ok := t.(PrimitiveType)

return ok
Expand Down
Loading
Loading