Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 81 additions & 36 deletions src/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,76 @@ namespace tundradb {

constexpr static uint64_t NODE_MASK = (1ULL << 48) - 1;

// Deterministic 16-bit tag from alias string (SchemaRef::value()).
// https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html
static uint16_t compute_tag(const SchemaRef& ref) {
// FNV-1a 32-bit, then fold to 16 bits.
const std::string& s = ref.value();
uint32_t h = 2166136261u;
for (unsigned char c : s) {
h ^= c;
h *= 16777619u;
}
h ^= (h >> 16);
return static_cast<uint16_t>(h & 0xFFFFu);
}

/**
* @brief Creates a packed 64-bit hash code for schema+node_id pairs
*
* This function combines a schema identifier and node ID into a single 64-bit
* value for efficient storage and comparison in hash sets/maps. This eliminates
* the need for expensive string concatenation and hashing that was previously
* used for tracking visited nodes during graph traversal.
*
* @param schema The schema reference containing a pre-computed 16-bit tag
* @param node_id The node identifier (48-bit max)
*
* @return A 64-bit packed value with layout:
* - Bits 63-48: Schema tag (16 bits)
* - Bits 47-0: Node ID (48 bits, masked)
*
* @details
* Memory Layout:
* ```
* 63 56 48 40 32 24 16 8 0
* | Schema | Node ID (48 bits) |
* | (16 bit) | |
* ```
*
* Performance Benefits:
* - Replaces string operations: "User:12345" → single uint64_t
* - Enables fast integer comparison instead of string hashing
* - Reduces memory allocations (no temporary strings)
* - Compatible with llvm::DenseSet for O(1) lookups
*
* Constraints:
* - Node IDs must fit in 48 bits (max ~281 trillion nodes)
* - Schema tags must be unique within query context
* - NODE_MASK = (1ULL << 48) - 1 = 0x0000FFFFFFFFFFFF
*
* Example:
* ```cpp
* SchemaRef user_schema = SchemaRef::parse("u:User");
* user_schema.set_tag(0x1234); // Pre-computed schema tag
*
* uint64_t packed = hash_code_(user_schema, 98765);
* // Result: 0x1234000000018149 (schema=0x1234, node=98765)
*
* // Usage in visited tracking:
* llvm::DenseSet<uint64_t> visited;
* visited.insert(packed); // Fast O(1) integer hash
* ```
*
* @see SchemaRef::tag() for schema tag computation
* @see NODE_MASK constant definition
*/
static uint64_t hash_code_(const SchemaRef& schema, int64_t node_id) {
const uint16_t schema_id16 = schema.tag();
return (static_cast<uint64_t>(schema_id16) << 48) |
(static_cast<uint64_t>(node_id) & NODE_MASK);
}

// Utility function to join containers using C++23 ranges
template <typename Container>
std::string join_container(const Container& container,
Expand Down Expand Up @@ -468,20 +538,6 @@ struct QueryState {
return true;
}

// Deterministic 16-bit tag from alias string (SchemaRef::value()).
// https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html
static uint16_t compute_alias_tag(const SchemaRef& ref) {
// FNV-1a 32-bit, then fold to 16 bits.
const std::string& s = ref.value();
uint32_t h = 2166136261u;
for (unsigned char c : s) {
h ^= c;
h *= 16777619u;
}
h ^= (h >> 16);
return static_cast<uint16_t>(h & 0xFFFFu);
}

const llvm::DenseSet<int64_t>& get_ids(const SchemaRef& schema_ref) {
return ids[schema_ref.value()];
}
Expand Down Expand Up @@ -1150,11 +1206,10 @@ void log_grouped_connections(
}
}

template <StringSet VisitedSet>
arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows_bfs(
int64_t node_id, const SchemaRef& start_schema,
const std::shared_ptr<arrow::Schema>& output_schema,
const QueryState& query_state, VisitedSet& global_visited) {
const QueryState& query_state, llvm::DenseSet<uint64_t>& global_visited) {
IF_DEBUG_ENABLED {
log_debug("populate_rows_bfs::node={}:{}", start_schema.value(), node_id);
}
Expand Down Expand Up @@ -1183,12 +1238,8 @@ arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows_bfs(
item.schema_ref.value());
}
item.row->set_cell_from_node(it_fq->second, node);
// Pack 16-bit schema id (precomputed in SchemaRef) and 48-bit node id.
const uint16_t schema_id16 = item.schema_ref.tag();
const uint64_t packed = (static_cast<uint64_t>(schema_id16) << 48) |
(static_cast<uint64_t>(item.node_id) & NODE_MASK);
global_visited.insert(item.schema_ref.value() + ":" +
std::to_string(item.node_id));
const uint64_t packed = hash_code_(item.schema_ref, item.node_id);
global_visited.insert(packed);
item.path_visited_nodes.insert(packed);

// group connections by target schema (small, stack-friendly)
Expand All @@ -1201,10 +1252,7 @@ arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows_bfs(
for (const auto& conn :
query_state.connections.at(item.schema_ref.value())
.at(item.node_id)) {
const uint16_t tgt_schema_id16 = conn.target.tag();
const uint64_t tgt_packed =
(static_cast<uint64_t>(tgt_schema_id16) << 48) |
(static_cast<uint64_t>(conn.target_id) & NODE_MASK);
const uint64_t tgt_packed = hash_code_(conn.target, conn.target_id);
if (!item.path_visited_nodes.contains(tgt_packed)) {
if (query_state.ids.at(conn.target.value())
.contains(conn.target_id)) {
Expand Down Expand Up @@ -1284,15 +1332,14 @@ arrow::Result<std::shared_ptr<std::vector<Row>>> populate_batch_rows(
const llvm::DenseSet<int64_t>& node_ids, const SchemaRef& schema_ref,
const std::shared_ptr<arrow::Schema>& output_schema,
const QueryState& query_state, const TraverseType join_type,
tbb::concurrent_unordered_set<std::string>& global_visited) {
tbb::concurrent_unordered_set<uint64_t>& global_visited) {
auto rows = std::make_shared<std::vector<Row>>();
rows->reserve(node_ids.size());
std::set<std::string> local_visited;
llvm::DenseSet<uint64_t> local_visited;
// For INNER join: only process nodes that have connections
// For LEFT join: process all nodes from the "left" side
for (const auto node_id : node_ids) {
auto key = schema_ref.value() + ":" + std::to_string(node_id);
if (!global_visited.insert(key).second) {
if (!global_visited.insert(hash_code_(schema_ref, node_id)).second) {
// Skip if already processed in an earlier traversal
continue;
}
Expand Down Expand Up @@ -1352,7 +1399,7 @@ arrow::Result<std::shared_ptr<std::vector<Row>>> populate_rows(
const std::shared_ptr<arrow::Schema>& output_schema) {
auto rows = std::make_shared<std::vector<Row>>();
std::mutex rows_mtx;
tbb::concurrent_unordered_set<std::string> global_visited;
tbb::concurrent_unordered_set<uint64_t> global_visited;

// Map schemas to their join types
std::unordered_map<std::string, TraverseType> schema_join_types;
Expand Down Expand Up @@ -1771,7 +1818,7 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
}
// Precompute tag for FROM schema (alias-based hash)
query_state.from = query.from();
query_state.from.set_tag(QueryState::compute_alias_tag(query_state.from));
query_state.from.set_tag(compute_tag(query_state.from));
ARROW_ASSIGN_OR_RAISE(auto source_schema,
query_state.resolve_schema(query.from()));
if (!this->schema_registry_->exists(source_schema)) {
Expand Down Expand Up @@ -1863,10 +1910,8 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
auto traverse = std::static_pointer_cast<Traverse>(clause);
// Precompute and set tags for source/target refs (alias-based,
// deterministic)
traverse->mutable_source().set_tag(
QueryState::compute_alias_tag(traverse->source()));
traverse->mutable_target().set_tag(
QueryState::compute_alias_tag(traverse->target()));
traverse->mutable_source().set_tag(compute_tag(traverse->source()));
traverse->mutable_target().set_tag(compute_tag(traverse->target()));

ARROW_ASSIGN_OR_RAISE(auto source_schema,
query_state.resolve_schema(traverse->source()));
Expand Down