diff --git a/include/query.hpp b/include/query.hpp index b39ec1a..e9dbd2d 100644 --- a/include/query.hpp +++ b/include/query.hpp @@ -22,11 +22,15 @@ struct SchemaRef { std::string schema_; std::string value_; bool declaration_; + // Cached 16-bit tag for fast path operations (e.g., BFS packing) + uint16_t schema_tag_ = 0; public: [[nodiscard]] std::string schema() const { return schema_; } [[nodiscard]] std::string value() const { return value_; } [[nodiscard]] bool is_declaration() const { return declaration_; } + [[nodiscard]] uint16_t tag() const { return schema_tag_; } + void set_tag(uint16_t t) { schema_tag_ = t; } static SchemaRef parse(const std::string& s) { SchemaRef r; @@ -154,6 +158,10 @@ class Traverse final : public Clause { [[nodiscard]] const std::string& edge_type() const { return edge_type_; } [[nodiscard]] const SchemaRef& target() const { return target_; } [[nodiscard]] TraverseType traverse_type() const { return traverse_type_; } + + // Internal mutation helpers for precomputing tags + SchemaRef& mutable_source() { return source_; } + SchemaRef& mutable_target() { return target_; } }; struct Select final : Clause { diff --git a/include/schema_layout.hpp b/include/schema_layout.hpp index af9a510..e40d085 100644 --- a/include/schema_layout.hpp +++ b/include/schema_layout.hpp @@ -20,23 +20,24 @@ namespace tundradb { * Helper functions for bit set manipulation to track which fields are set */ inline size_t get_bitset_size_bytes(const size_t num_fields) { - return (num_fields + 7) / 8; // Round up to nearest byte + size_t bit_words = (num_fields + 63) / 64; + size_t bitset_bytes = bit_words * sizeof(uint64_t); + return bitset_bytes; } -inline bool is_field_set(const char* bitset, const size_t field_index) { - const size_t byte_index = field_index / 8; - const size_t bit_index = field_index % 8; - return (bitset[byte_index] & (1 << bit_index)) != 0; +inline bool is_field_set(const char* base, const size_t idx) { + auto words = reinterpret_cast(base); + return (words[idx >> 6] >> (idx & 63)) & 1ULL; } -inline void set_field_bit(char* bitset, const size_t field_index, - const bool is_set) { - const size_t byte_index = field_index / 8; - const size_t bit_index = field_index % 8; +inline void set_field_bit(char* base, size_t idx, bool is_set) { + auto words = reinterpret_cast(base); + uint64_t mask = 1ULL << (idx & 63); + uint64_t& w = words[idx >> 6]; if (is_set) { - bitset[byte_index] |= (1 << bit_index); + w |= mask; } else { - bitset[byte_index] &= ~(1 << bit_index); + w &= ~mask; } } diff --git a/include/utils.hpp b/include/utils.hpp index bc93d45..3640a3d 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -84,7 +85,7 @@ static arrow::Result> filter_table_by_id( return filtered_table.table(); } -static arrow::Result> get_ids_from_table( +static arrow::Result> get_ids_from_table( std::shared_ptr table) { log_debug("Extracting IDs from table with {} rows", table->num_rows()); @@ -95,7 +96,8 @@ static arrow::Result> get_ids_from_table( } auto id_column = table->column(id_idx); - std::set result_ids; + llvm::DenseSet result_ids; + result_ids.reserve(table->num_rows()); for (int chunk_idx = 0; chunk_idx < id_column->num_chunks(); chunk_idx++) { auto chunk = std::static_pointer_cast( diff --git a/src/core.cpp b/src/core.cpp index 5bf4f7c..3d19ae2 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -4,6 +4,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include #include @@ -30,6 +35,8 @@ namespace fs = std::filesystem; namespace tundradb { +constexpr static uint64_t NODE_MASK = (1ULL << 48) - 1; + // Utility function to join containers using C++23 ranges template std::string join_container(const Container& container, @@ -380,13 +387,14 @@ std::set get_roots( struct QueryState { SchemaRef from; std::unordered_map> tables; - std::unordered_map> ids; + llvm::StringMap> ids; std::unordered_map aliases; - std::unordered_map>> + // Precomputed fully-qualified field names per alias (SchemaRef::value()) + llvm::StringMap> fq_field_names; + llvm::StringMap< + llvm::DenseMap>> connections; // outgoing - - std::unordered_map> incoming; + llvm::DenseMap> incoming; std::shared_ptr node_manager; std::shared_ptr schema_registry; @@ -406,7 +414,53 @@ struct QueryState { return aliases[schema_ref.value()]; } - const std::set& get_ids(const SchemaRef& schema_ref) { + // Precompute fully-qualified field names for source and target aliases + arrow::Result compute_fully_qualified_names( + const SchemaRef& schema_ref) { + const auto it = aliases.find(schema_ref.value()); + if (it == aliases.end()) { + return arrow::Status::KeyError("keyset does not contain alias '{}'", + schema_ref.value()); + } + return compute_fully_qualified_names(schema_ref, it->second); + } + + // Precompute fully-qualified field names for source and target aliases + arrow::Result compute_fully_qualified_names( + const SchemaRef& schema_ref, const std::string& resolved_schema) { + const std::string& alias = schema_ref.value(); + if (fq_field_names.contains(alias)) { + return false; + } + auto schema_res = schema_registry->get_arrow(resolved_schema); + if (!schema_res.ok()) { + return schema_res.status(); + } + const auto schema = schema_res.ValueOrDie(); + std::vector names; + names.reserve(schema->num_fields()); + for (const auto& f : schema->fields()) { + names.emplace_back(alias + "." + f->name()); + } + fq_field_names[alias] = std::move(names); + return true; + } + + // Deterministic 16-bit tag from alias string (SchemaRef::value()). + // https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html + static uint16_t compute_alias_tag(const SchemaRef& ref) { + // FNV-1a 32-bit, then fold to 16 bits. + const std::string& s = ref.value(); + uint32_t h = 2166136261u; + for (unsigned char c : s) { + h ^= c; + h *= 16777619u; + } + h ^= (h >> 16); + return static_cast(h & 0xFFFFu); + } + + const llvm::DenseSet& get_ids(const SchemaRef& schema_ref) { return ids[schema_ref.value()]; } @@ -450,7 +504,7 @@ struct QueryState { ss << " IDs (" << ids.size() << "):\n"; for (const auto& [alias, id_set] : ids) { - ss << " - " << alias << ": " << id_set.size() << " IDs\n"; + ss << " - " << alias.str() << ": " << id_set.size() << " IDs\n"; } ss << " Aliases (" << aliases.size() << "):\n"; @@ -462,7 +516,7 @@ struct QueryState { << " source nodes):"; for (const auto& [from, conns] : connections) { for (const auto& [from_id, conn_vec] : conns) { - ss << "from " << from << ":" << from_id << ":\n"; + ss << "from " << from.str() << ":" << from_id << ":\n"; for (const auto& conn : conn_vec) { ss << " - " << conn.target.value() << ":" << conn.target_id << "\n"; @@ -624,10 +678,13 @@ struct Row { cells.at(name)->is_valid; } - void set_cell_from_node(const SchemaRef& schema_ref, + void set_cell_from_node(const std::vector& fq_field_names, const std::shared_ptr& node) { - for (const auto& field : node->get_schema()->fields()) { - auto full_name = schema_ref.value() + "." + field->name(); + const auto& fields = node->get_schema()->fields(); + const size_t n = fields.size(); + for (size_t i = 0; i < n; ++i) { + const auto& field = fields[i]; + const auto& full_name = fq_field_names[i]; this->set_cell(full_name, node->get_value_ptr(field->name()).ValueOrDie(), field->type()); } @@ -1024,13 +1081,12 @@ struct QueueItem { SchemaRef schema_ref; int level; std::shared_ptr row; - std::set - path_visited_nodes; // schema:node visited in this specific path + llvm::SmallDenseSet + path_visited_nodes; // packed (schema_id<<48 | node_id) for this path std::vector path; QueueItem(int64_t id, const SchemaRef& schema, int l, std::shared_ptr r) : node_id(id), schema_ref(schema), level(l), row(std::move(r)) { - path_visited_nodes.insert(schema_ref.value() + ":" + std::to_string(id)); path.push_back(PathSegment{schema.value(), id}); } }; @@ -1038,7 +1094,8 @@ struct QueueItem { // Log grouped connections for a node void log_grouped_connections( int64_t node_id, - const std::unordered_map>& + const llvm::SmallDenseMap, 4>& grouped_connections) { if (Logger::get_instance().get_level() == LogLevel::DEBUG) { if (grouped_connections.empty()) { @@ -1049,8 +1106,10 @@ void log_grouped_connections( log_debug("Node {} has connections to {} target schemas:", node_id, grouped_connections.size()); - for (const auto& [target_schema, connections] : grouped_connections) { - log_debug(" To schema '{}': {} connections", target_schema, + for (const auto& it : grouped_connections) { + auto target_schema = it.first; + const auto& connections = it.second; + log_debug(" To schema '{}': {} connections", target_schema.str(), connections.size()); for (size_t i = 0; i < connections.size(); ++i) { @@ -1076,6 +1135,7 @@ arrow::Result>> populate_rows_bfs( std::queue queue; queue.emplace(node_id, start_schema, 0, initial_row); + // Use precomputed fully-qualified field names from QueryState while (!queue.empty()) { auto size = queue.size(); @@ -1083,14 +1143,28 @@ arrow::Result>> populate_rows_bfs( auto item = queue.front(); queue.pop(); auto node = query_state.node_manager->get_node(item.node_id).ValueOrDie(); - item.row->set_cell_from_node(item.schema_ref, node); - std::string schema_node_key = - item.schema_ref.value() + ":" + std::to_string(item.node_id); - global_visited.insert(schema_node_key); - item.path_visited_nodes.insert(schema_node_key); - - // group connections by target schema - std::unordered_map> + const auto& it_fq = + query_state.fq_field_names.find(item.schema_ref.value()); + if (it_fq == query_state.fq_field_names.end()) { + std::cout + << "ERROR: Could not find fully qualified field names for schema " + << item.schema_ref.value() << std::endl; + return arrow::Status::KeyError( + "Missing precomputed fq_field_names for alias {}", + item.schema_ref.value()); + } + item.row->set_cell_from_node(it_fq->second, node); + // Pack 16-bit schema id (precomputed in SchemaRef) and 48-bit node id. + const uint16_t schema_id16 = item.schema_ref.tag(); + const uint64_t packed = (static_cast(schema_id16) << 48) | + (static_cast(item.node_id) & NODE_MASK); + global_visited.insert(item.schema_ref.value() + ":" + + std::to_string(item.node_id)); + item.path_visited_nodes.insert(packed); + + // group connections by target schema (small, stack-friendly) + llvm::SmallDenseMap, 4> grouped_connections; bool skip = false; @@ -1098,9 +1172,11 @@ arrow::Result>> populate_rows_bfs( for (const auto& conn : query_state.connections.at(item.schema_ref.value()) .at(item.node_id)) { - std::string schema_node_key = - conn.target.value() + ":" + std::to_string(conn.target_id); - if (!item.path_visited_nodes.contains(schema_node_key)) { + const uint16_t tgt_schema_id16 = conn.target.tag(); + const uint64_t tgt_packed = + (static_cast(tgt_schema_id16) << 48) | + (static_cast(conn.target_id) & NODE_MASK); + if (!item.path_visited_nodes.contains(tgt_packed)) { if (query_state.ids.at(conn.target.value()) .contains(conn.target_id)) { grouped_connections[conn.target.value()].push_back(conn); @@ -1180,13 +1256,14 @@ arrow::Result>> populate_rows_bfs( return std::make_shared>(merged); } -template +// template arrow::Result>> populate_batch_rows( - const NodeIdsT& node_ids, const SchemaRef& schema_ref, + const llvm::DenseSet& node_ids, const SchemaRef& schema_ref, const std::shared_ptr& output_schema, const QueryState& query_state, const TraverseType join_type, tbb::concurrent_unordered_set& global_visited) { auto rows = std::make_shared>(); + rows->reserve(node_ids.size()); std::set local_visited; // For INNER join: only process nodes that have connections // For LEFT join: process all nodes from the "left" side @@ -1219,14 +1296,15 @@ arrow::Result>> populate_batch_rows( return rows; } -std::vector> batch_node_ids(const std::set& ids, - size_t batch_size) { - std::vector> batches; - std::vector current_batch; +std::vector> batch_node_ids( + const llvm::DenseSet& ids, const size_t batch_size) { + std::vector> batches; + batches.reserve(ids.size() / batch_size + 1); + llvm::DenseSet current_batch; current_batch.reserve(batch_size); for (const auto& id : ids) { - current_batch.push_back(id); + current_batch.insert(id); if (current_batch.size() >= batch_size) { batches.push_back(std::move(current_batch)); @@ -1255,10 +1333,12 @@ arrow::Result>> populate_rows( // Map schemas to their join types std::unordered_map schema_join_types; - schema_join_types[query_state.from.value()] = - TraverseType::Inner; // FROM is always inner by default + schema_join_types.reserve(traverses.size()); if (traverses.empty()) { schema_join_types[query_state.from.value()] = TraverseType::Left; + } else { + // FROM is always inner by default + schema_join_types[query_state.from.value()] = TraverseType::Inner; } // Only apply LEFT JOIN to FROM schema if the FROM schema is directly involved @@ -1618,6 +1698,30 @@ arrow::Result> inline_where( return curr_table; } +template +void dense_intersection(const SetA& a, const SetB& b, OutSet& out) { + const auto& small = a.size() < b.size() ? a : b; + const auto& large = a.size() < b.size() ? b : a; + out.clear(); + out.reserve(std::min(a.size(), b.size())); + for (const auto& x : small) { + if (large.contains(x)) { + out.insert(x); + } + } +} + +template +void dense_difference(const SetA& a, const SetB& b, OutSet& out) { + out.clear(); + out.reserve(a.size()); + for (const auto& x : a) { + if (!b.contains(x)) { + out.insert(x); + } + } +} + arrow::Result> Database::query( const Query& query) const { QueryState query_state; @@ -1630,6 +1734,9 @@ arrow::Result> Database::query( { log_debug("processing 'from' {}", query.from().toString()); + // Precompute tag for FROM schema (alias-based hash) + query_state.from = query.from(); + query_state.from.set_tag(QueryState::compute_alias_tag(query_state.from)); ARROW_ASSIGN_OR_RAISE(auto source_schema, query_state.resolve_schema(query.from())); if (!this->schema_registry_->exists(source_schema)) { @@ -1643,6 +1750,11 @@ arrow::Result> Database::query( } ARROW_ASSIGN_OR_RAISE(auto source_table, this->get_table(source_schema)); ARROW_RETURN_NOT_OK(query_state.update_table(source_table, query.from())); + if (auto res = query_state.compute_fully_qualified_names(query.from(), + source_schema); + !res.ok()) { + return res.status(); + } } { @@ -1659,6 +1771,9 @@ arrow::Result> Database::query( } log_debug("Processing {} query clauses", query.clauses().size()); + + // Precompute 16-bit alias-based tags for all SchemaRefs + // Also precompute fully-qualified field names per alias used in the query std::vector> post_where; for (auto i = 0; i < query.clauses().size(); ++i) { auto clause = query.clauses()[i]; @@ -1703,6 +1818,27 @@ arrow::Result> Database::query( } case Clause::Type::TRAVERSE: { auto traverse = std::static_pointer_cast(clause); + // Precompute and set tags for source/target refs (alias-based, + // deterministic) + traverse->mutable_source().set_tag( + QueryState::compute_alias_tag(traverse->source())); + traverse->mutable_target().set_tag( + QueryState::compute_alias_tag(traverse->target())); + + ARROW_ASSIGN_OR_RAISE(auto source_schema, + query_state.resolve_schema(traverse->source())); + ARROW_ASSIGN_OR_RAISE(auto target_schema, + query_state.resolve_schema(traverse->target())); + if (auto res = query_state.compute_fully_qualified_names( + traverse->source(), source_schema); + !res.ok()) { + return res.status(); + } + if (auto res = query_state.compute_fully_qualified_names( + traverse->target(), target_schema); + !res.ok()) { + return res.status(); + } std::vector> where_clauses; if (query.inline_where()) { where_clauses = get_where_to_inline(traverse->target().value(), i + 1, @@ -1710,11 +1846,6 @@ arrow::Result> Database::query( result->mutable_execution_stats().num_where_clauses_inlined += where_clauses.size(); } - - ARROW_ASSIGN_OR_RAISE(auto source_schema, - query_state.resolve_schema(traverse->source())); - ARROW_ASSIGN_OR_RAISE(auto target_schema, - query_state.resolve_schema(traverse->target())); query_state.traversals.push_back(*traverse); if (Logger::get_instance().get_level() == LogLevel::DEBUG) { log_debug("Processing TRAVERSE {}-({})->{}", @@ -1737,17 +1868,19 @@ arrow::Result> Database::query( log_debug("Traversing from {} source nodes", query_state.ids[source.value()].size()); } - std::set matched_source_ids; - std::set matched_target_ids; - std::set unmatched_source_ids; + llvm::DenseSet matched_source_ids; + llvm::DenseSet matched_target_ids; + llvm::DenseSet unmatched_source_ids; for (auto source_id : query_state.ids[source.value()]) { auto outgoing_edges = edge_store_->get_outgoing_edges(source_id, traverse->edge_type()) .ValueOrDie(); // todo check result - log_debug("Node {} has {} outgoing edges of type '{}'", source_id, - outgoing_edges.size(), traverse->edge_type()); + if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + log_debug("Node {} has {} outgoing edges of type '{}'", source_id, + outgoing_edges.size(), traverse->edge_type()); + } - std::vector> target_nodes; + bool source_had_match = false; for (auto edge : outgoing_edges) { auto target_id = edge->get_target_id(); if (query_state.ids.contains(traverse->target().value()) && @@ -1772,10 +1905,24 @@ arrow::Result> Database::query( } } if (passes_all_filters) { - log_debug("found edge {}:{} -[{}]-> {}:{}", source.value(), - source_id, traverse->edge_type(), - traverse->target().value(), target_node->id); - target_nodes.push_back(target_node); + if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + log_debug("found edge {}:{} -[{}]-> {}:{}", source.value(), + source_id, traverse->edge_type(), + traverse->target().value(), target_node->id); + } + // record match immediately to avoid extra containers/copies + if (!source_had_match) { + matched_source_ids.insert(source_id); + source_had_match = true; + } + matched_target_ids.insert(target_node->id); + auto conn = + GraphConnection{traverse->source(), source_id, + traverse->edge_type(), "", + traverse->target(), target_node->id}; + query_state.connections[traverse->source().value()][source_id] + .push_back(conn); + query_state.incoming[target_node->id].push_back(conn); } } } else { @@ -1784,20 +1931,10 @@ arrow::Result> Database::query( node_result.status().ToString()); } } - if (!target_nodes.empty()) { - matched_source_ids.insert(source_id); - for (const auto& target_node : target_nodes) { - matched_target_ids.insert(target_node->id); - auto conn = GraphConnection{ - traverse->source(), source_id, traverse->edge_type(), "", - traverse->target(), target_node->id}; - - query_state.connections[traverse->source().value()][source_id] - .push_back(conn); - query_state.incoming[target_node->id].push_back(conn); + if (!source_had_match) { + if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + log_debug("no edge found from {}:{}", source.value(), source_id); } - } else { - log_debug("no edge found from {}:{}", source.value(), source_id); unmatched_source_ids.insert(source_id); } } @@ -1828,19 +1965,20 @@ arrow::Result> Database::query( // after b:0 -> c:1 we need to intersect it with ids[c] = // intersect({0}, {1}) => {} auto target_ids = query_state.get_ids(traverse->target()); - std::set intersect_ids; + llvm::DenseSet intersect_ids; if (target_ids.empty()) { intersect_ids = matched_target_ids; } else { - std::ranges::set_intersection( - target_ids, matched_target_ids, - std::inserter(intersect_ids, intersect_ids.begin())); + dense_intersection(target_ids, matched_target_ids, intersect_ids); } query_state.ids[traverse->target().value()] = intersect_ids; - log_debug("intersect_ids count: {}", intersect_ids.size()); - log_debug("{} intersect_ids: {}", traverse->target().toString(), - join_container(intersect_ids)); + if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + log_debug("intersect_ids count: {}", intersect_ids.size()); + log_debug("{} intersect_ids: {}", traverse->target().toString(), + join_container(intersect_ids)); + } + } else if (traverse->traverse_type() == TraverseType::Left) { query_state.ids[traverse->target().value()].insert( matched_target_ids.begin(), matched_target_ids.end()); @@ -1848,14 +1986,15 @@ arrow::Result> Database::query( auto target_ids = get_ids_from_table(get_table(target_schema).ValueOrDie()) .ValueOrDie(); - log_debug( - "traverse type: '{}', matched_source_ids=[{}], " - "target_ids=[{}]", - traverse->target().value(), join_container(matched_source_ids), - join_container(target_ids)); - std::set result; - std::ranges::set_difference(target_ids, matched_source_ids, - std::inserter(result, result.begin())); + if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + log_debug( + "traverse type: '{}', matched_source_ids=[{}], " + "target_ids=[{}]", + traverse->target().value(), join_container(matched_source_ids), + join_container(target_ids)); + } + llvm::DenseSet result; + dense_difference(target_ids, matched_source_ids, result); query_state.ids[traverse->target().value()] = result; }