diff --git a/include/concurrency.hpp b/include/concurrency.hpp
index 2cfc97d..91f7bcf 100644
--- a/include/concurrency.hpp
+++ b/include/concurrency.hpp
@@ -21,6 +21,61 @@ namespace tundradb {
  */
 template <typename T>
 class ConcurrentSet {
+ public:
+  /**
+   * @brief Unsafe view for direct iteration over the underlying
+   * concurrent_hash_map
+   *
+   * WARNING: This view is NOT thread-safe. Use only when you can guarantee
+   * no concurrent modifications are happening.
+   */
+  class LockedView {
+   private:
+    const tbb::concurrent_hash_map<T, char>& data_;
+
+   public:
+    explicit LockedView(const tbb::concurrent_hash_map<T, char>& data)
+        : data_(data) {}
+
+    // Iterator that extracts keys from the concurrent_hash_map
+    class iterator {
+     private:
+      typename tbb::concurrent_hash_map<T, char>::const_iterator it_;
+
+     public:
+      using iterator_category = std::forward_iterator_tag;
+      using value_type = T;
+      using difference_type = std::ptrdiff_t;
+      using pointer = const T*;
+      using reference = const T&;
+
+      explicit iterator(
+          typename tbb::concurrent_hash_map<T, char>::const_iterator it)
+          : it_(it) {}
+
+      reference operator*() const { return it_->first; }
+      pointer operator->() const { return &(it_->first); }
+
+      iterator& operator++() {
+        ++it_;
+        return *this;
+      }
+      iterator operator++(int) {
+        iterator tmp = *this;
+        ++(*this);
+        return tmp;
+      }
+
+      bool operator==(const iterator& other) const { return it_ == other.it_; }
+      bool operator!=(const iterator& other) const { return it_ != other.it_; }
+    };
+
+    iterator begin() const { return iterator(data_.begin()); }
+    iterator end() const { return iterator(data_.end()); }
+    size_t size() const { return data_.size(); }
+  };
+
  private:
   tbb::concurrent_hash_map<T, char> data_;
   mutable std::shared_mutex mutex_;  // Read-write mutex for synchronization
@@ -94,6 +149,18 @@ class ConcurrentSet {
     return snapshot;
   }
 
+  /**
+   * @brief Get an unsafe view for direct iteration
+   *
+   * WARNING: This is NOT thread-safe! Use only when you can guarantee
+   * no concurrent modifications are happening. This avoids the expensive
+   * copy operation of get_all() for performance-critical code paths.
+   *
+   * Returns a view that provides begin()/end() iterators directly over
+   * the underlying concurrent_hash_map keys.
+   */
+  LockedView get_all_unsafe() const { return LockedView(data_); }
+
   /**
    * @brief Clear all elements from the set
    *
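A minimal usage sketch contrasting the two read paths. Note the template arguments restored above (`<T, char>` for the underlying map) and the `int64_t` element type below are assumptions; the original arguments were lost in extraction:

```cpp
// get_all() copies every key into a heap-allocated snapshot and is always
// safe; get_all_unsafe() is a zero-copy view that is only valid while no
// writer is mutating the set.
void read_paths(const tundradb::ConcurrentSet<int64_t>& ids) {
  auto snapshot = ids.get_all();  // safe: independent copy of the keys

  size_t n = 0;
  for (const int64_t id : ids.get_all_unsafe()) {  // unsafe: no copy, no lock
    (void)id;
    ++n;
  }
}
```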
diff --git a/include/edge_store.hpp b/include/edge_store.hpp
index 78f0dd1..136c025 100644
--- a/include/edge_store.hpp
+++ b/include/edge_store.hpp
@@ -81,6 +81,11 @@ class EdgeStore {
   std::vector<std::shared_ptr<Edge>> get(const std::set<int64_t> &ids) const;
 
+  // Template overload for any iterable container (including
+  // ConcurrentSet::LockedView)
+  template <typename Container>
+  std::vector<std::shared_ptr<Edge>> get(const Container &ids) const;
+
   int64_t get_count_by_type(const std::string &type) const {
     if (auto res = get_by_type(type); res.ok()) {
       return res.ValueOrDie().size();
diff --git a/include/node.hpp b/include/node.hpp
index e43f826..1e824ef 100644
--- a/include/node.hpp
+++ b/include/node.hpp
@@ -58,6 +58,12 @@ class Node {
     return arrow::Status::NotImplemented("");
   }
 
+  [[nodiscard]] ValueRef get_value_ref(
+      const std::shared_ptr<Field> &field) const {
+    const char *ptr = arena_->get_field_value_ptr(*handle_, layout_, field);
+    return {ptr, field->type()};
+  }
+
   [[deprecated]] arrow::Result<Value> get_value(
       const std::string &field) const {
     log_warn("get_value by string is deprecated");
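One linkage caveat worth flagging: the `Container` overload of `EdgeStore::get` is declared here but defined in edge_store.cpp (see below), so calls from other translation units only link if the needed instantiations are emitted there. A sketch of one option, an explicit instantiation at the bottom of edge_store.cpp — the instantiation set is an assumption based on the call sites in this patch, and `std::shared_ptr<Field>` in node.hpp above is likewise a reconstructed type:

```cpp
// In edge_store.cpp, after the template definition. The template argument
// is deduced from the parameter, so it can be omitted.
template std::vector<std::shared_ptr<Edge>> EdgeStore::get(
    const ConcurrentSet<int64_t>::LockedView &ids) const;
```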
this: " << to_string(type) + << ", other: " << to_string(other.type) << std::endl; + return false; + } + + // Both null + if (data == nullptr && other.data == nullptr) { + return true; + } + + // One null, one not null + if (data == nullptr || other.data == nullptr) { + return false; + } + + // Compare values based on type + switch (type) { + case ValueType::NA: + return true; // Both are NA + + case ValueType::INT32: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::INT64: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::FLOAT: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::DOUBLE: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::BOOL: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::STRING: { + const StringRef& str1 = *reinterpret_cast(data); + const StringRef& str2 = *reinterpret_cast(other.data); + + // Compare string lengths first + if (str1.length != str2.length) { + return false; + } + + // Both null strings + if (str1.is_null() && str2.is_null()) { + return true; + } + + // One null, one not + if (str1.is_null() || str2.is_null()) { + return false; + } + + // Compare string content + return std::memcmp(str1.data, str2.data, str1.length) == 0; + } + + default: + return false; // Unknown type + } + } + + bool operator!=(const ValueRef& other) const { return !(*this == other); } + + [[nodiscard]] bool equals(const ValueRef& other) const { + return *this == other; + } + + // todo rename + std::string ToString() const { + if (data == nullptr) { + return "NULL"; + } + + switch (type) { + case ValueType::NA: + return "NULL"; + + case ValueType::INT32: + return std::to_string(as_int32()); + + case ValueType::INT64: + return std::to_string(as_int64()); + + case ValueType::FLOAT: + return std::to_string(as_float()); + + case ValueType::DOUBLE: + return std::to_string(as_double()); + + case ValueType::BOOL: + return as_bool() ? 
"true" : "false"; + + case ValueType::FIXED_STRING16: + case ValueType::FIXED_STRING32: + case ValueType::FIXED_STRING64: + case ValueType::STRING: { + const StringRef& str_ref = as_string_ref(); + if (str_ref.is_null()) { + return "NULL"; + } + return "\"" + str_ref.to_string() + "\""; + } + default: + return "UNKNOWN_TYPE"; + } + } +}; + // Stream operator for ValueType inline std::ostream& operator<<(std::ostream& os, const ValueType type) { return os << to_string(type); diff --git a/src/core.cpp b/src/core.cpp index d46ce87..cc044d8 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -421,6 +421,15 @@ struct QueryState { std::unordered_map aliases; // Precomputed fully-qualified field names per alias (SchemaRef::value()) llvm::StringMap> fq_field_names; + + // Field index optimization: replace string-based field lookups with integer + // indices + llvm::StringMap> + schema_field_indices; // "User" -> [0, 1, 2], "Company -> [3,4,5]" + llvm::SmallDenseMap + field_id_to_name; // 0 -> "user.name" + std::atomic next_field_id{0}; // Global field ID counter + llvm::StringMap< llvm::DenseMap>> connections; // outgoing @@ -433,8 +442,11 @@ struct QueryState { arrow::Result resolve_schema(const SchemaRef& schema_ref) { // todo we need to separate functions: assign alias , resolve if (aliases.contains(schema_ref.value()) && schema_ref.is_declaration()) { - log_warn("duplicated schema alias '" + schema_ref.value() + - "' already assigned to '" + aliases[schema_ref.value()] + "'"); + IF_DEBUG_ENABLED { + log_debug("duplicated schema alias '" + schema_ref.value() + + "' already assigned to '" + aliases[schema_ref.value()] + + "'"); + } return aliases[schema_ref.value()]; } if (schema_ref.is_declaration()) { @@ -468,11 +480,21 @@ struct QueryState { } const auto schema = schema_res.ValueOrDie(); std::vector names; + std::vector indices; names.reserve(schema->num_fields()); + indices.reserve(schema->num_fields()); + for (const auto& f : schema->fields()) { - names.emplace_back(alias + "." + f->name()); + std::string fq_name = alias + "." 
@@ -650,14 +672,27 @@ arrow::Result<std::shared_ptr<arrow::Schema>> build_denormalized_schema(
 }
 
 struct PathSegment {
-  std::string schema;
+  uint16_t schema_tag;  // Optimized: use a 16-bit tag instead of a string
+  std::string schema;   // Keep for compatibility/debugging
   int64_t node_id;
 
+  // Constructor with both tag and schema for performance
+  PathSegment(uint16_t tag, const std::string& schema_name, int64_t id)
+      : schema_tag(tag), schema(schema_name), node_id(id) {}
+
+  // Legacy constructor for compatibility
+  PathSegment(const std::string& schema_name, int64_t id)
+      : schema_tag(0), schema(schema_name), node_id(id) {}
+
   std::string toString() const {
     return schema + ":" + std::to_string(node_id);
   }
 
   bool operator==(const PathSegment& other) const {
+    // Fast path: compare tags first, then fall back to string comparison
+    if (schema_tag != 0 && other.schema_tag != 0) {
+      return schema_tag == other.schema_tag && node_id == other.node_id;
+    }
     return schema == other.schema && node_id == other.node_id;
   }
 };
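A behavior sketch for the tag fast path (in the context of core.cpp): tag 0 means "no tag assigned", so a tagged segment still compares correctly against one built through the legacy constructor.

```cpp
#include <cassert>

void path_segment_semantics() {
  PathSegment a{/*tag=*/7, "user", 1};
  PathSegment b{/*tag=*/7, "user", 1};
  PathSegment legacy{"user", 1};  // schema_tag == 0
  assert(a == b);       // integer comparison only
  assert(a == legacy);  // falls back to the schema string
}
```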
-      }
-    }
-
-    // Default to null if array is empty or conversion fails
-    cells[name] = nullptr;
-  }
-
   bool start_with(const std::vector<PathSegment>& prefix) const {
     return is_prefix(prefix, this->path);
   }
 
-  std::unordered_map<std::string, int64_t> extract_schema_ids() const {
-    std::unordered_map<std::string, int64_t> result;
-    for (const auto& [field_name, value] : cells) {
-      if (!value || !value->is_valid) continue;
-
+  const std::unordered_map<std::string, int64_t>& extract_schema_ids(
+      const llvm::SmallDenseMap<int, std::string>& field_id_to_name) {
+    if (ids_populated) {
+      return ids;
+    }
+    for (int i = 0; i < static_cast<int>(cells.size()); ++i) {
+      const auto& value = cells[i];
+      if (!value.data) continue;
+      const auto& field_name = field_id_to_name.at(i);
       // Extract schema prefix (everything before the first dot)
       size_t dot_pos = field_name.find('.');
       if (dot_pos != std::string::npos) {
         std::string schema = field_name.substr(0, dot_pos);
-        // Store ID for this schema if it's an ID field
         if (field_name.substr(dot_pos + 1) == "id") {
-          auto id_scalar = std::static_pointer_cast<arrow::Int64Scalar>(value);
-          result[schema] = id_scalar->value;
+          ids[schema] = value.as_int64();
         }
       }
     }
-    return result;
+    ids_populated = true;  // Cache the result for subsequent calls
+    return ids;
   }
 
   // returns new Row which is the result of merging this row and other
-  [[nodiscard]] Row merge(const Row& other) const {
-    Row merged = *this;
-    for (const auto& [name, value] : other.cells) {
-      if (!merged.has_value(name)) {
-        merged.cells[name] = value;
+  [[nodiscard]] std::shared_ptr<Row> merge(
+      const std::shared_ptr<Row>& other) const {
+    std::shared_ptr<Row> merged =
+        std::make_shared<Row>(*this);  // Copy needed for the merge result
+    IF_DEBUG_ENABLED {
+      log_debug("Row::merge() - this: {}", this->ToString());
+      log_debug("Row::merge() - other: {}", other->ToString());
+    }
+
+    for (int i = 0; i < static_cast<int>(other->cells.size()); ++i) {
+      if (!merged->has_value(i)) {
+        IF_DEBUG_ENABLED {
+          log_debug("Row::merge() - adding field '{}' with value: {}", i,
+                    other->cells[i].ToString());
+        }
+        merged->cells[i] = other->cells[i];
+      } else {
+        IF_DEBUG_ENABLED {
+          log_debug("Row::merge() - skipping field '{}' (already has value)",
+                    i);
+        }
       }
     }
+    IF_DEBUG_ENABLED {
+      log_debug("Row::merge() - result: {}", merged->ToString());
+    }
     return merged;
   }
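A merge-semantics sketch in the context of core.cpp: `this` wins on conflicts, and `other` only fills cells that are still empty. The three-field sizing is illustrative; the ValueRefs point at stack storage, which is valid for the scope of the example.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

void merge_semantics() {
  auto r1 = std::make_shared<Row>(3);
  auto r2 = std::make_shared<Row>(3);
  int64_t a = 1, b = 2;
  r1->set_cell(0, ValueRef(reinterpret_cast<const char*>(&a),
                           ValueType::INT64));
  r2->set_cell(0, ValueRef(reinterpret_cast<const char*>(&b),
                           ValueType::INT64));
  r2->set_cell(1, ValueRef(reinterpret_cast<const char*>(&b),
                           ValueType::INT64));

  auto merged = r1->merge(r2);
  assert(merged->cells[0].as_int64() == 1);  // r1's value kept
  assert(merged->cells[1].as_int64() == 2);  // filled in from r2
}
```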
"true" : "false"); break; default: - ss << scalar->ToString(); + ss << "unknown"; break; } - } else { - ss << "NULL"; } } - ss << "}"; return ss.str(); } @@ -826,19 +857,9 @@ struct Row { static Row create_empty_row_from_schema( const std::shared_ptr& final_output_schema) { - Row new_row; - for (const auto& field : final_output_schema->fields()) { - // Create a null scalar of the correct type - auto null_scalar = arrow::MakeNullScalar(field->type()); - if (null_scalar != nullptr) { - new_row.cells[field->name()] = null_scalar; - } else { - // If creating a null scalar fails, use nullptr as a fallback - new_row.cells[field->name()] = nullptr; - log_warn("Failed to create null scalar for field '{}' with type '{}'", - field->name(), field->type()->ToString()); - } - } + Row new_row(final_output_schema->num_fields() + + 32); // Pre-allocate with some extra space + new_row.id = -1; return new_row; } @@ -856,14 +877,14 @@ std::vector get_child_rows(const Row& parent, } struct RowNode { - std::optional row; + std::optional> row; int depth; PathSegment path_segment; std::vector> children; RowNode() : depth(0), path_segment{"", -1} {} - RowNode(std::optional r, int d, + RowNode(std::optional> r, int d, std::vector> c = {}) : row(std::move(r)), depth(d), @@ -872,14 +893,15 @@ struct RowNode { bool leaf() const { return row.has_value(); } - void insert_row_dfs(size_t path_idx, const Row& new_row) { - if (path_idx == new_row.path.size()) { - this->row = new_row; + void insert_row_dfs(size_t path_idx, const std::shared_ptr& new_row) { + if (path_idx == new_row->path.size()) { + this->row = + new_row; // Share the same Row - no copy needed in tree insertion return; } for (const auto& n : children) { - if (n->path_segment == new_row.path[path_idx]) { + if (n->path_segment == new_row->path[path_idx]) { n->insert_row_dfs(path_idx + 1, new_row); return; } @@ -887,48 +909,81 @@ struct RowNode { auto new_node = std::make_unique(); new_node->depth = depth + 1; - new_node->path_segment = new_row.path[path_idx]; + new_node->path_segment = new_row->path[path_idx]; new_node->insert_row_dfs(path_idx + 1, new_row); children.emplace_back(std::move(new_node)); } - void insert_row(const Row& new_row) { insert_row_dfs(0, new_row); } + void insert_row(const std::shared_ptr& new_row) { + insert_row_dfs(0, new_row); + } - std::vector merge_rows() { + std::vector> merge_rows( + const llvm::SmallDenseMap& field_id_to_name) { if (this->leaf()) { return {this->row.value()}; } // collect all records from child node and group them by schema - std::unordered_map> grouped; + // Optimized: use schema tags instead of strings for faster grouping + llvm::SmallDenseMap>, 8> grouped; for (const auto& c : children) { - auto child_rows = c->merge_rows(); - grouped[c->path_segment.schema].insert( - grouped[c->path_segment.schema].end(), child_rows.begin(), - child_rows.end()); + auto child_rows = c->merge_rows(field_id_to_name); + IF_DEBUG_ENABLED { + log_debug("Child node {} returned {} rows", c->path_segment.toString(), + child_rows.size()); + for (size_t i = 0; i < child_rows.size(); ++i) { + log_debug(" Child row [{}]: {}", i, child_rows[i]->ToString()); + } + } + // Use schema_tag for fast integer-based grouping instead of string lookup + uint16_t tag = c->path_segment.schema_tag; + if (tag == 0) { + // Fallback: compute a simple hash of the schema string + tag = static_cast( + std::hash{}(c->path_segment.schema) & 0xFFFFu); + } + grouped[tag].insert(grouped[tag].end(), child_rows.begin(), + child_rows.end()); } - std::vector> 
@@ -887,48 +909,81 @@ struct RowNode {
     auto new_node = std::make_unique<RowNode>();
     new_node->depth = depth + 1;
-    new_node->path_segment = new_row.path[path_idx];
+    new_node->path_segment = new_row->path[path_idx];
     new_node->insert_row_dfs(path_idx + 1, new_row);
     children.emplace_back(std::move(new_node));
   }
 
-  void insert_row(const Row& new_row) { insert_row_dfs(0, new_row); }
+  void insert_row(const std::shared_ptr<Row>& new_row) {
+    insert_row_dfs(0, new_row);
+  }
 
-  std::vector<Row> merge_rows() {
+  std::vector<std::shared_ptr<Row>> merge_rows(
+      const llvm::SmallDenseMap<int, std::string>& field_id_to_name) {
     if (this->leaf()) {
       return {this->row.value()};
     }
 
     // collect all records from child node and group them by schema
-    std::unordered_map<std::string, std::vector<Row>> grouped;
+    // Optimized: use schema tags instead of strings for faster grouping
+    llvm::SmallDenseMap<uint16_t, std::vector<std::shared_ptr<Row>>, 8>
+        grouped;
     for (const auto& c : children) {
-      auto child_rows = c->merge_rows();
-      grouped[c->path_segment.schema].insert(
-          grouped[c->path_segment.schema].end(), child_rows.begin(),
-          child_rows.end());
+      auto child_rows = c->merge_rows(field_id_to_name);
+      IF_DEBUG_ENABLED {
+        log_debug("Child node {} returned {} rows", c->path_segment.toString(),
+                  child_rows.size());
+        for (size_t i = 0; i < child_rows.size(); ++i) {
+          log_debug("  Child row [{}]: {}", i, child_rows[i]->ToString());
+        }
+      }
+      // Use schema_tag for fast integer-based grouping instead of a string
+      // lookup
+      uint16_t tag = c->path_segment.schema_tag;
+      if (tag == 0) {
+        // Fallback: compute a simple hash of the schema string
+        tag = static_cast<uint16_t>(
+            std::hash<std::string>{}(c->path_segment.schema) & 0xFFFFu);
+      }
+      grouped[tag].insert(grouped[tag].end(), child_rows.begin(),
+                          child_rows.end());
     }
 
-    std::vector<std::vector<Row>> groups_for_product;
+    std::vector<std::vector<std::shared_ptr<Row>>> groups_for_product;
 
     // Add this->row as its own group (that is important for cartesian product)
     // if it exists and has data,
     // to represent the node itself if it should be part of the product
     // independently.
     if (this->row.has_value()) {
-      Row node_self_row = this->row.value();
+      std::shared_ptr<Row> node_self_row = std::make_shared<Row>(
+          *this->row.value());  // Create a copy like the original
       // Normalize path for the node's own row to ensure it combines correctly
       // and doesn't carry a longer BFS path if it was a leaf of BFS.
       // i.e. current node path can be a:0->b:1->c:2
       // this code sets it to 'c:2'
-      node_self_row.path = {this->path_segment};
+      node_self_row->path = {this->path_segment};
+      IF_DEBUG_ENABLED {
+        log_debug("Adding node self row: {}", node_self_row->ToString());
+      }
       groups_for_product.push_back({node_self_row});
     }
 
     for (const auto& pair : grouped) {
       if (!pair.second.empty()) {
+        IF_DEBUG_ENABLED {
+          log_debug("Adding group for schema tag '{}' with {} rows",
+                    pair.first, pair.second.size());
+          for (size_t i = 0; i < pair.second.size(); ++i) {
+            log_debug("  Group row [{}]: {}", i, pair.second[i]->ToString());
+          }
+        }
         groups_for_product.push_back(pair.second);
       }
     }
 
+    IF_DEBUG_ENABLED {
+      log_debug("Total groups for Cartesian product: {}",
+                groups_for_product.size());
+    }
+
     if (groups_for_product.empty()) {
       return {};
     }
@@ -936,7 +991,8 @@ struct RowNode {
     // with data), no Cartesian product is needed. Just return its rows, but
     // ensure paths are correct.
     if (groups_for_product.size() == 1) {
-      std::vector<Row> single_group_rows = groups_for_product[0];
+      std::vector<std::shared_ptr<Row>> single_group_rows =
+          groups_for_product[0];
       // Ensure path is normalized for these rows if they came from children
       // For rows that are just this->row.value(), path is already set.
       // This might be too aggressive if child rows are already fully merged
@@ -945,9 +1001,28 @@ struct RowNode {
       return single_group_rows;
     }
 
-    std::vector<Row> final_merged_rows = groups_for_product.back();
+    std::vector<std::shared_ptr<Row>> final_merged_rows =
+        groups_for_product.back();
+    IF_DEBUG_ENABLED {
+      log_debug("Starting Cartesian product with final group ({} rows)",
+                final_merged_rows.size());
+      for (size_t i = 0; i < final_merged_rows.size(); ++i) {
+        log_debug("  Final group row [{}]: {}", i,
+                  final_merged_rows[i]->ToString());
+      }
+    }
+
     for (int i = static_cast<int>(groups_for_product.size()) - 2; i >= 0; --i) {
-      std::vector<Row> temp_product_accumulator;
+      IF_DEBUG_ENABLED {
+        log_debug("Processing group {} with {} rows", i,
+                  groups_for_product[i].size());
+        for (size_t j = 0; j < groups_for_product[i].size(); ++j) {
+          log_debug("  Current group row [{}]: {}", j,
+                    groups_for_product[i][j]->ToString());
+        }
+      }
+
+      std::vector<std::shared_ptr<Row>> temp_product_accumulator;
       for (const auto& r1_from_current_group : groups_for_product[i]) {
         for (const auto& r2_from_previous_product : final_merged_rows) {
           // Check for conflicts in shared variables between rows
@@ -955,9 +1030,9 @@ struct RowNode {
           // Get variable prefixes (schema names) from cells
           std::unordered_map<std::string, int64_t> schema_ids_r1 =
-              r1_from_current_group.extract_schema_ids();
+              r1_from_current_group->extract_schema_ids(field_id_to_name);
           std::unordered_map<std::string, int64_t> schema_ids_r2 =
-              r2_from_previous_product.extract_schema_ids();
+              r2_from_previous_product->extract_schema_ids(field_id_to_name);
 
           // Check for conflicts - same schema name but different IDs
           for (const auto& [schema, id1] : schema_ids_r1) {
@@ -977,20 +1052,19 @@ struct RowNode {
 
           // Additional cell-by-cell check for conflicts
           if (can_merge) {
-            for (const auto& [field_name, value1] :
-                 r1_from_current_group.cells) {
-              if (!value1 || !value1->is_valid) continue;
-
-              auto it = r2_from_previous_product.cells.find(field_name);
-              if (it != r2_from_previous_product.cells.end() && it->second &&
-                  it->second->is_valid) {
+            for (int field_index = 0;
+                 field_index <
+                 static_cast<int>(r1_from_current_group->cells.size());
+                 ++field_index) {
+              if (r1_from_current_group->has_value(field_index) &&
+                  r2_from_previous_product->has_value(field_index)) {
                 // Both rows have this field with non-null values - check if
                 // they match
-                if (!value1->Equals(*(it->second))) {
+                if (!r1_from_current_group->cells[field_index].equals(
+                        r2_from_previous_product->cells[field_index])) {
                   IF_DEBUG_ENABLED {
                     log_debug(
                         "Conflict detected: Field '{}' has different values",
-                        field_name);
+                        field_id_to_name.at(field_index));
                   }
                   can_merge = false;
                   break;
                 }
               }
             }
           }
 
@@ -1000,12 +1074,19 @@ struct RowNode {
           if (can_merge) {
-            Row merged_r =
-                r1_from_current_group.merge(r2_from_previous_product);
+            std::shared_ptr<Row> merged_r =
+                r1_from_current_group->merge(r2_from_previous_product);
             // Set the path of the newly merged row to the path of the current
             // RowNode
-            merged_r.path = {this->path_segment};
+            merged_r->path = {this->path_segment};
+            IF_DEBUG_ENABLED {
+              log_debug("Merged row: {}", merged_r->ToString());
+            }
             temp_product_accumulator.push_back(merged_r);
+          } else {
+            IF_DEBUG_ENABLED {
+              log_debug("Cannot merge rows due to conflicts");
+            }
           }
         }
       }
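A runnable shape-sketch of the product fold above: the accumulator starts as the last group and is cross-combined with each earlier group, mirroring `final_merged_rows`/`temp_product_accumulator`. Strings stand in for rows, concatenation for `Row::merge`, and the conflict check is not modeled.

```cpp
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::vector<std::vector<std::string>> groups = {
      {"a1", "a2"}, {"b1"}, {"c1", "c2"}};
  std::vector<std::string> acc = groups.back();
  for (int i = static_cast<int>(groups.size()) - 2; i >= 0; --i) {
    std::vector<std::string> next;
    for (const auto& r1 : groups[i])
      for (const auto& r2 : acc) next.push_back(r1 + "*" + r2);
    acc = std::move(next);
  }
  for (const auto& r : acc) std::cout << r << "\n";  // 2 * 1 * 2 = 4 rows
}
```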
@@ -1034,36 +1115,36 @@ struct RowNode {
     // Print Row
     if (row.has_value()) {
       ss << indent << "  Path: ";
-      if (row.value().path.empty()) {
+      if (row.value()->path.empty()) {
         ss << "(empty)";
       } else {
-        for (size_t i = 0; i < row.value().path.size(); ++i) {
+        for (size_t i = 0; i < row.value()->path.size(); ++i) {
           if (i > 0) ss << " → ";
-          ss << row.value().path[i].schema << ":"
-             << row.value().path[i].node_id;
+          ss << row.value()->path[i].schema << ":"
+             << row.value()->path[i].node_id;
         }
       }
       ss << "\n";
 
       // Print key cell values (limited to avoid overwhelming output)
       ss << indent << "  Cells: ";
-      if (row.value().cells.empty()) {
+      if (row.value()->cells.empty()) {
         ss << "(empty)";
       } else {
         size_t count = 0;
         ss << "{ ";
-        for (const auto& [key, value] : row.value().cells) {
+        for (int i = 0; i < static_cast<int>(row.value()->cells.size()); i++) {
           if (count++ > 0) ss << ", ";
           if (count > 5) {  // Limit display
-            ss << "... +" << (row.value().cells.size() - 5) << " more";
+            ss << "... +" << (row.value()->cells.size() - 5) << " more";
             break;
           }
-          ss << key << ": ";
-          if (!value) {
+          ss << i << ": ";
+          if (!row.value()->has_value(i)) {
             ss << "NULL";
           } else {
-            ss << value->ToString();  // Assuming arrow::Scalar has ToString()
+            ss << row.value()->cells[i].ToString();
           }
         }
         ss << " }";
@@ -1110,7 +1191,7 @@ struct QueueItem {
   QueueItem(int64_t id, const SchemaRef& schema, int l, std::shared_ptr<Row> r)
       : node_id(id), schema_ref(schema), level(l), row(std::move(r)) {
-    path.push_back(PathSegment{schema.value(), id});
+    path.push_back(PathSegment{schema.tag(), schema.value(), id});
   }
 };
 
@@ -1145,14 +1226,15 @@ void log_grouped_connections(
   }
 }
 
-arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows_bfs(
-    int64_t node_id, const SchemaRef& start_schema,
-    const std::shared_ptr<arrow::Schema>& output_schema,
-    const QueryState& query_state, llvm::DenseSet<int64_t>& global_visited) {
+arrow::Result<std::shared_ptr<std::vector<std::shared_ptr<Row>>>>
+populate_rows_bfs(int64_t node_id, const SchemaRef& start_schema,
+                  const std::shared_ptr<arrow::Schema>& output_schema,
+                  const QueryState& query_state,
+                  llvm::DenseSet<int64_t>& global_visited) {
   IF_DEBUG_ENABLED {
     log_debug("populate_rows_bfs::node={}:{}", start_schema.value(), node_id);
   }
-  auto result = std::make_shared<std::vector<Row>>();
+  auto result = std::make_shared<std::vector<std::shared_ptr<Row>>>();
   int64_t row_id_counter = 0;
   auto initial_row =
       std::make_shared<Row>(create_empty_row_from_schema(output_schema));
@@ -1168,8 +1250,8 @@ populate_rows_bfs(
     queue.pop();
     auto node = query_state.node_manager->get_node(item.node_id).ValueOrDie();
     const auto& it_fq =
-        query_state.fq_field_names.find(item.schema_ref.value());
-    if (it_fq == query_state.fq_field_names.end()) {
+        query_state.schema_field_indices.find(item.schema_ref.value());
+    if (it_fq == query_state.schema_field_indices.end()) {
       log_error("No fully-qualified field names for schema '{}'",
                 item.schema_ref.value());
       return arrow::Status::KeyError(
@@ -1207,10 +1289,11 @@ populate_rows_bfs(
     if (grouped_connections.empty()) {
       // we've done
       if (!skip) {
-        auto r = *item.row;
-        r.path = item.path;
-        r.id = row_id_counter++;
-        IF_DEBUG_ENABLED { log_debug("add row: {}", r.ToString()); }
+        auto r = std::make_shared<Row>(
+            *item.row);  // Copy needed: each result needs a unique ID and path
+        r->path = item.path;
+        r->id = row_id_counter++;
+        IF_DEBUG_ENABLED { log_debug("add row: {}", r->ToString()); }
         result->push_back(r);
       }
 
@@ -1237,8 +1320,8 @@ populate_rows_bfs(
           auto next =
               QueueItem(conn.target_id, conn.target, item.level + 1, next_row);
           next.path = item.path;
-          next.path.push_back(
-              PathSegment{conn.target.value(), conn.target_id});
+          next.path.push_back(PathSegment{
+              conn.target.tag(), conn.target.value(), conn.target_id});
          IF_DEBUG_ENABLED {
             log_debug("create a new path {}, node={}",
                       join_schema_path(next.path), conn.target_id);
@@ -1251,28 +1334,32 @@ populate_rows_bfs(
   }
 
   RowNode tree;
-  tree.path_segment = PathSegment{"root", -1};
+  tree.path_segment = PathSegment{0, "root", -1};  // Use tag 0 for the root
   for (const auto& r : *result) {
-    IF_DEBUG_ENABLED { log_debug("bfs result: {}", r.ToString()); }
-    tree.insert_row(r);
+    IF_DEBUG_ENABLED { log_debug("bfs result: {}", r->ToString()); }
+    // Copy needed: tree merge operations will modify rows, so each needs to
+    // be independent
+    auto r_copy = std::make_shared<Row>(*r);
+    tree.insert_row(r_copy);
   }
   IF_DEBUG_ENABLED { tree.print(); }
-  auto merged = tree.merge_rows();
+  auto merged = tree.merge_rows(query_state.field_id_to_name);
   IF_DEBUG_ENABLED {
     for (const auto& row : merged) {
-      log_debug("merge result: {}", row.ToString());
+      log_debug("merge result: {}", row->ToString());
     }
   }
-  return std::make_shared<std::vector<Row>>(merged);
+  return std::make_shared<std::vector<std::shared_ptr<Row>>>(merged);
 }
 
 // template
-arrow::Result<std::shared_ptr<std::vector<Row>>> populate_batch_rows(
-    const llvm::DenseSet<int64_t>& node_ids, const SchemaRef& schema_ref,
-    const std::shared_ptr<arrow::Schema>& output_schema,
-    const QueryState& query_state, const TraverseType join_type,
-    tbb::concurrent_unordered_set<int64_t>& global_visited) {
-  auto rows = std::make_shared<std::vector<Row>>();
+arrow::Result<std::shared_ptr<std::vector<std::shared_ptr<Row>>>>
+populate_batch_rows(const llvm::DenseSet<int64_t>& node_ids,
+                    const SchemaRef& schema_ref,
+                    const std::shared_ptr<arrow::Schema>& output_schema,
+                    const QueryState& query_state,
+                    const TraverseType join_type,
+                    tbb::concurrent_unordered_set<int64_t>& global_visited) {
+  auto rows = std::make_shared<std::vector<std::shared_ptr<Row>>>();
   rows->reserve(node_ids.size());
   llvm::DenseSet<int64_t> local_visited;
   // For INNER join: only process nodes that have connections
@@ -1332,11 +1419,11 @@ std::vector<llvm::DenseSet<int64_t>> batch_node_ids(
 
 // process all schemas used in traverse
 // Phase 1: Process connected nodes
 // Phase 2: Handle outer joins for unmatched nodes
-arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows(
+arrow::Result<std::shared_ptr<std::vector<std::shared_ptr<Row>>>>
+populate_rows(
     const ExecutionConfig& execution_config, const QueryState& query_state,
     const std::vector<Traverse>& traverses,
     const std::shared_ptr<arrow::Schema>& output_schema) {
-  auto rows = std::make_shared<std::vector<Row>>();
+  auto rows = std::make_shared<std::vector<std::shared_ptr<Row>>>();
   std::mutex rows_mtx;
 
   tbb::concurrent_unordered_set<int64_t> global_visited;
@@ -1514,85 +1601,92 @@ arrow::Result<std::shared_ptr<arrow::Table>> create_empty_table(
 }
 
 arrow::Result<std::shared_ptr<arrow::Table>> create_table_from_rows(
-    const std::shared_ptr<std::vector<Row>>& rows,
-    const std::shared_ptr<arrow::Schema>& schema = nullptr) {
-  if (!rows || rows->empty()) {
-    if (schema == nullptr) {
-      return arrow::Status::Invalid("No rows provided to create table");
-    }
-    return create_empty_table(schema);
+    const std::shared_ptr<std::vector<std::shared_ptr<Row>>>& rows,
+    const std::shared_ptr<arrow::Schema>& output_schema) {
+  if (output_schema == nullptr) {
+    return arrow::Status::Invalid("output schema is null");
   }
-
-  std::shared_ptr<arrow::Schema> output_schema;
-
-  if (schema) {
-    // Use the provided schema
-    output_schema = schema;
-  } else {
-    // Get all field names from all rows to create a complete schema
-    std::set<std::string> all_field_names;
-    for (const auto& row : *rows) {
-      for (const auto& field_name : row.cells | std::views::keys) {
-        all_field_names.insert(field_name);
-      }
-    }
-
-    // Create schema from field names
-    std::vector<std::shared_ptr<arrow::Field>> fields;
-
-    for (const auto& field_name : all_field_names) {
-      // Find first non-null value to determine field type
-      std::shared_ptr<arrow::DataType> field_type = nullptr;
-      for (const auto& row : *rows) {
-        auto it = row.cells.find(field_name);
-        if (it != row.cells.end() && it->second) {
-          if (auto array_result = arrow::MakeArrayFromScalar(*(it->second), 1);
-              array_result.ok()) {
-            field_type = array_result.ValueOrDie()->type();
-            break;
-          }
-        }
-      }
-
-      // If we couldn't determine type, default to string
-      if (!field_type) {
-        field_type = arrow::utf8();
-      }
-
-      fields.push_back(arrow::field(field_name, field_type));
-    }
-
-    output_schema = std::make_shared<arrow::Schema>(fields);
+  if (!rows || rows->empty()) {
+    return create_empty_table(output_schema);
   }
 
   // Create array builders for each field
   std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
+  std::vector<std::string> field_names;  // Cache names; avoids repeated lookups
+
   for (const auto& field : output_schema->fields()) {
     ARROW_ASSIGN_OR_RAISE(auto builder, arrow::MakeBuilder(field->type()));
     builders.push_back(std::move(builder));
+    field_names.push_back(field->name());
+  }
+
+  // Pre-allocate builders for better performance
+  const size_t num_rows = rows->size();
+  for (auto& builder : builders) {
+    ARROW_RETURN_NOT_OK(builder->Reserve(num_rows));
   }
 
   // Populate the builders from each row
   for (const auto& row : *rows) {
-    for (size_t i = 0; i < output_schema->num_fields(); i++) {
-      const auto& field_name = output_schema->field(i)->name();
-      auto it = row.cells.find(field_name);
-
-      if (it != row.cells.end() && it->second) {
-        // We have a value for this field
-        auto array_result = arrow::MakeArrayFromScalar(*(it->second), 1);
-        if (array_result.ok()) {
-          auto array = array_result.ValueOrDie();
-          auto scalar_result = array->GetScalar(0);
-          if (scalar_result.ok()) {
-            ARROW_RETURN_NOT_OK(
-                builders[i]->AppendScalar(*scalar_result.ValueOrDie()));
-            continue;
+    for (size_t i = 0; i < field_names.size(); i++) {
+      // Row::cells is indexed in the schema's field order, so the cell can
+      // be read without any string lookup
+      ValueRef value_ref;
+      bool has_value = false;
+
+      if (i < row->cells.size() && row->cells[i].data != nullptr) {
+        value_ref = row->cells[i];
+        has_value = true;
+      }
+
+      if (has_value) {
+        // We have a value for this field - append directly without creating
+        // scalars
+        arrow::Status append_status;
+
+        switch (value_ref.type) {
+          case ValueType::INT32:
+            append_status =
+                static_cast<arrow::Int32Builder*>(builders[i].get())
+                    ->Append(value_ref.as_int32());
+            break;
+          case ValueType::INT64:
+            append_status =
+                static_cast<arrow::Int64Builder*>(builders[i].get())
+                    ->Append(value_ref.as_int64());
+            break;
+          case ValueType::DOUBLE:
+            append_status =
+                static_cast<arrow::DoubleBuilder*>(builders[i].get())
+                    ->Append(value_ref.as_double());
+            break;
+          case ValueType::STRING: {
+            const auto& str_ref = value_ref.as_string_ref();
+            append_status =
+                static_cast<arrow::StringBuilder*>(builders[i].get())
+                    ->Append(str_ref.data, str_ref.length);
+            break;
+          }
+          case ValueType::BOOL:
+            append_status =
+                static_cast<arrow::BooleanBuilder*>(builders[i].get())
+                    ->Append(value_ref.as_bool());
+            break;
+          case ValueType::NA:
+            append_status = builders[i]->AppendNull();
+            break;
+          default:
+            append_status = builders[i]->AppendNull();
+            break;
+        }
+
+        if (append_status.ok()) {
+          continue;
         }
       }
 
-      // Fall back to NULL if we couldn't get or append the scalar
+      // Fall back to NULL if we couldn't append the value
      ARROW_RETURN_NOT_OK(builders[i]->AppendNull());
     }
   }
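A standalone sketch of why the typed appends above matter: the old path allocated a one-element Array plus a Scalar per cell, while the new path writes straight into the builder's buffer. This uses only public Arrow APIs, independent of Row/ValueRef.

```cpp
#include <arrow/api.h>
#include <cassert>

arrow::Status append_directly() {
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(3));
  for (int64_t v : {1, 2, 3}) {
    ARROW_RETURN_NOT_OK(builder.Append(v));  // no Scalar, no temporary Array
  }
  std::shared_ptr<arrow::Array> out;
  ARROW_RETURN_NOT_OK(builder.Finish(&out));
  assert(out->length() == 3);
  return arrow::Status::OK();
}
```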
diff --git a/src/edge_store.cpp b/src/edge_store.cpp
index 5858818..2492c50 100644
--- a/src/edge_store.cpp
+++ b/src/edge_store.cpp
@@ -106,6 +106,20 @@ std::vector<std::shared_ptr<Edge>> EdgeStore::get(
   return res;
 }
 
+// Template overload for any iterable container (including LockedView)
+template <typename Container>
+std::vector<std::shared_ptr<Edge>> EdgeStore::get(const Container& ids) const {
+  std::vector<std::shared_ptr<Edge>> res;
+  tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>>::const_accessor acc;
+
+  for (const auto& id : ids) {
+    if (edges.find(acc, id)) {
+      res.push_back(acc->second);
+    }
+  }
+  return res;
+}
+
 arrow::Result<std::shared_ptr<Edge>> EdgeStore::get(int64_t edge_id) const {
   tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>>::const_accessor acc;
   if (edges.find(acc, edge_id)) {
@@ -123,17 +137,32 @@ arrow::Result<std::vector<std::shared_ptr<Edge>>> EdgeStore::get_edges_from_map(
     return std::vector<std::shared_ptr<Edge>>();
   }
 
+  const auto edge_ids_view = acc->second.get_all_unsafe();
   std::vector<std::shared_ptr<Edge>> result;
-  const auto edge_ids = acc->second.get_all();
-  result.reserve(edge_ids->size());
-  for (const auto& edge_id : *edge_ids) {
-    tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>>::const_accessor
-        edge_acc;
-    if (edges.find(edge_acc, edge_id)) {
-      if (auto edge = edge_acc->second;
-          type.empty() || edge->get_type() == type) {
-        result.push_back(edge);
+  // Pre-allocate the result vector to avoid reallocations
+  result.reserve(edge_ids_view.size());
+
+  // Reuse a single accessor to avoid repeated allocation/deallocation
+  tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>>::const_accessor
+      edge_acc;
+
+  // Optimization: avoid the string comparison when there is no type filter
+  if (type.empty()) {
+    // Fast path: no type filtering needed
+    for (const auto& edge_id : edge_ids_view) {
+      if (edges.find(edge_acc, edge_id)) {
+        result.push_back(edge_acc->second);
+      }
+    }
+  } else {
+    // Slow path: type filtering required
+    for (const auto& edge_id : edge_ids_view) {
+      if (edges.find(edge_acc, edge_id)) {
+        const auto& edge = edge_acc->second;
+        if (edge->get_type() == type) {
+          result.push_back(edge);
+        }
       }
     }
   }
@@ -160,9 +189,9 @@ arrow::Result<std::vector<std::shared_ptr<Edge>>> EdgeStore::get_by_type(
   }
 
   std::vector<std::shared_ptr<Edge>> result;
-  auto edge_ids = acc->second.get_all();
+  auto edge_ids_view = acc->second.get_all_unsafe();
 
-  for (const auto& edge_id : *edge_ids) {
+  for (const auto& edge_id : edge_ids_view) {
     tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>>::const_accessor
         edge_acc;
     if (edges.find(edge_acc, edge_id)) {
@@ -197,12 +226,14 @@ arrow::Result<std::shared_ptr<arrow::Table>> EdgeStore::generate_table(
   log_info("Generating table for edge type: '" + edge_type + "'");
   std::vector<std::shared_ptr<Edge>> selected_edges;
   if (edge_type.empty()) {
-    selected_edges = get(*edge_ids_.get_all());
+    auto edge_ids_view = edge_ids_.get_all_unsafe();
+    selected_edges = get(edge_ids_view);
   } else {
     tbb::concurrent_hash_map<std::string,
                              ConcurrentSet<int64_t>>::const_accessor acc;
     if (edges_by_type_.find(acc, edge_type)) {
-      selected_edges = get(*acc->second.get_all());
+      auto edge_ids_view = acc->second.get_all_unsafe();
+      selected_edges = get(edge_ids_view);
    }
  }
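One closing caveat on the `get_all_unsafe()` migration: the view must not overlap a mutation, and nothing in this patch enforces that. A sketch of one way a caller could uphold the contract — the `edges_mutex` guard and the `"follows"` edge type are hypothetical, not part of EdgeStore:

```cpp
#include <shared_mutex>

std::shared_mutex edges_mutex;  // hypothetical guard owned by the caller

void snapshot_free_read(tundradb::EdgeStore& store) {
  // Readers take a shared lock; any writer that adds or removes edges
  // would take a std::unique_lock on the same mutex.
  std::shared_lock lock(edges_mutex);
  auto table = store.generate_table("follows");
  (void)table;
}
```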