Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 219 additions & 13 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use uuid::Uuid;

use crate::error::Result;
use crate::spec::{
Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
SchemaVisitor, StructType, Type,
Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
PrimitiveType, Schema, SchemaVisitor, StructType, Type,
};
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -221,6 +221,19 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
visit_schema(schema, &mut visitor)
}

/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs.
///
/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow
/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1,
/// following Iceberg's field ID assignment rules.
///
/// This is useful when converting Arrow schemas that don't originate from Iceberg tables,
/// such as schemas from DataFusion or other Arrow-based systems.
pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
visit_schema(schema, &mut visitor)
}

/// Convert Arrow type to iceberg type.
pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
let mut visitor = ArrowSchemaConverter::new();
Expand All @@ -229,7 +242,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {

const ARROW_FIELD_DOC_KEY: &str = "doc";

pub(super) fn get_field_id(field: &Field) -> Result<i32> {
pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
return value.parse::<i32>().map_err(|e| {
Error::new(
Expand All @@ -253,19 +266,54 @@ fn get_field_doc(field: &Field) -> Option<String> {
None
}

struct ArrowSchemaConverter;
struct ArrowSchemaConverter {
/// When set, the schema builder will reassign field IDs starting from this value
/// using level-order traversal (breadth-first).
reassign_field_ids_from: Option<i32>,
/// Generates unique placeholder IDs for fields before reassignment.
/// Required because `ReassignFieldIds` builds an old-to-new ID mapping
/// that expects unique input IDs.
temp_field_id_counter: i32,
}

impl ArrowSchemaConverter {
fn new() -> Self {
Self {}
Self {
reassign_field_ids_from: None,
temp_field_id_counter: 0,
}
}

fn new_with_field_ids_from(start_from: i32) -> Self {
Self {
reassign_field_ids_from: Some(start_from),
temp_field_id_counter: 0,
}
}

fn get_field_id(&mut self, field: &Field) -> Result<i32> {
if self.reassign_field_ids_from.is_some() {
// Field IDs will be reassigned by the schema builder.
// We need unique temporary IDs because ReassignFieldIds builds an
// old->new ID mapping that requires unique input IDs.
let temp_id = self.temp_field_id_counter;
self.temp_field_id_counter += 1;
Ok(temp_id)
} else {
get_field_id_from_metadata(field)
}
}

fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result<Vec<NestedFieldRef>> {
fn convert_fields(
&mut self,
fields: &Fields,
field_results: &[Type],
) -> Result<Vec<NestedFieldRef>> {
let mut results = Vec::with_capacity(fields.len());
for i in 0..fields.len() {
let field = &fields[i];
let field_type = &field_results[i];
let id = get_field_id(field)?;
let id = self.get_field_id(field)?;
let doc = get_field_doc(field);
let nested_field = NestedField {
id,
Expand All @@ -287,13 +335,16 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
type U = Schema;

fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
let fields = Self::convert_fields(schema.fields(), &values)?;
let builder = Schema::builder().with_fields(fields);
let fields = self.convert_fields(schema.fields(), &values)?;
let mut builder = Schema::builder().with_fields(fields);
if let Some(start_from) = self.reassign_field_ids_from {
builder = builder.with_reassigned_field_ids(start_from)
}
builder.build()
}

fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
let fields = Self::convert_fields(fields, &results)?;
let fields = self.convert_fields(fields, &results)?;
Ok(Type::Struct(StructType::new(fields)))
}

Expand All @@ -310,7 +361,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
}
};

let id = get_field_id(element_field)?;
let id = self.get_field_id(element_field)?;
let doc = get_field_doc(element_field);
let mut element_field =
NestedField::list_element(id, value.clone(), !element_field.is_nullable());
Expand All @@ -335,15 +386,15 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
let key_field = &fields[0];
let value_field = &fields[1];

let key_id = get_field_id(key_field)?;
let key_id = self.get_field_id(key_field)?;
let key_doc = get_field_doc(key_field);
let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
if let Some(doc) = key_doc {
key_field = key_field.with_doc(doc);
}
let key_field = Arc::new(key_field);

let value_id = get_field_id(value_field)?;
let value_id = self.get_field_id(value_field)?;
let value_doc = get_field_doc(value_field);
let mut value_field = NestedField::map_value_element(
value_id,
Expand Down Expand Up @@ -1932,4 +1983,159 @@ mod tests {
assert_eq!(array.value(0), [66u8; 16]);
}
}

#[test]
fn test_arrow_schema_to_schema_with_field_id() {
// Create a complex Arrow schema without field ID metadata
// Including: primitives, list, nested struct, map, and nested list of structs
let arrow_schema = ArrowSchema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
Field::new("price", DataType::Decimal128(10, 2), false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
true,
),
Field::new(
"tags",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
Field::new(
"address",
DataType::Struct(Fields::from(vec![
Field::new("street", DataType::Utf8, true),
Field::new("city", DataType::Utf8, false),
Field::new("zip", DataType::Int32, true),
])),
true,
),
Field::new(
"attributes",
DataType::Map(
Arc::new(Field::new(
DEFAULT_MAP_FIELD_NAME,
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, true),
])),
false,
)),
false,
),
true,
),
Field::new(
"orders",
DataType::List(Arc::new(Field::new(
"element",
DataType::Struct(Fields::from(vec![
Field::new("order_id", DataType::Int64, false),
Field::new("amount", DataType::Float64, false),
])),
true,
))),
true,
),
]);

let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();

// Build expected schema with exact field IDs following level-order assignment:
// Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8
// Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15
// Level 2: orders.element.{order_id=16,amount=17}
let expected = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(
3,
"price",
Type::Primitive(PrimitiveType::Decimal {
precision: 10,
scale: 2,
}),
)
.into(),
NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
.into(),
NestedField::optional(
5,
"tags",
Type::List(ListType {
element_field: NestedField::list_element(
9,
Type::Primitive(PrimitiveType::String),
false,
)
.into(),
}),
)
.into(),
NestedField::optional(
6,
"address",
Type::Struct(StructType::new(vec![
NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
.into(),
])),
)
.into(),
NestedField::optional(
7,
"attributes",
Type::Map(MapType {
key_field: NestedField::map_key_element(
13,
Type::Primitive(PrimitiveType::String),
)
.into(),
value_field: NestedField::map_value_element(
14,
Type::Primitive(PrimitiveType::String),
false,
)
.into(),
}),
)
.into(),
NestedField::optional(
8,
"orders",
Type::List(ListType {
element_field: NestedField::list_element(
15,
Type::Struct(StructType::new(vec![
NestedField::required(
16,
"order_id",
Type::Primitive(PrimitiveType::Long),
)
.into(),
NestedField::required(
17,
"amount",
Type::Primitive(PrimitiveType::Double),
)
.into(),
])),
false,
)
.into(),
}),
)
.into(),
])
.build()
.unwrap();

pretty_assertions::assert_eq!(schema, expected);
assert_eq!(schema.highest_field_id(), 17);
}
}
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, FieldRef};
use uuid::Uuid;

use super::get_field_id;
use super::get_field_id_from_metadata;
use crate::spec::{
ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
Expand Down Expand Up @@ -450,7 +450,7 @@ impl FieldMatchMode {
/// Determines if an Arrow field matches an Iceberg field based on the matching mode.
pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
match self {
FieldMatchMode::Id => get_field_id(arrow_field)
FieldMatchMode::Id => get_field_id_from_metadata(arrow_field)
.map(|id| id == iceberg_field.id)
.unwrap_or(false),
FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
pub(crate) use table_metadata_builder::FIRST_FIELD_ID;
pub use table_properties::*;
pub use transform::*;
pub use values::*;
Expand Down
Loading
Loading