Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,42 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields)
return {};
}

Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
std::unordered_set<std::string> partition_names;
for (const auto& partition_field : fields_) {
auto name = std::string(partition_field.name());
if (name.empty()) {
return InvalidArgument("Cannot use empty partition name: {}", name);
}
if (partition_names.contains(name)) {
return InvalidArgument("Cannot use partition name more than once: {}", name);
}
partition_names.insert(name);

ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name));
auto transform_type = partition_field.transform()->transform_type();
if (transform_type == TransformType::kIdentity) {
// for identity transform case we allow conflicts between partition and schema field
// name as long as they are sourced from the same schema field
if (schema_field.has_value() &&
schema_field.value().get().field_id() != partition_field.source_id()) {
return InvalidArgument(
"Cannot create identity partition sourced from different field in schema: {}",
name);
}
} else {
// for all other transforms we don't allow conflicts between partition name and
// schema field name
if (schema_field.has_value()) {
return InvalidArgument(
"Cannot create partition from name that exists in schema: {}", name);
}
}
}

return {};
}

Result<std::vector<std::reference_wrapper<const PartitionField>>>
PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this));
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \return Error status if the partition spec is invalid.
Status Validate(const Schema& schema, bool allow_missing_fields) const;

// \brief Validates the partition field names are unique within the partition spec and
// schema.
Status ValidatePartitionName(const Schema& schema) const;

/// \brief Get the partition fields by source ID.
/// \param source_id The id of the source field.
/// \return The partition fields by source ID, or NotFound if the source field is not
Expand Down
40 changes: 40 additions & 0 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "iceberg/util/macros.h"
#include "iceberg/util/type_util.h"
#include "iceberg/util/visit_type.h"
#include "table_metadata.h"

namespace iceberg {

Expand Down Expand Up @@ -228,4 +229,43 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
return names;
}

Result<int32_t> Schema::HighestFieldId() const {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));

if (id_to_field.get().empty()) {
return kInitialColumnId;
}

auto max_it = std::ranges::max_element(
id_to_field.get(),
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });

return max_it->first;
}

bool Schema::SameSchema(const Schema& other) const { return fields_ == other.fields_; }

Status Schema::Validate(int32_t format_version) const {
// Get all fields including nested ones
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));

// Check each field's type and defaults
for (const auto& [field_id, field_ref] : id_to_field.get()) {
const auto& field = field_ref.get();

// Check if the field's type requires a minimum format version
if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id());
it != TableMetadata::kMinFormatVersions.end()) {
if (int32_t min_format_version = it->second; format_version < min_format_version) {
return InvalidSchema("Invalid type for {}: {} is not supported until v{}",
field.name(), *field.type(), min_format_version);
}
}

// TODO(GuoTao.yu): Check default values when they are supported
}

return {};
}

} // namespace iceberg
18 changes: 18 additions & 0 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace iceberg {
class ICEBERG_EXPORT Schema : public StructType {
public:
static constexpr int32_t kInitialSchemaId = 0;
static constexpr int32_t kInitialColumnId = 0;
static constexpr int32_t kInvalidColumnId = -1;

explicit Schema(std::vector<SchemaField> fields,
Expand Down Expand Up @@ -127,6 +128,23 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Return the canonical field names of the identifier fields.
Result<std::vector<std::string>> IdentifierFieldNames() const;

/// \brief Get the highest field ID in the schema.
/// \return The highest field ID.
Result<int32_t> HighestFieldId() const;

/// \brief Checks whether this schema is equivalent to another schema while ignoring the
/// schema id.
bool SameSchema(const Schema& other) const;

/// \brief Validate the schema for a given format version.
///
/// This validates that the schema does not contain types that were released in later
/// format versions.
///
/// \param format_version The format version to validate against.
/// \return Error status if the schema is invalid.
Status Validate(int32_t format_version) const;

friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }

private:
Expand Down
Loading
Loading