Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 220 additions & 7 deletions crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,54 @@ impl BoundPredicateVisitor for RowGroupMetricsEvaluator<'_> {

fn not_in(
&mut self,
_reference: &BoundReference,
_literals: &FnvHashSet<Datum>,
reference: &BoundReference,
literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<bool> {
// Because the bounds are not necessarily a min or max value,
// this cannot be answered using them. notIn(col, {X, ...})
// with (X, Y) doesn't guarantee that X is a value in col.
let field_id = reference.field().id;

// If column contains only nulls, NOT IN might match (NULL NOT IN (...) evaluates to NULL,
// but rows with nulls may still be returned depending on query semantics)
if self.contains_nulls_only(field_id) {
return ROW_GROUP_MIGHT_MATCH;
}

// Skip evaluation if too many literals
if literals.len() > IN_PREDICATE_LIMIT {
return ROW_GROUP_MIGHT_MATCH;
}

// Get both bounds - we need them to check if all values are identical
let lower_bound = self.min_value(field_id)?;
let upper_bound = self.max_value(field_id)?;

// Check for NaN bounds which indicate unreliable statistics
if let Some(ref lb) = lower_bound {
if lb.is_nan() {
return ROW_GROUP_MIGHT_MATCH;
}
}
if let Some(ref ub) = upper_bound {
if ub.is_nan() {
return ROW_GROUP_MIGHT_MATCH;
}
}

// If we have both bounds and they are equal, all values in the row group are identical.
// In this case, if that single value is in the exclusion set, no rows can match NOT IN.
if let (Some(ref lb), Some(ref ub)) = (&lower_bound, &upper_bound) {
if lb == ub {
// All values in this row group are equal to this single value
if literals.contains(lb) {
// The only value in this row group is in the exclusion set,
// so NOT IN cannot match any non-null rows
return ROW_GROUP_CANT_MATCH;
}
}
}

// For general cases, we cannot definitively exclude the row group using only min/max bounds.
// notIn(col, {X, ...}) with bounds (min, max) doesn't tell us if X is actually present.
ROW_GROUP_MIGHT_MATCH
}
}
Expand Down Expand Up @@ -1806,13 +1847,50 @@ mod tests {
}

#[test]
fn eval_true_for_not_in() -> Result<()> {
fn eval_false_for_not_in_when_all_values_match_exclusion() -> Result<()> {
// When min == max and that value is in the exclusion set,
// NOT IN should return false (can't match)
let row_group_metadata = create_row_group_metadata(
1,
1,
None,
1,
Some(Statistics::byte_array(
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
None,
Some(0),
false,
)),
)?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_string")
.is_not_in([Datum::string("iceberg")])
.bind(iceberg_schema_ref.clone(), false)?;

let result = RowGroupMetricsEvaluator::eval(
&filter,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

// All values are "iceberg" and it's in the exclusion set, so no rows can match
assert!(!result);
Ok(())
}

#[test]
fn eval_true_for_not_in_when_all_values_not_in_exclusion() -> Result<()> {
// When min == max but that value is NOT in the exclusion set,
// NOT IN should return true (might match)
let row_group_metadata = create_row_group_metadata(
1,
1,
None,
1,
// Max val of 0xFF is not valid utf8
Some(Statistics::byte_array(
Some(ByteArray::from("iceberg".as_bytes())),
Some(ByteArray::from("iceberg".as_bytes())),
Expand All @@ -1824,6 +1902,41 @@ mod tests {

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_string")
.is_not_in([Datum::string("spark"), Datum::string("flink")])
.bind(iceberg_schema_ref.clone(), false)?;

let result = RowGroupMetricsEvaluator::eval(
&filter,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

// All values are "iceberg" which is not in {spark, flink}, so rows might match
assert!(result);
Ok(())
}

#[test]
fn eval_true_for_not_in_with_different_min_max() -> Result<()> {
// When min != max, NOT IN should conservatively return true
let row_group_metadata = create_row_group_metadata(
1,
1,
None,
1,
Some(Statistics::byte_array(
Some(ByteArray::from("a".as_bytes())),
Some(ByteArray::from("z".as_bytes())),
None,
Some(0),
false,
)),
)?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_string")
.is_not_in([Datum::string("iceberg")])
.bind(iceberg_schema_ref.clone(), false)?;
Expand All @@ -1835,6 +1948,106 @@ mod tests {
iceberg_schema_ref.as_ref(),
)?;

// min != max, so we can't be sure all values are "iceberg"
assert!(result);
Ok(())
}

#[test]
fn eval_true_for_not_in_with_nulls_only() -> Result<()> {
// When column contains only nulls, NOT IN should return true
let row_group_metadata = create_row_group_metadata(
1,
1,
None,
1,
Some(Statistics::byte_array(None, None, None, Some(1), false)),
)?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_string")
.is_not_in([Datum::string("iceberg")])
.bind(iceberg_schema_ref.clone(), false)?;

let result = RowGroupMetricsEvaluator::eval(
&filter,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

// Column has only nulls, so might match (NULL NOT IN evaluates to NULL)
assert!(result);
Ok(())
}

#[test]
fn eval_false_for_not_in_float_when_all_values_match() -> Result<()> {
// Test NOT IN with float type
let row_group_metadata = create_row_group_metadata(
1,
1,
Some(Statistics::float(
Some(1.0),
Some(1.0),
None,
Some(0),
false,
)),
1,
None,
)?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_float")
.is_not_in([Datum::float(1.0), Datum::float(2.0)])
.bind(iceberg_schema_ref.clone(), false)?;

let result = RowGroupMetricsEvaluator::eval(
&filter,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

// All values are 1.0 which is in the exclusion set
assert!(!result);
Ok(())
}

#[test]
fn eval_true_for_not_in_with_nan_bounds() -> Result<()> {
// When bounds contain NaN, should return true (unreliable stats)
let row_group_metadata = create_row_group_metadata(
1,
1,
Some(Statistics::float(
Some(f32::NAN),
Some(f32::NAN),
None,
Some(0),
false,
)),
1,
None,
)?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_float")
.is_not_in([Datum::float(1.0)])
.bind(iceberg_schema_ref.clone(), false)?;

let result = RowGroupMetricsEvaluator::eval(
&filter,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

// NaN bounds are unreliable, so conservatively return true
assert!(result);
Ok(())
}
Expand Down
Loading