From 1506b462e3a959c0af9076b340c32b967ab8c055 Mon Sep 17 00:00:00 2001 From: dmgcodevil Date: Thu, 2 Oct 2025 14:32:02 -0400 Subject: [PATCH] use llvm map in edge_store.cpp instead of tbb connections pool, reserve collection size in query state --- include/edge_store.hpp | 21 ++- src/core.cpp | 68 +++++++- src/edge_store.cpp | 364 +++++++++++++++++++---------------------- 3 files changed, 239 insertions(+), 214 deletions(-) diff --git a/include/edge_store.hpp b/include/edge_store.hpp index 136c025..2ed5a3b 100644 --- a/include/edge_store.hpp +++ b/include/edge_store.hpp @@ -2,8 +2,11 @@ #define EDGE_STORE_HPP #include +#include +#include #include +#include #include #include #include @@ -19,24 +22,24 @@ class EdgeStore { struct TableCache; private: - tbb::concurrent_hash_map> edges; - tbb::concurrent_hash_map> edges_by_type_; - tbb::concurrent_hash_map> outgoing_edges_; - tbb::concurrent_hash_map> incoming_edges_; + mutable std::shared_mutex edges_mutex_; + llvm::DenseMap> edges; + llvm::StringMap> edges_by_type_; + llvm::DenseMap> outgoing_edges_; + llvm::DenseMap> incoming_edges_; - tbb::concurrent_hash_map> - versions_; // version + llvm::StringMap> versions_; // version std::atomic edge_id_counter_{0}; ConcurrentSet edge_ids_; - tbb::concurrent_hash_map> - tables_; // cache + mutable std::shared_mutex tables_mutex_; + llvm::StringMap> tables_; // cache std::string data_file_; int64_t chunk_size_; arrow::Result>> get_edges_from_map( - const tbb::concurrent_hash_map> &edge_map, + const llvm::DenseMap> &edge_map, int64_t id, const std::string &type) const; arrow::Result> generate_table( diff --git a/src/core.cpp b/src/core.cpp index cc044d8..03e0d22 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -427,8 +427,9 @@ struct QueryState { llvm::StringMap> schema_field_indices; // "User" -> [0, 1, 2], "Company -> [3,4,5]" llvm::SmallDenseMap - field_id_to_name; // 0 -> "user.name" - std::atomic next_field_id{0}; // Global field ID counter + field_id_to_name; // 0 -> "user.name" + llvm::StringMap field_name_to_index; // "user.name" -> 0 + std::atomic next_field_id{0}; // Global field ID counter llvm::StringMap< llvm::DenseMap>> @@ -439,6 +440,50 @@ struct QueryState { std::shared_ptr schema_registry; std::vector traversals; + // Connection object pooling to avoid repeated allocations + class ConnectionPool { + private: + std::vector pool_; + size_t next_index_ = 0; + + public: + explicit ConnectionPool(size_t initial_size = 1000) : pool_(initial_size) {} + + GraphConnection& get() { + if (next_index_ >= pool_.size()) { + pool_.resize(pool_.size() * 2); // Grow pool if needed + } + return pool_[next_index_++]; + } + + void reset() { next_index_ = 0; } // Reset for reuse + size_t size() const { return next_index_; } + }; + + mutable ConnectionPool connection_pool_; // Mutable for const methods + + // Pre-size hash maps to avoid expensive resizing during query execution + void reserve_capacity(const Query& query) { + // Estimate schema count from FROM + TRAVERSE clauses + size_t estimated_schemas = 1; // FROM clause + for (const auto& clause : query.clauses()) { + if (clause->type() == Clause::Type::TRAVERSE) { + estimated_schemas += 2; // source + target schemas + } + } + + // Pre-size standard containers (LLVM containers don't support reserve) + tables.reserve(estimated_schemas); + aliases.reserve(estimated_schemas); + + // Estimate nodes per schema (conservative estimate) + size_t estimated_nodes_per_schema = 1000; + incoming.reserve(estimated_nodes_per_schema); + + // Pre-size field mappings + field_id_to_name.reserve(estimated_schemas * 8); // ~8 fields per schema + } + arrow::Result resolve_schema(const SchemaRef& schema_ref) { // todo we need to separate functions: assign alias , resolve if (aliases.contains(schema_ref.value()) && schema_ref.is_declaration()) { @@ -491,6 +536,7 @@ struct QueryState { names.emplace_back(fq_name); indices.emplace_back(field_id); field_id_to_name[field_id] = fq_name; + field_name_to_index[fq_name] = field_id; } fq_field_names[alias] = std::move(names); @@ -1637,7 +1683,6 @@ arrow::Result> create_table_from_rows( bool has_value = false; if (i < row->cells.size() && row->cells[i].data != nullptr) { - // Fast path: use indexed access value_ref = row->cells[i]; has_value = true; } @@ -1900,6 +1945,10 @@ arrow::Result> Database::query( const Query& query) const { QueryState query_state; auto result = std::make_shared(); + + // Pre-size hash maps to avoid expensive resizing during execution + query_state.reserve_capacity(query); + IF_DEBUG_ENABLED { log_debug("Executing query starting from schema '{}'", query.from().toString()); @@ -2123,10 +2172,15 @@ arrow::Result> Database::query( source_had_match = true; } matched_target_ids.insert(target_node->id); - auto conn = - GraphConnection{traverse->source(), source_id, - traverse->edge_type(), "", - traverse->target(), target_node->id}; + // Use connection pool to avoid allocation + auto& conn = query_state.connection_pool_.get(); + conn.source = traverse->source(); + conn.source_id = source_id; + conn.edge_type = traverse->edge_type(); + conn.label = ""; + conn.target = traverse->target(); + conn.target_id = target_node->id; + query_state.connections[traverse->source().value()][source_id] .push_back(conn); query_state.incoming[target_node->id].push_back(conn); diff --git a/src/edge_store.cpp b/src/edge_store.cpp index 2492c50..a2d620e 100644 --- a/src/edge_store.cpp +++ b/src/edge_store.cpp @@ -1,5 +1,7 @@ #include "edge_store.hpp" +#include + #include "logger.hpp" namespace tundradb { @@ -12,82 +14,74 @@ arrow::Result> EdgeStore::create_edge( } arrow::Result EdgeStore::add(const std::shared_ptr& edge) { - { - tbb::concurrent_hash_map>::accessor acc; - if (!this->edges.insert(acc, edge->get_id())) { - return arrow::Status::KeyError("Edge already exists with id=" + - std::to_string(edge->get_id())); - } - edge_ids_.insert(edge->get_id()); - acc->second = edge; - } + std::unique_lock lock(edges_mutex_); - { - tbb::concurrent_hash_map>::accessor acc; - this->edges_by_type_.insert(acc, edge->get_type()); - acc->second.insert(edge->get_id()); + // Check if edge already exists + if (this->edges.find(edge->get_id()) != this->edges.end()) { + return arrow::Status::KeyError("Edge already exists with id=" + + std::to_string(edge->get_id())); } - { - tbb::concurrent_hash_map>::accessor acc; - this->outgoing_edges_.insert(acc, edge->get_source_id()); - acc->second.insert(edge->get_id()); - } + // Add edge to main edges map + this->edges[edge->get_id()] = edge; + edge_ids_.insert(edge->get_id()); - { - tbb::concurrent_hash_map>::accessor acc; - this->incoming_edges_.insert(acc, edge->get_target_id()); - acc->second.insert(edge->get_id()); - } + // Add to edges_by_type + this->edges_by_type_[edge->get_type()].insert(edge->get_id()); - { - tbb::concurrent_hash_map>::accessor acc; - if (this->versions_.insert(acc, edge->get_type())) { - acc->second.store(1); - } else { - acc->second.fetch_add(1); - } + // Add to outgoing_edges + this->outgoing_edges_[edge->get_source_id()].insert(edge->get_id()); + + // Add to incoming_edges + this->incoming_edges_[edge->get_target_id()].insert(edge->get_id()); + + // Update version counter (atomic, no additional locking needed) + auto version_it = this->versions_.find(edge->get_type()); + if (version_it == this->versions_.end()) { + this->versions_[edge->get_type()].store(1); + } else { + version_it->second.fetch_add(1); } return true; } arrow::Result EdgeStore::remove(int64_t edge_id) { - tbb::concurrent_hash_map>::accessor acc; - - if (edges.find(acc, edge_id)) { - const auto edge = acc->second; - if (edges.erase(acc)) { - { - tbb::concurrent_hash_map>::accessor - edges_by_type_acc; - if (edges_by_type_.find(edges_by_type_acc, edge->get_type())) { - edges_by_type_acc->second.remove(edge->get_id()); - } - } - { - tbb::concurrent_hash_map>::accessor - outgoing_edges_acc; - if (outgoing_edges_.find(outgoing_edges_acc, edge->get_source_id())) { - outgoing_edges_acc->second.remove(edge->get_id()); - } - } - { - tbb::concurrent_hash_map>::accessor - incoming_edges_acc; - if (incoming_edges_.find(incoming_edges_acc, edge->get_target_id())) { - incoming_edges_acc->second.remove(edge->get_id()); - } - } + std::unique_lock lock(edges_mutex_); + + auto edge_it = edges.find(edge_id); + if (edge_it != edges.end()) { + const auto edge = edge_it->second; + + // Remove from main edges map + edges.erase(edge_it); + + // Remove from edges_by_type + auto type_it = edges_by_type_.find(edge->get_type()); + if (type_it != edges_by_type_.end()) { + type_it->second.erase(edge->get_id()); } - { - tbb::concurrent_hash_map>::accessor acc; - if (this->versions_.insert(acc, edge->get_type())) { - acc->second.store(1); - } else { - acc->second.fetch_add(1); - } + + // Remove from outgoing_edges + auto outgoing_it = outgoing_edges_.find(edge->get_source_id()); + if (outgoing_it != outgoing_edges_.end()) { + outgoing_it->second.erase(edge->get_id()); + } + + // Remove from incoming_edges + auto incoming_it = incoming_edges_.find(edge->get_target_id()); + if (incoming_it != incoming_edges_.end()) { + incoming_it->second.erase(edge->get_id()); + } + + // Update version counter (atomic, no additional locking needed) + auto version_it = this->versions_.find(edge->get_type()); + if (version_it == this->versions_.end()) { + this->versions_[edge->get_type()].store(1); + } else { + version_it->second.fetch_add(1); } + return true; } return false; @@ -95,12 +89,13 @@ arrow::Result EdgeStore::remove(int64_t edge_id) { std::vector> EdgeStore::get( const std::set& ids) const { + std::shared_lock lock(edges_mutex_); std::vector> res; - tbb::concurrent_hash_map>::const_accessor acc; for (auto id : ids) { - if (edges.find(acc, id)) { - res.push_back(acc->second); + auto it = edges.find(id); + if (it != edges.end()) { + res.push_back(it->second); } } return res; @@ -109,57 +104,63 @@ std::vector> EdgeStore::get( // Template overload for any iterable container (including LockedView) template std::vector> EdgeStore::get(const Container& ids) const { + std::shared_lock lock(edges_mutex_); std::vector> res; - tbb::concurrent_hash_map>::const_accessor acc; for (const auto& id : ids) { - if (edges.find(acc, id)) { - res.push_back(acc->second); + auto it = edges.find(id); + if (it != edges.end()) { + res.push_back(it->second); } } return res; } arrow::Result> EdgeStore::get(int64_t edge_id) const { - tbb::concurrent_hash_map>::const_accessor acc; - if (edges.find(acc, edge_id)) { - return acc->second; + std::shared_lock lock(edges_mutex_); + auto it = edges.find(edge_id); + if (it != edges.end()) { + return it->second; } return arrow::Status::KeyError("Edge not found with id=" + std::to_string(edge_id)); } arrow::Result>> EdgeStore::get_edges_from_map( - const tbb::concurrent_hash_map>& edge_map, + const llvm::DenseMap>& edge_map, const int64_t id, const std::string& type) const { - tbb::concurrent_hash_map>::const_accessor acc; - if (!edge_map.find(acc, id)) { + std::shared_lock lock(edges_mutex_); + + auto it = edge_map.find(id); + if (it == edge_map.end()) { return std::vector>(); } - const auto edge_ids_view = acc->second.get_all_unsafe(); + const auto& edge_ids = it->second; std::vector> result; // Pre-allocate result vector to avoid reallocations - result.reserve(edge_ids_view.size()); + result.reserve(edge_ids.size()); - // Reuse a single accessor to avoid repeated allocation/deallocation - tbb::concurrent_hash_map>::const_accessor - edge_acc; + // Convert unordered_set to sorted vector for consistent ordering + std::vector sorted_edge_ids(edge_ids.begin(), edge_ids.end()); + std::sort(sorted_edge_ids.begin(), sorted_edge_ids.end()); // Optimization: avoid string comparison if no type filter if (type.empty()) { // Fast path: no type filtering needed - for (const auto& edge_id : edge_ids_view) { - if (edges.find(edge_acc, edge_id)) { - result.push_back(edge_acc->second); + for (const auto& edge_id : sorted_edge_ids) { + auto edge_it = edges.find(edge_id); + if (edge_it != edges.end()) { + result.push_back(edge_it->second); } } } else { // Slow path: type filtering required - cache type for comparison - for (const auto& edge_id : edge_ids_view) { - if (edges.find(edge_acc, edge_id)) { - const auto& edge = edge_acc->second; + for (const auto& edge_id : sorted_edge_ids) { + auto edge_it = edges.find(edge_id); + if (edge_it != edges.end()) { + const auto& edge = edge_it->second; if (edge->get_type() == type) { result.push_back(edge); } @@ -182,20 +183,24 @@ arrow::Result>> EdgeStore::get_incoming_edges( arrow::Result>> EdgeStore::get_by_type( const std::string& type) const { - tbb::concurrent_hash_map>::const_accessor - acc; - if (!edges_by_type_.find(acc, type)) { + std::shared_lock lock(edges_mutex_); + + auto it = edges_by_type_.find(type); + if (it == edges_by_type_.end()) { return std::vector>(); } std::vector> result; - auto edge_ids_view = acc->second.get_all_unsafe(); + const auto& edge_ids = it->second; + + // Convert unordered_set to sorted vector for consistent ordering + std::vector sorted_edge_ids(edge_ids.begin(), edge_ids.end()); + std::sort(sorted_edge_ids.begin(), sorted_edge_ids.end()); - for (const auto& edge_id : edge_ids_view) { - tbb::concurrent_hash_map>::const_accessor - edge_acc; - if (edges.find(edge_acc, edge_id)) { - result.push_back(edge_acc->second); + for (const auto& edge_id : sorted_edge_ids) { + auto edge_it = edges.find(edge_id); + if (edge_it != edges.end()) { + result.push_back(edge_it->second); } } @@ -204,19 +209,19 @@ arrow::Result>> EdgeStore::get_by_type( arrow::Result EdgeStore::get_version( const std::string& edge_type) const { - tbb::concurrent_hash_map>::const_accessor - acc; - if (versions_.find(acc, edge_type)) { - return acc->second.load(std::memory_order_acquire); + auto it = versions_.find(edge_type); + if (it != versions_.end()) { + return it->second.load(std::memory_order_acquire); } return arrow::Status::KeyError("No version found for edge type: " + edge_type); } std::set EdgeStore::get_edge_types() const { + std::shared_lock lock(edges_mutex_); std::set result; for (auto it = edges_by_type_.begin(); it != edges_by_type_.end(); ++it) { - result.insert(it->first); + result.insert(std::string(it->first())); // Convert StringRef to string } return result; } @@ -225,18 +230,26 @@ arrow::Result> EdgeStore::generate_table( const std::string& edge_type) const { log_info("Generating table for edge type: '" + edge_type + "'"); std::vector> selected_edges; - if (edge_type.empty()) { - auto edge_ids_view = edge_ids_.get_all_unsafe(); - selected_edges = get(edge_ids_view); - } else { - tbb::concurrent_hash_map>::const_accessor acc; - if (edges_by_type_.find(acc, edge_type)) { - auto edge_ids_view = acc->second.get_all_unsafe(); + { + std::shared_lock lock(edges_mutex_); + if (edge_type.empty()) { + auto edge_ids_view = edge_ids_.get_all_unsafe(); selected_edges = get(edge_ids_view); + } else { + auto it = edges_by_type_.find(edge_type); + if (it != edges_by_type_.end()) { + const auto& edge_ids = it->second; + // Convert unordered_set to sorted vector for consistent ordering + std::vector sorted_edge_ids(edge_ids.begin(), edge_ids.end()); + std::sort(sorted_edge_ids.begin(), sorted_edge_ids.end()); + selected_edges = get(sorted_edge_ids); + } } } + // Edges are already sorted by ID since we sorted the edge_ids before calling + // get() + if (selected_edges.empty()) { log_info("No edges found for type '" + edge_type + "', returning empty table"); @@ -381,116 +394,71 @@ arrow::Result> EdgeStore::generate_table( arrow::Result EdgeStore::get_version_snapshot( const std::string& edge_type) const { - tbb::concurrent_hash_map>::const_accessor - acc; - if (versions_.find(acc, edge_type)) { - return acc->second.load(std::memory_order_acquire); + auto it = versions_.find(edge_type); + if (it != versions_.end()) { + return it->second.load(std::memory_order_acquire); } return arrow::Status::KeyError("versions does have edge=" + edge_type); } arrow::Result> EdgeStore::get_table( const std::string& edge_type) { - constexpr int MAX_RETRIES = 5; - int retry_count = 0; - - if (edges_by_type_.empty() || edges_by_type_.count(edge_type) == 0) { - return arrow::Status::KeyError("edge type doesn't exists"); + { + std::shared_lock edges_lock(edges_mutex_); + if (edges_by_type_.empty() || + edges_by_type_.find(edge_type) == edges_by_type_.end()) { + return arrow::Status::KeyError("edge type doesn't exists"); + } } - while (retry_count < MAX_RETRIES) { - { - tbb::concurrent_hash_map< - std::string, std::shared_ptr>::const_accessor tables_acc; - if (tables_.find(tables_acc, edge_type)) { - auto latest_version_res = get_version_snapshot(edge_type); - if (!latest_version_res.ok()) { - return latest_version_res.status(); - } - const int64_t latest_version = latest_version_res.ValueOrDie(); + // Check cache first + { + std::shared_lock tables_lock(tables_mutex_); + auto cache_it = tables_.find(edge_type); + if (cache_it != tables_.end()) { + auto latest_version_res = get_version_snapshot(edge_type); + if (latest_version_res.ok()) { + const int64_t latest_version = *latest_version_res; const int64_t current_version = - tables_acc->second->version.load(std::memory_order_acquire); + cache_it->second->version.load(std::memory_order_acquire); - if (current_version > latest_version) { - return arrow::Status::Invalid( - "Invalid state: current_version > latest_version"); - } if (current_version == latest_version && - tables_acc->second->table != nullptr) { - return tables_acc->second->table; + cache_it->second->table != nullptr) { + return cache_it->second->table; } } } + } - tbb::concurrent_hash_map>::accessor - tables_acc; - - if (tables_.find(tables_acc, edge_type)) { - auto latest_version_res = get_version_snapshot(edge_type); - if (!latest_version_res.ok()) { - return latest_version_res.status(); - } - const int64_t latest_version = latest_version_res.ValueOrDie(); - const int64_t current_version = - tables_acc->second->version.load(std::memory_order_acquire); - - if (current_version < latest_version) { - std::lock_guard lock(tables_acc->second->lock); - // Double-check under lock - if (current_version == - tables_acc->second->version.load(std::memory_order_acquire)) { - // Generate table safely - auto table_res = generate_table(edge_type); - if (!table_res.ok()) { - return table_res.status(); - } - - tables_acc->second->table = table_res.ValueOrDie(); - tables_acc->second->version.store(latest_version, - std::memory_order_release); - return tables_acc->second->table; - } - // Another thread updated the table, we'll retry in the next loop - // iteration - } else { - // Current version is already up-to-date, return it - return tables_acc->second->table; - } - } else if (tables_.insert(tables_acc, edge_type)) { - const auto table_cache = std::make_shared(); - - tables_acc->second = table_cache; - std::lock_guard lock(table_cache->lock); - - auto latest_version_res = get_version_snapshot(edge_type); - if (!latest_version_res.ok()) { - return latest_version_res.status(); - } - const int64_t latest_version = latest_version_res.ValueOrDie(); - - auto table_res = generate_table(edge_type); - if (!table_res.ok()) { - return table_res.status(); - } + // Generate new table + auto table_res = generate_table(edge_type); + if (!table_res.ok()) { + return table_res.status(); + } - table_cache->table = table_res.ValueOrDie(); + // Update cache + auto latest_version_res = get_version_snapshot(edge_type); + if (latest_version_res.ok()) { + const int64_t latest_version = *latest_version_res; + + std::unique_lock tables_lock(tables_mutex_); + auto cache_it = tables_.find(edge_type); + if (cache_it == tables_.end()) { + // Create new cache entry + auto table_cache = std::make_shared(); + table_cache->table = *table_res; table_cache->version.store(latest_version, std::memory_order_release); - - return table_cache->table; - } - - // If we reached here, we need to retry - retry_count++; - - // Small backoff to reduce contention - if (retry_count < MAX_RETRIES) { - std::this_thread::sleep_for(std::chrono::milliseconds(10 * retry_count)); + tables_[edge_type] = table_cache; + } else { + // Update existing cache entry + std::lock_guard lock(cache_it->second->lock); + cache_it->second->table = *table_res; + cache_it->second->version.store(latest_version, + std::memory_order_release); } } - return arrow::Status::Invalid("Failed to get table for edge type '" + - edge_type + "' after " + - std::to_string(MAX_RETRIES) + " retries"); + return table_res; } } // namespace tundradb