Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions include/edge_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
#define EDGE_STORE_HPP

#include <arrow/api.h>
#include <llvm/ADT/DenseMap.h>
#include <llvm/ADT/StringMap.h>

#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
Expand All @@ -19,24 +22,24 @@ class EdgeStore {
struct TableCache;

private:
tbb::concurrent_hash_map<int64_t, std::shared_ptr<Edge>> edges;
tbb::concurrent_hash_map<std::string, ConcurrentSet<int64_t>> edges_by_type_;
tbb::concurrent_hash_map<int64_t, ConcurrentSet<int64_t>> outgoing_edges_;
tbb::concurrent_hash_map<int64_t, ConcurrentSet<int64_t>> incoming_edges_;
mutable std::shared_mutex edges_mutex_;
llvm::DenseMap<int64_t, std::shared_ptr<Edge>> edges;
llvm::StringMap<std::unordered_set<int64_t>> edges_by_type_;
llvm::DenseMap<int64_t, std::unordered_set<int64_t>> outgoing_edges_;
llvm::DenseMap<int64_t, std::unordered_set<int64_t>> incoming_edges_;

tbb::concurrent_hash_map<std::string, std::atomic<int64_t>>
versions_; // version
llvm::StringMap<std::atomic<int64_t>> versions_; // version
std::atomic<int64_t> edge_id_counter_{0};

ConcurrentSet<int64_t> edge_ids_;

tbb::concurrent_hash_map<std::string, std::shared_ptr<TableCache>>
tables_; // cache
mutable std::shared_mutex tables_mutex_;
llvm::StringMap<std::shared_ptr<TableCache>> tables_; // cache
std::string data_file_;
int64_t chunk_size_;

arrow::Result<std::vector<std::shared_ptr<Edge>>> get_edges_from_map(
const tbb::concurrent_hash_map<int64_t, ConcurrentSet<int64_t>> &edge_map,
const llvm::DenseMap<int64_t, std::unordered_set<int64_t>> &edge_map,
int64_t id, const std::string &type) const;

arrow::Result<std::shared_ptr<arrow::Table>> generate_table(
Expand Down
68 changes: 61 additions & 7 deletions src/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,9 @@ struct QueryState {
llvm::StringMap<std::vector<int>>
schema_field_indices; // "User" -> [0, 1, 2], "Company -> [3,4,5]"
llvm::SmallDenseMap<int, std::string, 64>
field_id_to_name; // 0 -> "user.name"
std::atomic<int> next_field_id{0}; // Global field ID counter
field_id_to_name; // 0 -> "user.name"
llvm::StringMap<int> field_name_to_index; // "user.name" -> 0
std::atomic<int> next_field_id{0}; // Global field ID counter

llvm::StringMap<
llvm::DenseMap<int64_t, llvm::SmallVector<GraphConnection, 4>>>
Expand All @@ -439,6 +440,50 @@ struct QueryState {
std::shared_ptr<SchemaRegistry> schema_registry;
std::vector<Traverse> traversals;

// Connection object pooling to avoid repeated allocations
class ConnectionPool {
 private:
  std::vector<GraphConnection> pool_;  // Flat slab of reusable slots.
  size_t next_index_ = 0;              // Next unused slot in pool_.

 public:
  // Pre-allocates `initial_size` default-constructed connections so the hot
  // query loop normally never allocates.
  explicit ConnectionPool(size_t initial_size = 1000) : pool_(initial_size) {}

  // Hands out the next reusable slot, growing the pool geometrically when
  // exhausted.
  //
  // WARNING: growth invalidates references returned by earlier get() calls
  // (std::vector::resize may reallocate). Callers must copy the connection
  // out (e.g. push_back into a container) before calling get() again.
  GraphConnection& get() {
    if (next_index_ >= pool_.size()) {
      // `size() * 2` stays 0 forever for a zero-sized pool, and the
      // subsequent operator[] would then be out of bounds — grow to at
      // least one slot.
      pool_.resize(pool_.empty() ? 1 : pool_.size() * 2);
    }
    return pool_[next_index_++];
  }

  // Rewinds the pool so all slots can be reused (e.g. per query). Does not
  // shrink capacity; previously handed-out slots keep stale contents until
  // overwritten by get() callers.
  void reset() { next_index_ = 0; }

  // Number of slots handed out since the last reset().
  size_t size() const { return next_index_; }
};

mutable ConnectionPool connection_pool_; // Mutable for const methods

// Pre-sizes the per-query hash maps so query execution avoids repeated
// rehash/regrow cycles. All estimates are heuristic upper-bound guesses;
// over-reserving only costs memory, never correctness.
void reserve_capacity(const Query& query) {
  // One schema for the FROM clause, plus a source and a target schema for
  // every TRAVERSE clause.
  size_t schema_estimate = 1;
  for (const auto& clause : query.clauses()) {
    if (clause->type() == Clause::Type::TRAVERSE) {
      schema_estimate += 2;
    }
  }

  tables.reserve(schema_estimate);
  aliases.reserve(schema_estimate);

  // Conservative guess at how many nodes a schema contributes; refine if
  // profiling shows rehashing here.
  constexpr size_t kNodesPerSchemaGuess = 1000;
  incoming.reserve(kNodesPerSchemaGuess);

  // Field-id map: assume roughly eight fields per schema.
  field_id_to_name.reserve(schema_estimate * 8);
}

arrow::Result<std::string> resolve_schema(const SchemaRef& schema_ref) {
// todo we need to separate functions: assign alias , resolve
if (aliases.contains(schema_ref.value()) && schema_ref.is_declaration()) {
Expand Down Expand Up @@ -491,6 +536,7 @@ struct QueryState {
names.emplace_back(fq_name);
indices.emplace_back(field_id);
field_id_to_name[field_id] = fq_name;
field_name_to_index[fq_name] = field_id;
}

fq_field_names[alias] = std::move(names);
Expand Down Expand Up @@ -1637,7 +1683,6 @@ arrow::Result<std::shared_ptr<arrow::Table>> create_table_from_rows(
bool has_value = false;

if (i < row->cells.size() && row->cells[i].data != nullptr) {
// Fast path: use indexed access
value_ref = row->cells[i];
has_value = true;
}
Expand Down Expand Up @@ -1900,6 +1945,10 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
const Query& query) const {
QueryState query_state;
auto result = std::make_shared<QueryResult>();

// Pre-size hash maps to avoid expensive resizing during execution
query_state.reserve_capacity(query);

IF_DEBUG_ENABLED {
log_debug("Executing query starting from schema '{}'",
query.from().toString());
Expand Down Expand Up @@ -2123,10 +2172,15 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
source_had_match = true;
}
matched_target_ids.insert(target_node->id);
auto conn =
GraphConnection{traverse->source(), source_id,
traverse->edge_type(), "",
traverse->target(), target_node->id};
// Use connection pool to avoid allocation
auto& conn = query_state.connection_pool_.get();
conn.source = traverse->source();
conn.source_id = source_id;
conn.edge_type = traverse->edge_type();
conn.label = "";
conn.target = traverse->target();
conn.target_id = target_node->id;

query_state.connections[traverse->source().value()][source_id]
.push_back(conn);
query_state.incoming[target_node->id].push_back(conn);
Expand Down
Loading