diff --git a/CMakeLists.txt b/CMakeLists.txt index 14c02dd..1b1e825 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,17 @@ project(tundradb) set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED ON) +# Configure compile-time logging levels +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + add_compile_definitions(TUNDRA_LOG_LEVEL_DEBUG) +elseif(CMAKE_BUILD_TYPE STREQUAL "Release") + add_compile_definitions(TUNDRA_LOG_LEVEL_INFO) +elseif(CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo") + add_compile_definitions(TUNDRA_LOG_LEVEL_INFO) +else() + add_compile_definitions(TUNDRA_LOG_LEVEL_DEBUG) +endif() + # Enable Address Sanitizer #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-omit-frame-pointer") #set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address -fno-omit-frame-pointer") @@ -269,6 +280,20 @@ target_link_libraries(tundra_shell LLVMCore ) +# Benchmark runner executable for loading CSVs and running queries +add_executable(tundra_bench_runner bench/tundra_runner.cpp) +target_link_libraries(tundra_bench_runner + PRIVATE + core + Arrow::arrow_shared + ${ARROW_DATASET_LIB} + Parquet::parquet_shared + ${UUID_LIBRARY} + ${ANTLR4_RUNTIME} + LLVMSupport + LLVMCore +) + # ANTLR Integration # Find Java for running ANTLR generator find_package(Java REQUIRED) diff --git a/include/concurrency.hpp b/include/concurrency.hpp index 2cfc97d..8d51c9b 100644 --- a/include/concurrency.hpp +++ b/include/concurrency.hpp @@ -94,6 +94,25 @@ class ConcurrentSet { return snapshot; } + class LockedView { + public: + using iterator = + typename tbb::concurrent_hash_map::const_iterator; + + LockedView(const tbb::concurrent_hash_map& data) + : data_(data) {} + + iterator begin() const { return data_.begin(); } + iterator end() const { return data_.end(); } + + size_t size() const { return data_.size(); } + + private: + const tbb::concurrent_hash_map& data_; + }; + + LockedView get_all_unsafe() const { return LockedView(data_); } + /** * @brief Clear all elements from the set * diff --git a/include/core.hpp b/include/core.hpp index c525032..0fbd4db 100644 --- a/include/core.hpp +++ b/include/core.hpp @@ -765,28 +765,31 @@ class Database { arrow::Result> get_table( const std::string &schema_name, size_t chunk_size = 10000) const { - ARROW_ASSIGN_OR_RAISE(auto schema, - schema_registry_->get_arrow(schema_name)); - - ARROW_ASSIGN_OR_RAISE(auto all_nodes, - shard_manager_->get_nodes(schema_name)); - - if (all_nodes.empty()) { - std::vector> empty_columns; - empty_columns.reserve(schema->num_fields()); - for (int i = 0; i < schema->num_fields(); i++) { - empty_columns.push_back(std::make_shared( - std::vector>{})); - } - return arrow::Table::Make(schema, empty_columns); - } - - std::ranges::sort(all_nodes, [](const std::shared_ptr &a, - const std::shared_ptr &b) { - return a->id < b->id; - }); - - return create_table(schema, all_nodes, chunk_size); + auto shard = shard_manager_->get_shard(schema_name, 0).ValueOrDie(); + return shard->get_table(); + + // ARROW_ASSIGN_OR_RAISE(auto schema, + // schema_registry_->get_arrow(schema_name)); + // + // ARROW_ASSIGN_OR_RAISE(auto all_nodes, + // shard_manager_->get_nodes(schema_name)); + // + // if (all_nodes.empty()) { + // std::vector> empty_columns; + // empty_columns.reserve(schema->num_fields()); + // for (int i = 0; i < schema->num_fields(); i++) { + // empty_columns.push_back(std::make_shared( + // std::vector>{})); + // } + // return arrow::Table::Make(schema, empty_columns); + // } + // + // std::ranges::sort(all_nodes, [](const std::shared_ptr &a, + // const std::shared_ptr &b) { + // return a->id < b->id; + // }); + // + // return create_table(schema, all_nodes, chunk_size); } arrow::Result get_shard_count(const std::string &schema_name) const { diff --git a/include/edge_store.hpp b/include/edge_store.hpp index 78f0dd1..6c08abd 100644 --- a/include/edge_store.hpp +++ b/include/edge_store.hpp @@ -14,6 +14,98 @@ namespace tundradb { +// Forward declaration for EdgeView +class EdgeStore; + +/** + * @brief A view over edges that avoids copying shared_ptr objects + * + * This class provides iteration over edges without materializing them into + * a vector, reducing memory allocations and improving performance. + */ +class EdgeView { + public: + class iterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = std::shared_ptr; + using difference_type = std::ptrdiff_t; + using pointer = const value_type *; + using reference = const value_type &; + + iterator(const EdgeStore *store, + ConcurrentSet::LockedView::iterator edge_ids_it, + ConcurrentSet::LockedView::iterator edge_ids_end, + const std::string &type_filter) + : store_(store), + edge_ids_it_(edge_ids_it), + edge_ids_end_(edge_ids_end), + type_filter_(type_filter) { + advance_to_valid(); + } + + iterator &operator++() { + ++edge_ids_it_; + advance_to_valid(); + return *this; + } + + iterator operator++(int) { + iterator tmp = *this; + ++(*this); + return tmp; + } + + reference operator*() const { return current_edge_; } + pointer operator->() const { return ¤t_edge_; } + + bool operator==(const iterator &other) const { + return edge_ids_it_ == other.edge_ids_it_; + } + + bool operator!=(const iterator &other) const { return !(*this == other); } + + private: + void advance_to_valid(); + + const EdgeStore *store_; + ConcurrentSet::LockedView::iterator edge_ids_it_; + ConcurrentSet::LockedView::iterator edge_ids_end_; + std::string type_filter_; + std::shared_ptr current_edge_; + }; + + EdgeView(const EdgeStore *store, const ConcurrentSet &edge_ids, + const std::string &type_filter = "") + : store_(store), + edge_ids_view_(edge_ids.get_all_unsafe()), + type_filter_(type_filter) {} + + iterator begin() const { + return iterator(store_, edge_ids_view_.begin(), edge_ids_view_.end(), + type_filter_); + } + + iterator end() const { + return iterator(store_, edge_ids_view_.end(), edge_ids_view_.end(), + type_filter_); + } + + // Convenience method to count matching edges without materializing them + size_t count() const { + size_t result = 0; + for (auto it = begin(); it != end(); ++it) { + ++result; + } + return result; + } + + private: + const EdgeStore *store_; + ConcurrentSet::LockedView edge_ids_view_; + std::string type_filter_; +}; + // todo rename to EdgeManager class EdgeStore { struct TableCache; @@ -94,6 +186,13 @@ class EdgeStore { arrow::Result>> get_incoming_edges( int64_t id, const std::string &type = "") const; + // New view-based methods that avoid copying + arrow::Result get_outgoing_edges_view( + int64_t id, const std::string &type = "") const; + + arrow::Result get_incoming_edges_view( + int64_t id, const std::string &type = "") const; + arrow::Result>> get_by_type( const std::string &type) const; @@ -114,6 +213,9 @@ class EdgeStore { void set_id_seq(const int64_t v) { edge_id_counter_.store(v, std::memory_order_relaxed); } + + // Friend class for EdgeView iterator access + friend class EdgeView::iterator; }; } // namespace tundradb diff --git a/include/logger.hpp b/include/logger.hpp index 48f8266..81a0c6a 100644 --- a/include/logger.hpp +++ b/include/logger.hpp @@ -278,6 +278,68 @@ class ContextLogger { std::string prefix_; }; +// ============================================================================ +// COMPILE-TIME LOGGING OPTIMIZATIONS +// ============================================================================ + +// Compile-time log level configuration +#ifdef TUNDRA_LOG_LEVEL_DEBUG +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::DEBUG; +#elif defined(TUNDRA_LOG_LEVEL_INFO) +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::INFO; +#elif defined(TUNDRA_LOG_LEVEL_WARN) +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::WARN; +#elif defined(TUNDRA_LOG_LEVEL_ERROR) +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::ERROR; +#else +// Default to INFO in release builds, DEBUG in debug builds +#ifdef NDEBUG +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::INFO; +#else +constexpr LogLevel COMPILE_TIME_LOG_LEVEL = LogLevel::DEBUG; +#endif +#endif + +// Compile-time log level checks - completely eliminated in release builds +constexpr bool is_debug_enabled() { + return COMPILE_TIME_LOG_LEVEL <= LogLevel::DEBUG; +} + +constexpr bool is_info_enabled() { + return COMPILE_TIME_LOG_LEVEL <= LogLevel::INFO; +} + +constexpr bool is_warn_enabled() { + return COMPILE_TIME_LOG_LEVEL <= LogLevel::WARN; +} + +// Fast logging macros that compile to nothing when disabled +#define LOG_DEBUG_FAST(msg, ...) \ + do { \ + if constexpr (is_debug_enabled()) { \ + log_debug(msg, ##__VA_ARGS__); \ + } \ + } while (0) + +#define LOG_INFO_FAST(msg, ...) \ + do { \ + if constexpr (is_info_enabled()) { \ + log_info(msg, ##__VA_ARGS__); \ + } \ + } while (0) + +#define LOG_WARN_FAST(msg, ...) \ + do { \ + if constexpr (is_warn_enabled()) { \ + log_warn(msg, ##__VA_ARGS__); \ + } \ + } while (0) + +// Conditional code blocks - completely eliminated when disabled +#define IF_DEBUG_ENABLED if constexpr (is_debug_enabled()) + +#define IF_INFO_ENABLED if constexpr (is_info_enabled()) + } // namespace tundradb #endif // LOGGER_HPP \ No newline at end of file diff --git a/include/node.hpp b/include/node.hpp index 8fe584c..ccdcec7 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -3,10 +3,12 @@ #include +#include #include #include #include +#include "llvm/ADT/DenseMap.h" #include "node_arena.hpp" #include "schema.hpp" #include "types.hpp" @@ -20,7 +22,7 @@ enum UpdateType { class Node { private: - std::unordered_map data_; + llvm::StringMap data_; std::unique_ptr handle_; std::shared_ptr arena_; std::shared_ptr schema_; @@ -31,7 +33,7 @@ class Node { std::string schema_name; explicit Node(const int64_t id, std::string schema_name, - std::unordered_map initial_data, + llvm::StringMap initial_data, std::unique_ptr handle = nullptr, std::shared_ptr arena = nullptr, std::shared_ptr schema = nullptr, @@ -56,21 +58,32 @@ class Node { data_[field_name] = std::move(value); } - arrow::Result get_value_ptr( - const std::string &field_name) const { + ValueRef get_value_ref(const std::string &field_name) const { if (arena_ != nullptr) { // if (schema_->get_field(field_name) == nullptr) { // // Logger::get_instance().debug("Field not found"); // return arrow::Status::KeyError("Field not found: ", field_name); // } - return arena_->get_field_value_ptr(*handle_, layout_, field_name); + ValueType out_type; + const char *ptr = + arena_->get_field_value_ptr(*handle_, layout_, field_name, &out_type); + return {ptr, out_type}; } + // const char * get_value_ptr(const std::string &field_name) const { + // return get_value_ptr(field_name, nullptr); + // } const auto it = data_.find(field_name); - if (it == data_.end()) { - return arrow::Status::KeyError("Field not found: ", field_name); - } - return arrow::Status::NotImplemented(""); + // if (it == data_.end()) { + // return arrow::Status::KeyError("Field not found: ", field_name); + // } + // Logger::get_instance().debug("get value ptr {}", field_name); + // const char * p = it->second.data_ptr(); + // auto v = Value::read_value_from_memory(p, it->second.type()); + // Logger::get_instance().debug("get value ptr {}={}", field_name, + // v.to_string() ); + return it->second.as_ref(); + // return arrow::Status::NotImplemented(""); } arrow::Result get_value(const std::string &field_name) const { @@ -132,10 +145,20 @@ class NodeManager { use_node_arena_ = use_node_arena; schema_registry_ = std::move(schema_registry); layout_registry_ = std::make_shared(); - node_arena_ = node_arena_factory::create_free_list_arena(layout_registry_); + if (use_node_arena) { + node_arena_ = + node_arena_factory::create_free_list_arena(layout_registry_); + } } - ~NodeManager() { node_arena_->clear(); } + ~NodeManager() { + if (node_arena_) { + node_arena_->clear(); + } + if (string_arena_) { + string_arena_->clear(); + } + } arrow::Result> get_node(const int64_t id) { return nodes[id]; @@ -216,7 +239,7 @@ class NodeManager { nodes[id] = node; return node; } else { - std::unordered_map normalized_data; + llvm::StringMap normalized_data; normalized_data["id"] = Value{id}; for (const auto &field : schema_->fields()) { @@ -225,7 +248,18 @@ class NodeManager { normalized_data[field->name()] = Value(); } else { const auto value = data.find(field->name())->second; - normalized_data[field->name()] = value; + if (is_string_type(value.type())) { + auto str_ref = + string_arena_->store_string(value.get()); + normalized_data[field->name()] = Value{str_ref}; + // Logger::get_instance().debug("string arena: {}", + // normalized_data[field->name()].to_string()); + // Logger::get_instance().debug("string arena2: {}", + // Value::read_value_from_memory(normalized_data[field->name()].data_ptr(), + // ValueType::STRING).to_string() ); + } else { + normalized_data[field->name()] = value; + } } } @@ -248,6 +282,7 @@ class NodeManager { std::shared_ptr schema_registry_; std::shared_ptr layout_registry_; std::shared_ptr node_arena_; + std::shared_ptr string_arena_ = std::make_shared(); bool validation_enabled_; bool use_node_arena_; @@ -258,7 +293,7 @@ class NodeManager { // cache layout std::shared_ptr layout_; - const std::unordered_map EMPTY_DATA{}; + const llvm::StringMap EMPTY_DATA{}; // since node creation is single threaded, we can cache the layout // w/o synchronization @@ -279,7 +314,9 @@ class NodeManager { if (schema_name_ == schema_name) return; schema_name_ = schema_name; schema_ = schema_registry_->get(schema_name).ValueOrDie(); - layout_ = create_or_get_layout(schema_name); + if (use_node_arena_) { + layout_ = create_or_get_layout(schema_name); + } } }; diff --git a/include/node_arena.hpp b/include/node_arena.hpp index b1a5fa0..13d4c11 100644 --- a/include/node_arena.hpp +++ b/include/node_arena.hpp @@ -133,7 +133,8 @@ class NodeArena { */ const char* get_field_value_ptr(const NodeHandle& handle, const std::shared_ptr& layout, - const std::string& field_name) const { + const std::string& field_name, + ValueType* out_type) const { // Logger::get_instance().debug("get_field_value: {}.{}", schema_name, // field_name); if (handle.is_null()) { @@ -142,7 +143,7 @@ class NodeArena { } return layout->get_field_value_ptr(static_cast(handle.ptr), - field_name); + field_name, out_type); } Value get_field_value(const NodeHandle& handle, diff --git a/include/query.hpp b/include/query.hpp index e9dbd2d..2d07ab7 100644 --- a/include/query.hpp +++ b/include/query.hpp @@ -184,9 +184,13 @@ class ComparisonExpr : public Clause, public WhereExpr { CompareOp op_; Value value_; bool inlined_ = false; + std::string field_name; - static arrow::Result compare_values(const Value& value, CompareOp op, - const Value& where_value) { + static arrow::Result compare_values(const std::string& field_name, + const char* value_ptr, CompareOp op, + const Value& where_value, + ValueType value_type) { + /* if (value.type() == ValueType::NA || where_value.type() == ValueType::NA) { switch (op) { case CompareOp::Eq: @@ -228,35 +232,38 @@ class ComparisonExpr : public Clause, public WhereExpr { return arrow::Status::Invalid("Type mismatch: field is ", value.type(), " but WHERE value is ", where_value.type()); } +*/ - switch (value.type()) { + switch (value_type) { case ValueType::INT32: { - int32_t field_val = value.get(); + int32_t field_val = *reinterpret_cast(value_ptr); int32_t where_val = where_value.get(); return apply_comparison(field_val, op, where_val); } case ValueType::INT64: { - int64_t field_val = value.get(); + int64_t field_val = *reinterpret_cast(value_ptr); int64_t where_val = where_value.get(); return apply_comparison(field_val, op, where_val); } case ValueType::FLOAT: { - float field_val = value.get(); + float field_val = *reinterpret_cast(value_ptr); float where_val = where_value.get(); return apply_comparison(field_val, op, where_val); } case ValueType::DOUBLE: { - double field_val = value.get(); + double field_val = *reinterpret_cast(value_ptr); double where_val = where_value.get(); return apply_comparison(field_val, op, where_val); } case ValueType::STRING: { - const std::string& field_val = value.as_string(); + auto str_ref = *reinterpret_cast(value_ptr); + const std::string& field_val = + std::string(str_ref.data, str_ref.length); const std::string& where_val = where_value.as_string(); return apply_comparison(field_val, op, where_val); } case ValueType::BOOL: { - bool field_val = value.get(); + bool field_val = *reinterpret_cast(value_ptr); bool where_val = where_value.get(); return apply_comparison(field_val, op, where_val); } @@ -264,7 +271,7 @@ class ComparisonExpr : public Clause, public WhereExpr { return arrow::Status::Invalid("Unexpected null value in comparison"); default: return arrow::Status::NotImplemented( - "Unsupported value type for comparison: ", value.type()); + "Unsupported value type for comparison: ", value_type); } } @@ -308,7 +315,13 @@ class ComparisonExpr : public Clause, public WhereExpr { public: ComparisonExpr(std::string field, CompareOp op, Value value) - : field_(std::move(field)), op_(op), value_(std::move(value)) {} + : field_(std::move(field)), op_(op), value_(std::move(value)) { + if (const size_t dot_pos = field_.find('.'); dot_pos != std::string::npos) { + field_name = field_.substr(dot_pos + 1); + } else { + field_name = field_; + } + } [[nodiscard]] const std::string& field() const { return field_; } [[nodiscard]] CompareOp op() const { return op_; } @@ -397,34 +410,14 @@ class ComparisonExpr : public Clause, public WhereExpr { if (!node) { return arrow::Status::Invalid("Node is null"); } - - // parse field name to extract variable and field parts - // expected format: "variable.field" (e.g., "user.age", "company.name") - const size_t dot_pos = field_.find('.'); - std::string field_name; - - if (dot_pos != std::string::npos) { - field_name = field_.substr(dot_pos + 1); - } else { - field_name = field_; - } - - ARROW_ASSIGN_OR_RAISE(auto field_value, node->get_value(field_name)); - return compare_values(field_value, op_, value_); + auto value_ref = node->get_value_ref(field_name); + return compare_values(field_,value_ref.data, op_, value_, value_ref.type); } [[nodiscard]] arrow::compute::Expression to_arrow_expression( bool strip_var) const override { - std::string field_name = field_; - if (strip_var) { - if (const size_t dot_pos = field_.find('.'); - dot_pos != std::string::npos) { - field_name = field_.substr(dot_pos + 1); - } else { - field_name = field_; - } - } - const auto field_expr = arrow::compute::field_ref(field_name); + const auto field_expr = + arrow::compute::field_ref(strip_var ? field_name : field_); const auto value_expr = value_to_expression(value_); return apply_comparison_op(field_expr, value_expr, op_); diff --git a/include/schema_layout.hpp b/include/schema_layout.hpp index e40d085..3818feb 100644 --- a/include/schema_layout.hpp +++ b/include/schema_layout.hpp @@ -143,9 +143,13 @@ class SchemaLayout { } const char* get_field_value_ptr(const char* node_data, - const std::string& field_name) const { + const std::string& field_name, + ValueType* out_type) const { const size_t field_index = get_field_index(field_name); const FieldLayout& field = fields_[field_index]; + if (out_type) { + *out_type = field.type; + } return get_field_value_ptr(node_data, field); } @@ -239,6 +243,11 @@ class SchemaLayout { size_t get_field_index(const std::string& name) const { const auto it = field_index_.find(name); return it != field_index_.end() ? it->second : -1; + // if (name[0] == 'i' && name.size() == 2) return 0; + // if (name[0] == 'n' || name[0] == 'i') return 1; + // if (name[0] == 'a') return 2; + // if (name[0] == 'c') return 3; + return -1; } const FieldLayout* get_field_layout(const std::string& name) const { diff --git a/include/types.hpp b/include/types.hpp index e1be9b4..419a8fb 100644 --- a/include/types.hpp +++ b/include/types.hpp @@ -53,6 +53,205 @@ enum class ValueType { BOOL }; +inline std::string to_string(const ValueType type) { + switch (type) { + case ValueType::NA: + return "Null"; + case ValueType::INT32: + return "Int32"; + case ValueType::INT64: + return "Int64"; + case ValueType::DOUBLE: + return "Double"; + case ValueType::STRING: + return "String"; + case ValueType::FIXED_STRING16: + return "FixedString16"; + case ValueType::FIXED_STRING32: + return "FixedString32"; + case ValueType::FIXED_STRING64: + return "FixedString64"; + case ValueType::BOOL: + return "Bool"; + default: + return "Unknown"; + } +} + +struct ValueRef { + const char* data; + ValueType type; + + // Default constructor + ValueRef() : data(nullptr), type(ValueType::NA) {} + + // Constructor + ValueRef(const char* ptr, ValueType type) : data(ptr), type(type) {} + + // Copy constructor (allowed) + // ValueRef(const ValueRef&) = default; + // + // // Move constructor + // ValueRef(ValueRef&&) = default; + // + // // Copy assignment is deleted due to const member + // ValueRef& operator=(const ValueRef&) = delete; + // + // // Move assignment is also deleted due to const member + // ValueRef& operator=(ValueRef&&) = delete; + + int32_t as_int32() const { return *reinterpret_cast(data); } + + int64_t as_int64() const { return *reinterpret_cast(data); } + + double as_double() const { return *reinterpret_cast(data); } + + float as_float() const { return *reinterpret_cast(data); } + + bool as_bool() const { return *reinterpret_cast(data); } + + std::string as_string() const { return std::string(data); } + + const StringRef& as_string_ref() const { + return *reinterpret_cast(data); + } + + arrow::Result> as_scalar() const { + switch (type) { + case ValueType::INT32: + return arrow::MakeScalar(as_int32()); + case ValueType::INT64: + return arrow::MakeScalar(as_int64()); + case ValueType::DOUBLE: + return arrow::MakeScalar(as_double()); + case ValueType::STRING: + return arrow::MakeScalar(as_string_ref().to_string()); + case ValueType::BOOL: + return arrow::MakeScalar(as_bool()); + case ValueType::NA: + return arrow::MakeNullScalar(arrow::null()); + default: + return arrow::Status::NotImplemented( + "Unsupported Value type for Arrow scalar conversion: ", + to_string(type)); + } + } + + // Equality comparison + bool operator==(const ValueRef& other) const { + if (type != other.type) { + return false; + } + + // Both null + if (data == nullptr && other.data == nullptr) { + return true; + } + + // One null, one not null + if (data == nullptr || other.data == nullptr) { + return false; + } + + // Compare values based on type + switch (type) { + case ValueType::NA: + return true; // Both are NA + + case ValueType::INT32: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::INT64: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::FLOAT: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::DOUBLE: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::BOOL: + return *reinterpret_cast(data) == + *reinterpret_cast(other.data); + + case ValueType::STRING: { + const StringRef& str1 = *reinterpret_cast(data); + const StringRef& str2 = *reinterpret_cast(other.data); + + // Compare string lengths first + if (str1.length != str2.length) { + return false; + } + + // Both null strings + if (str1.is_null() && str2.is_null()) { + return true; + } + + // One null, one not + if (str1.is_null() || str2.is_null()) { + return false; + } + + // Compare string content + return std::memcmp(str1.data, str2.data, str1.length) == 0; + } + + default: + return false; // Unknown type + } + } + + bool operator!=(const ValueRef& other) const { return !(*this == other); } + + // Standalone equals function (if you prefer functional style) + bool equals(const ValueRef& other) const { return *this == other; } + + // ToString method for debugging and display + std::string ToString() const { + if (data == nullptr) { + return "NULL"; + } + + switch (type) { + case ValueType::NA: + return "NULL"; + + case ValueType::INT32: + return std::to_string(as_int32()); + + case ValueType::INT64: + return std::to_string(as_int64()); + + case ValueType::FLOAT: + return std::to_string(as_float()); + + case ValueType::DOUBLE: + return std::to_string(as_double()); + + case ValueType::BOOL: + return as_bool() ? "true" : "false"; + + case ValueType::FIXED_STRING16: + case ValueType::FIXED_STRING32: + case ValueType::FIXED_STRING64: + case ValueType::STRING: { + const StringRef& str_ref = as_string_ref(); + if (str_ref.is_null()) { + return "NULL"; + } + return "\"" + str_ref.to_string() + "\""; + } + default: + return "UNKNOWN_TYPE"; + } + } +}; + /** * Get the maximum size for fixed-size string types */ @@ -117,31 +316,6 @@ static size_t get_type_alignment(const ValueType type) { } } -inline std::string to_string(const ValueType type) { - switch (type) { - case ValueType::NA: - return "Null"; - case ValueType::INT32: - return "Int32"; - case ValueType::INT64: - return "Int64"; - case ValueType::DOUBLE: - return "Double"; - case ValueType::STRING: - return "String"; - case ValueType::FIXED_STRING16: - return "FixedString16"; - case ValueType::FIXED_STRING32: - return "FixedString32"; - case ValueType::FIXED_STRING64: - return "FixedString64"; - case ValueType::BOOL: - return "Bool"; - default: - return "Unknown"; - } -} - class Value { public: Value() : type_(ValueType::NA), data_(std::monostate{}) {} @@ -168,6 +342,32 @@ class Value { ValueType type() const { return type_; } + const char* data_ptr() const { + switch (type_) { + case ValueType::INT32: + return reinterpret_cast(&std::get(data_)); + case ValueType::INT64: + return reinterpret_cast(&std::get(data_)); + case ValueType::FLOAT: + return reinterpret_cast(&std::get(data_)); + case ValueType::DOUBLE: + return reinterpret_cast(&std::get(data_)); + case ValueType::BOOL: + return reinterpret_cast(&std::get(data_)); + case ValueType::STRING: + case ValueType::FIXED_STRING16: + case ValueType::FIXED_STRING32: + case ValueType::FIXED_STRING64: { + return reinterpret_cast(&std::get(data_)); + } + case ValueType::NA: + default: + return nullptr; + } + } + + ValueRef as_ref() const { return {data_ptr(), type_}; } + template const T& get() const { return std::get(data_); diff --git a/include/utils.hpp b/include/utils.hpp index 3640a3d..2112a3d 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -159,58 +159,59 @@ static arrow::Result> create_table( for (const auto& node : nodes) { for (int i = 0; i < schema->num_fields(); i++) { const auto& field = schema->field(i); - auto field_result = node->get_value_ptr(field->name()); - if (!field_result.ok()) { + auto value_ptr = node->get_value_ref(field->name()).data; + + // if (!field_result.ok()) { + // ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); + // } + // else { + // const auto value_ptr = field_result.ValueOrDie(); + if (value_ptr == nullptr) { ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); } else { - const auto value_ptr = field_result.ValueOrDie(); - if (value_ptr == nullptr) { - ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); - } else { - switch (field->type()->id()) { - case arrow::Type::INT32: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case arrow::Type::INT64: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case arrow::Type::FLOAT: { - // return Value{*reinterpret_cast(ptr)}; - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case arrow::Type::DOUBLE: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case arrow::Type::BOOL: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case arrow::Type::STRING: { - auto str_ref = *reinterpret_cast(value_ptr); - - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(str_ref.to_string())); - break; - } - default: - return arrow::Status::NotImplemented("Unsupported type: ", - field->type()->ToString()); + switch (field->type()->id()) { + case arrow::Type::INT32: { + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(*reinterpret_cast(value_ptr))); + break; } + case arrow::Type::INT64: { + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(*reinterpret_cast(value_ptr))); + break; + } + case arrow::Type::FLOAT: { + // return Value{*reinterpret_cast(ptr)}; + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(*reinterpret_cast(value_ptr))); + break; + } + case arrow::Type::DOUBLE: { + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(*reinterpret_cast(value_ptr))); + break; + } + case arrow::Type::BOOL: { + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(*reinterpret_cast(value_ptr))); + break; + } + case arrow::Type::STRING: { + auto str_ref = *reinterpret_cast(value_ptr); + + ARROW_RETURN_NOT_OK( + dynamic_cast(builders[i].get()) + ->Append(str_ref.to_string())); + break; + } + default: + return arrow::Status::NotImplemented("Unsupported type: ", + field->type()->ToString()); } } } diff --git a/src/arrow_utils.cpp b/src/arrow_utils.cpp index 97d92d7..46c7a77 100644 --- a/src/arrow_utils.cpp +++ b/src/arrow_utils.cpp @@ -35,8 +35,8 @@ bool initialize_arrow_compute() { } auto function_names = registry->GetFunctionNames(); - log_info("Arrow Compute initialized with {} functions", - function_names.size()); + log_debug("Arrow Compute initialized with {} functions", + function_names.size()); // Check for essential functions const bool has_equal = diff --git a/src/core.cpp b/src/core.cpp index 3d19ae2..18e7619 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -37,6 +37,76 @@ namespace tundradb { constexpr static uint64_t NODE_MASK = (1ULL << 48) - 1; +// Deterministic 16-bit tag from alias string (SchemaRef::value()). +// https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html +static uint16_t compute_tag(const SchemaRef& ref) { + // FNV-1a 32-bit, then fold to 16 bits. + const std::string& s = ref.value(); + uint32_t h = 2166136261u; + for (unsigned char c : s) { + h ^= c; + h *= 16777619u; + } + h ^= (h >> 16); + return static_cast(h & 0xFFFFu); +} + +/** + * @brief Creates a packed 64-bit hash code for schema+node_id pairs + * + * This function combines a schema identifier and node ID into a single 64-bit + * value for efficient storage and comparison in hash sets/maps. This eliminates + * the need for expensive string concatenation and hashing that was previously + * used for tracking visited nodes during graph traversal. + * + * @param schema The schema reference containing a pre-computed 16-bit tag + * @param node_id The node identifier (48-bit max) + * + * @return A 64-bit packed value with layout: + * - Bits 63-48: Schema tag (16 bits) + * - Bits 47-0: Node ID (48 bits, masked) + * + * @details + * Memory Layout: + * ``` + * 63 56 48 40 32 24 16 8 0 + * | Schema | Node ID (48 bits) | + * | (16 bit) | | + * ``` + * + * Performance Benefits: + * - Replaces string operations: "User:12345" → single uint64_t + * - Enables fast integer comparison instead of string hashing + * - Reduces memory allocations (no temporary strings) + * - Compatible with llvm::DenseSet for O(1) lookups + * + * Constraints: + * - Node IDs must fit in 48 bits (max ~281 trillion nodes) + * - Schema tags must be unique within query context + * - NODE_MASK = (1ULL << 48) - 1 = 0x0000FFFFFFFFFFFF + * + * Example: + * ```cpp + * SchemaRef user_schema = SchemaRef::parse("u:User"); + * user_schema.set_tag(0x1234); // Pre-computed schema tag + * + * uint64_t packed = hash_code_(user_schema, 98765); + * // Result: 0x1234000000018149 (schema=0x1234, node=98765) + * + * // Usage in visited tracking: + * llvm::DenseSet visited; + * visited.insert(packed); // Fast O(1) integer hash + * ``` + * + * @see SchemaRef::tag() for schema tag computation + * @see NODE_MASK constant definition + */ +static uint64_t hash_code_(const SchemaRef& schema, int64_t node_id) { + const uint16_t schema_id16 = schema.tag(); + return (static_cast(schema_id16) << 48) | + (static_cast(node_id) & NODE_MASK); +} + // Utility function to join containers using C++23 ranges template std::string join_container(const Container& container, @@ -54,17 +124,17 @@ std::string join_container(const Container& container, arrow::compute::Expression value_to_expression(const Value& value) { switch (value.type()) { case ValueType::INT32: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.as_int32()); case ValueType::INT64: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.as_int64()); case ValueType::STRING: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.to_string()); case ValueType::FLOAT: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.as_float()); case ValueType::DOUBLE: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.as_double()); case ValueType::BOOL: - return arrow::compute::literal(value.get()); + return arrow::compute::literal(value.as_bool()); case ValueType::NA: return arrow::compute::literal( arrow::Datum(arrow::MakeNullScalar(arrow::null()))); @@ -84,7 +154,7 @@ arrow::Result> value_to_arrow_scalar( case ValueType::DOUBLE: return arrow::MakeScalar(value.as_double()); case ValueType::STRING: - return arrow::MakeScalar(value.as_string()); + return arrow::MakeScalar(value.as_string_ref().to_string()); case ValueType::BOOL: return arrow::MakeScalar(value.as_bool()); case ValueType::NA: @@ -96,30 +166,6 @@ arrow::Result> value_to_arrow_scalar( } } -arrow::Result> value_ptr_to_arrow_scalar( - const char* ptr, const ValueType type) { - switch (type) { - case ValueType::INT32: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::INT64: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::DOUBLE: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::STRING: { - auto str_ref = *reinterpret_cast(ptr); - return arrow::MakeScalar(str_ref.to_string()); - } - case ValueType::BOOL: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::NA: - return arrow::MakeNullScalar(arrow::null()); - default: - return arrow::Status::NotImplemented( - "Unsupported Value type for Arrow scalar conversion: ", - tundradb::to_string(type)); - } -} - // Convert CompareOp to appropriate Arrow compute function arrow::compute::Expression apply_comparison_op( const arrow::compute::Expression& field, @@ -171,14 +217,19 @@ arrow::compute::Expression where_condition_to_expression( arrow::Result> create_table_from_nodes( const std::shared_ptr& schema, const std::vector>& nodes) { - log_debug("Creating table from {} nodes with schema '{}'", nodes.size(), - schema->ToString()); + IF_DEBUG_ENABLED { + log_debug("Creating table from {} nodes with schema '{}'", nodes.size(), + schema->ToString()); + } // Create builders for each field std::vector> builders; + builders.reserve(schema->fields().size()); for (const auto& field : schema->fields()) { - log_debug("Creating builder for field '{}' with type {}", field->name(), - field->type()->ToString()); + IF_DEBUG_ENABLED { + log_debug("Creating builder for field '{}' with type {}", field->name(), + field->type()->ToString()); + } auto builder_result = arrow::MakeBuilder(field->type()); if (!builder_result.ok()) { log_error("Failed to create builder for field '{}': {}", field->name(), @@ -189,45 +240,38 @@ arrow::Result> create_table_from_nodes( } // Populate builders with data from each node - log_debug("Adding data from {} nodes to builders", nodes.size()); + IF_DEBUG_ENABLED { + log_debug("Adding data from {} nodes to builders", nodes.size()); + } for (const auto& node : nodes) { // Add each field's value to the appropriate builder for (int i = 0; i < schema->num_fields(); i++) { - auto field = schema->field(i); + const auto& field = schema->field(i); const auto& field_name = field->name(); // Find the value in the node's data - auto res = node->get_value_ptr(field_name); - if (res.ok()) { - // Convert Value to Arrow scalar and append to builder - auto value = res.ValueOrDie(); - if (value) { - auto scalar_result = value_ptr_to_arrow_scalar( - value, arrow_type_to_value_type(field->type())); - if (!scalar_result.ok()) { - log_error("Failed to convert value to scalar for field '{}': {}", - field_name, scalar_result.status().ToString()); - return scalar_result.status(); - } + auto value_ref = node->get_value_ref(field_name); + + // Convert Value to Arrow scalar and append to builder + if (value_ref.data) { + auto scalar_result = value_ref.as_scalar(); + if (!scalar_result.ok()) { + log_error("Failed to convert value to scalar for field '{}': {}", + field_name, scalar_result.status().ToString()); + return scalar_result.status(); + } - auto scalar = scalar_result.ValueOrDie(); - auto status = builders[i]->AppendScalar(*scalar); - if (!status.ok()) { - log_error("Failed to append scalar for field '{}': {}", field_name, - status.ToString()); - return status; - } - } else { - log_debug("Null value for field '{}', appending null", field_name); - auto status = builders[i]->AppendNull(); - if (!status.ok()) { - log_error("Failed to append null for field '{}': {}", field_name, - status.ToString()); - return status; - } + auto scalar = scalar_result.ValueOrDie(); + auto status = builders[i]->AppendScalar(*scalar); + if (!status.ok()) { + log_error("Failed to append scalar for field '{}': {}", field_name, + status.ToString()); + return status; } } else { - log_debug("Field '{}' not found in node, appending null", field_name); + IF_DEBUG_ENABLED { + log_debug("Null value for field '{}', appending null", field_name); + } auto status = builders[i]->AppendNull(); if (!status.ok()) { log_error("Failed to append null for field '{}': {}", field_name, @@ -239,7 +283,7 @@ arrow::Result> create_table_from_nodes( } // Finish building arrays - log_debug("Finalizing arrays from builders"); + IF_DEBUG_ENABLED { log_debug("Finalizing arrays from builders"); } std::vector> arrays; arrays.reserve(builders.size()); for (auto& builder : builders) { @@ -253,26 +297,32 @@ arrow::Result> create_table_from_nodes( } // Create table - log_debug("Creating table with {} rows and {} columns", - arrays.empty() ? 0 : arrays[0]->length(), arrays.size()); + IF_DEBUG_ENABLED { + log_debug("Creating table with {} rows and {} columns", + arrays.empty() ? 0 : arrays[0]->length(), arrays.size()); + } return arrow::Table::Make(schema, arrays); } arrow::Result> filter( std::shared_ptr table, const WhereExpr& condition, bool strip_var) { - log_debug("Filtering table with WhereCondition: {}", condition.toString()); + IF_DEBUG_ENABLED { + log_debug("Filtering table with WhereCondition: {}", condition.toString()); + } try { // Convert WhereCondition to Arrow compute expression auto filter_expr = where_condition_to_expression(condition, strip_var); - log_debug("Creating in-memory dataset from table with {} rows", - table->num_rows()); + IF_DEBUG_ENABLED { + log_debug("Creating in-memory dataset from table with {} rows", + table->num_rows()); + } auto dataset = std::make_shared(table); // Create scanner builder - log_debug("Creating scanner builder"); + IF_DEBUG_ENABLED { log_debug("Creating scanner builder"); } auto scan_builder_result = dataset->NewScan(); if (!scan_builder_result.ok()) { log_error("Failed to create scanner builder: {}", @@ -281,14 +331,16 @@ arrow::Result> filter( } auto scan_builder = scan_builder_result.ValueOrDie(); - log_debug("Applying compound filter to scanner builder"); + IF_DEBUG_ENABLED { + log_debug("Applying compound filter to scanner builder"); + } auto filter_status = scan_builder->Filter(filter_expr); if (!filter_status.ok()) { log_error("Failed to apply filter: {}", filter_status.ToString()); return filter_status; } - log_debug("Finishing scanner"); + IF_DEBUG_ENABLED { log_debug("Finishing scanner"); } auto scanner_result = scan_builder->Finish(); if (!scanner_result.ok()) { log_error("Failed to finish scanner: {}", @@ -297,7 +349,7 @@ arrow::Result> filter( } auto scanner = scanner_result.ValueOrDie(); - log_debug("Executing scan to table"); + IF_DEBUG_ENABLED { log_debug("Executing scan to table"); } auto table_result = scanner->ToTable(); if (!table_result.ok()) { log_error("Failed to convert scan results to table: {}", @@ -306,8 +358,10 @@ arrow::Result> filter( } auto result_table = table_result.ValueOrDie(); - log_debug("Filter completed: {} rows in, {} rows out", table->num_rows(), - result_table->num_rows()); + IF_DEBUG_ENABLED { + log_debug("Filter completed: {} rows in, {} rows out", table->num_rows(), + result_table->num_rows()); + } return result_table; } catch (const std::exception& e) { @@ -446,20 +500,6 @@ struct QueryState { return true; } - // Deterministic 16-bit tag from alias string (SchemaRef::value()). - // https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html - static uint16_t compute_alias_tag(const SchemaRef& ref) { - // FNV-1a 32-bit, then fold to 16 bits. - const std::string& s = ref.value(); - uint32_t h = 2166136261u; - for (unsigned char c : s) { - h ^= c; - h *= 16777619u; - } - h ^= (h >> 16); - return static_cast(h & 0xFFFFu); - } - const llvm::DenseSet& get_ids(const SchemaRef& schema_ref) { return ids[schema_ref.value()]; } @@ -567,7 +607,7 @@ struct QueryState { arrow::Result> build_denormalized_schema( const QueryState& query_state) { - log_debug("Building schema for denormalized table"); + IF_DEBUG_ENABLED { log_debug("Building schema for denormalized table"); } std::set processed_fields; std::vector> fields; @@ -576,7 +616,9 @@ arrow::Result> build_denormalized_schema( // First add fields from the FROM schema std::string from_schema = query_state.from.value(); - log_debug("Adding fields from FROM schema '{}'", from_schema); + IF_DEBUG_ENABLED { + log_debug("Adding fields from FROM schema '{}'", from_schema); + } auto schema_result = query_state.schema_registry->get_arrow( query_state.aliases.at(from_schema)); @@ -603,7 +645,9 @@ arrow::Result> build_denormalized_schema( } for (const auto& schema_ref : unique_schemas) { - log_debug("Adding fields from schema '{}'", schema_ref.value()); + IF_DEBUG_ENABLED { + log_debug("Adding fields from schema '{}'", schema_ref.value()); + } schema_result = query_state.schema_registry->get_arrow( query_state.aliases.at(schema_ref.value())); @@ -629,11 +673,11 @@ arrow::Result> build_denormalized_schema( } struct PathSegment { - std::string schema; + uint16_t schema; // tag int64_t node_id; std::string toString() const { - return schema + ":" + std::to_string(node_id); + return std::to_string(schema) + ":" + std::to_string(node_id); } bool operator==(const PathSegment& other) const { @@ -665,17 +709,13 @@ std::string join_schema_path(const std::vector& schema_path) { struct Row { int64_t id; - std::unordered_map> cells; + llvm::StringMap cells; std::vector path; + llvm::StringMap schema_ids; // can we use tag + bool schema_ids_set = false; - void set_cell(const std::string& name, - std::shared_ptr scalar) { - cells[name] = std::move(scalar); - } - - bool has_value(const std::string& name) const { - return cells.contains(name) && cells.at(name) != nullptr && - cells.at(name)->is_valid; + bool has_value(const llvm::StringRef name) const { + return cells.contains(name) && cells.at(name).data != nullptr; } void set_cell_from_node(const std::vector& fq_field_names, @@ -685,69 +725,53 @@ struct Row { for (size_t i = 0; i < n; ++i) { const auto& field = fields[i]; const auto& full_name = fq_field_names[i]; - this->set_cell(full_name, node->get_value_ptr(field->name()).ValueOrDie(), - field->type()); + const char* ptr = node->get_value_ref(field->name()).data; + this->set_cell(full_name, ptr, field->type()); } } // New set_cell method for Value objects void set_cell(const std::string& name, const char* ptr, const ValueType type) { - if (ptr) { - auto scalar_result = value_ptr_to_arrow_scalar(ptr, type); - if (scalar_result.ok()) { - cells[name] = scalar_result.ValueOrDie(); - return; - } + if (!cells.try_emplace(name, ptr, type).second) { + cells[name].data = ptr; } - - // Default to null if value is null or conversion fails - cells[name] = nullptr; - } - - void set_cell(const std::string& name, std::shared_ptr array) { - if (array && array->length() > 0) { - auto scalar_result = array->GetScalar(0); - if (scalar_result.ok()) { - cells[name] = scalar_result.ValueOrDie(); - return; - } - } - - // Default to null if array is empty or conversion fails - cells[name] = nullptr; } bool start_with(const std::vector& prefix) const { return is_prefix(prefix, this->path); } - std::unordered_map extract_schema_ids() const { - std::unordered_map result; + const llvm::StringMap& extract_schema_ids() { + if (schema_ids_set) { + return schema_ids; + } + // std::unordered_map result; for (const auto& [field_name, value] : cells) { - if (!value || !value->is_valid) continue; + if (!value.data) continue; // Extract schema prefix (everything before the first dot) size_t dot_pos = field_name.find('.'); if (dot_pos != std::string::npos) { - std::string schema = field_name.substr(0, dot_pos); + llvm::StringRef schema = field_name.substr(0, dot_pos); // Store ID for this schema if it's an ID field if (field_name.substr(dot_pos + 1) == "id") { - auto id_scalar = std::static_pointer_cast(value); - result[schema] = id_scalar->value; + schema_ids[schema] = value.as_int64(); } } } - return result; + schema_ids_set = true; + return schema_ids; } // returns new Row which is result of merging this row and other - [[nodiscard]] Row merge(const Row& other) const { - Row merged = *this; - for (const auto& [name, value] : other.cells) { - if (!merged.has_value(name)) { - merged.cells[name] = value; + [[nodiscard]] std::shared_ptr merge( + const std::shared_ptr& other) const { + std::shared_ptr merged = std::make_shared(*this); + for (const auto& [name, value] : other->cells) { + if (!merged->has_value(name)) { + merged->cells.try_emplace(name, value.data, value.type); } } return merged; @@ -759,43 +783,13 @@ struct Row { ss << "path='" << join_schema_path(path) << "', "; bool first = true; - for (const auto& [field_name, scalar] : cells) { + for (const auto& [field_name, value_ref] : cells) { if (!first) { ss << ", "; } first = false; - ss << field_name << ": "; - - if (!scalar) { - ss << "NULL"; - } else if (scalar->is_valid) { - // Handle different scalar types appropriately - switch (scalar->type->id()) { - case arrow::Type::INT64: - ss << std::static_pointer_cast(scalar)->value; - break; - case arrow::Type::DOUBLE: - ss << std::static_pointer_cast(scalar)->value; - break; - case arrow::Type::STRING: - case arrow::Type::LARGE_STRING: - ss << "\"" - << std::static_pointer_cast(scalar)->view() - << "\""; - break; - case arrow::Type::BOOL: - ss << (std::static_pointer_cast(scalar)->value - ? "true" - : "false"); - break; - default: - ss << scalar->ToString(); - break; - } - } else { - ss << "NULL"; - } + ss << field_name.str() << ": " << value_ref.ToString(); } ss << "}"; @@ -806,17 +800,10 @@ struct Row { static Row create_empty_row_from_schema( const std::shared_ptr& final_output_schema) { Row new_row; + new_row.id = -1; for (const auto& field : final_output_schema->fields()) { - // Create a null scalar of the correct type - auto null_scalar = arrow::MakeNullScalar(field->type()); - if (null_scalar != nullptr) { - new_row.cells[field->name()] = null_scalar; - } else { - // If creating a null scalar fails, use nullptr as a fallback - new_row.cells[field->name()] = nullptr; - log_warn("Failed to create null scalar for field '{}' with type '{}'", - field->name(), field->type()->ToString()); - } + new_row.cells.try_emplace(field->name(), nullptr, + arrow_type_to_value_type(field->type())); } return new_row; } @@ -834,31 +821,36 @@ std::vector get_child_rows(const Row& parent, return child; } +struct MergeState { + llvm::SmallVector result; + llvm::SmallDenseMap> grouped; +}; + struct RowNode { - std::optional row; + std::optional> row; int depth; PathSegment path_segment; std::vector> children; - RowNode() : depth(0), path_segment{"", -1} {} + RowNode() : depth(0), path_segment{0, -1} {} - RowNode(std::optional r, int d, + RowNode(std::optional> r, int d, std::vector> c = {}) : row(std::move(r)), depth(d), - path_segment{"", -1}, + path_segment{0, -1}, children(std::move(c)) {} bool leaf() const { return row.has_value(); } - void insert_row_dfs(size_t path_idx, const Row& new_row) { - if (path_idx == new_row.path.size()) { + void insert_row_dfs(size_t path_idx, const std::shared_ptr& new_row) { + if (path_idx == new_row->path.size()) { this->row = new_row; return; } for (const auto& n : children) { - if (n->path_segment == new_row.path[path_idx]) { + if (n->path_segment == new_row->path[path_idx]) { n->insert_row_dfs(path_idx + 1, new_row); return; } @@ -866,20 +858,23 @@ struct RowNode { auto new_node = std::make_unique(); new_node->depth = depth + 1; - new_node->path_segment = new_row.path[path_idx]; + new_node->path_segment = new_row->path[path_idx]; new_node->insert_row_dfs(path_idx + 1, new_row); children.emplace_back(std::move(new_node)); } - void insert_row(const Row& new_row) { insert_row_dfs(0, new_row); } + void insert_row(const std::shared_ptr& new_row) { + insert_row_dfs(0, new_row); + } - std::vector merge_rows() { + llvm::SmallVector, 4> merge_rows() { if (this->leaf()) { return {this->row.value()}; } // collect all records from child node and group them by schema - std::unordered_map> grouped; + llvm::SmallDenseMap, 4>> + grouped; for (const auto& c : children) { auto child_rows = c->merge_rows(); grouped[c->path_segment.schema].insert( @@ -887,18 +882,19 @@ struct RowNode { child_rows.end()); } - std::vector> groups_for_product; + std::vector, 4>> groups_for_product; + groups_for_product.reserve(grouped.size() + 1); // Add this->row as its own group (that is important for cartesian product) // if it exists and has data, // to represent the node itself if it should be part of the product // independently. if (this->row.has_value()) { - Row node_self_row = this->row.value(); + std::shared_ptr node_self_row = this->row.value(); // Normalize path for the node's own row to ensure it combines correctly // and doesn't carry a longer BFS path if it was a leaf of BFS. // i.e. current node path can be a:0->b:1->c:2 // this code sets it to 'c:2' - node_self_row.path = {this->path_segment}; + node_self_row->path = {this->path_segment}; groups_for_product.push_back({node_self_row}); } @@ -915,35 +911,36 @@ struct RowNode { // with data), no Cartesian product is needed. Just return its rows, but // ensure paths are correct. if (groups_for_product.size() == 1) { - std::vector single_group_rows = groups_for_product[0]; + // std::vector single_group_rows = groups_for_product[0]; // Ensure path is normalized for these rows if they came from children // For rows that are just this->row.value(), path is already set. // This might be too aggressive if child rows are already fully merged // products. For now, let's assume rows from c->merge_rows() are final // products of that child branch. - return single_group_rows; + return groups_for_product[0]; } - std::vector final_merged_rows = groups_for_product.back(); + llvm::SmallVector, 4> final_merged_rows = + groups_for_product.back(); for (int i = static_cast(groups_for_product.size()) - 2; i >= 0; --i) { - std::vector temp_product_accumulator; + llvm::SmallVector, 4> temp_product_accumulator; for (const auto& r1_from_current_group : groups_for_product[i]) { for (const auto& r2_from_previous_product : final_merged_rows) { // Check for conflicts in shared variables between rows bool can_merge = true; // Get variable prefixes (schema names) from cells - std::unordered_map schema_ids_r1 = - r1_from_current_group.extract_schema_ids(); - std::unordered_map schema_ids_r2 = - r2_from_previous_product.extract_schema_ids(); + llvm::StringMap schema_ids_r1 = + r1_from_current_group->extract_schema_ids(); + llvm::StringMap schema_ids_r2 = + r2_from_previous_product->extract_schema_ids(); // Check for conflicts - same schema name but different IDs for (const auto& [schema, id1] : schema_ids_r1) { if (schema_ids_r2.contains(schema) && schema_ids_r2[schema] != id1) { // Found a conflict - same schema but different IDs - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug( "Conflict detected: Schema '{}' has different IDs: {} vs " "{}", @@ -957,16 +954,16 @@ struct RowNode { // Additional cell-by-cell check for conflicts if (can_merge) { for (const auto& [field_name, value1] : - r1_from_current_group.cells) { - if (!value1 || !value1->is_valid) continue; + r1_from_current_group->cells) { + if (!value1.data) continue; - auto it = r2_from_previous_product.cells.find(field_name); - if (it != r2_from_previous_product.cells.end() && it->second && - it->second->is_valid) { + auto it = r2_from_previous_product->cells.find(field_name); + if (it != r2_from_previous_product->cells.end() && + it->second.data) { // Both rows have this field with non-null values - check if // they match - if (!value1->Equals(*(it->second))) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + if (!value1.equals(it->second)) { + IF_DEBUG_ENABLED { log_debug( "Conflict detected: Field '{}' has different values", field_name); @@ -979,18 +976,20 @@ struct RowNode { } if (can_merge) { - Row merged_r = - r1_from_current_group.merge(r2_from_previous_product); + std::shared_ptr merged_r = + r1_from_current_group->merge(r2_from_previous_product); // Set the path of the newly merged row to the path of the current // RowNode - merged_r.path = {this->path_segment}; + merged_r->path = {this->path_segment}; temp_product_accumulator.push_back(merged_r); } } } final_merged_rows = std::move(temp_product_accumulator); if (final_merged_rows.empty()) { - log_debug("product_accumulator is empty. stop merge"); + IF_DEBUG_ENABLED { + log_debug("product_accumulator is empty. stop merge"); + } break; } } @@ -1011,36 +1010,36 @@ struct RowNode { // Print Row if (row.has_value()) { ss << indent << " Path: "; - if (row.value().path.empty()) { + if (row.value()->path.empty()) { ss << "(empty)"; } else { - for (size_t i = 0; i < row.value().path.size(); ++i) { + for (size_t i = 0; i < row.value()->path.size(); ++i) { if (i > 0) ss << " → "; - ss << row.value().path[i].schema << ":" - << row.value().path[i].node_id; + ss << row.value()->path[i].schema << ":" + << row.value()->path[i].node_id; } } ss << "\n"; // Print key cell values (limited to avoid overwhelming output) ss << indent << " Cells: "; - if (row.value().cells.empty()) { + if (row.value()->cells.empty()) { ss << "(empty)"; } else { size_t count = 0; ss << "{ "; - for (const auto& [key, value] : row.value().cells) { + for (const auto& [key, value] : row.value()->cells) { if (count++ > 0) ss << ", "; if (count > 5) { // Limit display - ss << "... +" << (row.value().cells.size() - 5) << " more"; + ss << "... +" << (row.value()->cells.size() - 5) << " more"; break; } - ss << key << ": "; - if (!value) { + ss << key.str() << ": "; + if (!value.data) { ss << "NULL"; } else { - ss << value->ToString(); // Assuming arrow::Scalar has ToString() + ss << value.ToString(); } } ss << " }"; @@ -1087,17 +1086,16 @@ struct QueueItem { QueueItem(int64_t id, const SchemaRef& schema, int l, std::shared_ptr r) : node_id(id), schema_ref(schema), level(l), row(std::move(r)) { - path.push_back(PathSegment{schema.value(), id}); + path.push_back(PathSegment{schema.tag(), id}); } }; // Log grouped connections for a node void log_grouped_connections( int64_t node_id, - const llvm::SmallDenseMap, 4>& + const llvm::SmallDenseMap>& grouped_connections) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { if (grouped_connections.empty()) { log_debug("Node {} has no grouped connections", node_id); return; @@ -1109,7 +1107,7 @@ void log_grouped_connections( for (const auto& it : grouped_connections) { auto target_schema = it.first; const auto& connections = it.second; - log_debug(" To schema '{}': {} connections", target_schema.str(), + log_debug(" To schema '{}': {} connections", target_schema, connections.size()); for (size_t i = 0; i < connections.size(); ++i) { @@ -1122,13 +1120,15 @@ void log_grouped_connections( } } -template -arrow::Result>> populate_rows_bfs( - int64_t node_id, const SchemaRef& start_schema, - const std::shared_ptr& output_schema, - const QueryState& query_state, VisitedSet& global_visited) { - log_debug("populate_rows_bfs::node={}:{}", start_schema.value(), node_id); - auto result = std::make_shared>(); +arrow::Result, 4>>> +populate_rows_bfs(int64_t node_id, const SchemaRef& start_schema, + const std::shared_ptr& output_schema, + const QueryState& query_state, + llvm::DenseSet& global_visited) { + IF_DEBUG_ENABLED { + log_debug("populate_rows_bfs::node={}:{}", start_schema.value(), node_id); + } + auto result = std::make_shared, 4>>(); int64_t row_id_counter = 0; auto initial_row = std::make_shared(create_empty_row_from_schema(output_schema)); @@ -1146,25 +1146,17 @@ arrow::Result>> populate_rows_bfs( const auto& it_fq = query_state.fq_field_names.find(item.schema_ref.value()); if (it_fq == query_state.fq_field_names.end()) { - std::cout - << "ERROR: Could not find fully qualified field names for schema " - << item.schema_ref.value() << std::endl; return arrow::Status::KeyError( "Missing precomputed fq_field_names for alias {}", item.schema_ref.value()); } item.row->set_cell_from_node(it_fq->second, node); - // Pack 16-bit schema id (precomputed in SchemaRef) and 48-bit node id. - const uint16_t schema_id16 = item.schema_ref.tag(); - const uint64_t packed = (static_cast(schema_id16) << 48) | - (static_cast(item.node_id) & NODE_MASK); - global_visited.insert(item.schema_ref.value() + ":" + - std::to_string(item.node_id)); + const uint64_t packed = hash_code_(item.schema_ref, item.node_id); + global_visited.insert(packed); item.path_visited_nodes.insert(packed); // group connections by target schema (small, stack-friendly) - llvm::SmallDenseMap, 4> + llvm::SmallDenseMap> grouped_connections; bool skip = false; @@ -1172,14 +1164,11 @@ arrow::Result>> populate_rows_bfs( for (const auto& conn : query_state.connections.at(item.schema_ref.value()) .at(item.node_id)) { - const uint16_t tgt_schema_id16 = conn.target.tag(); - const uint64_t tgt_packed = - (static_cast(tgt_schema_id16) << 48) | - (static_cast(conn.target_id) & NODE_MASK); + const uint64_t tgt_packed = hash_code_(conn.target, conn.target_id); if (!item.path_visited_nodes.contains(tgt_packed)) { if (query_state.ids.at(conn.target.value()) .contains(conn.target_id)) { - grouped_connections[conn.target.value()].push_back(conn); + grouped_connections[conn.target.tag()].push_back(conn); } else { skip = true; } @@ -1191,12 +1180,10 @@ arrow::Result>> populate_rows_bfs( if (grouped_connections.empty()) { // we've done if (!skip) { - auto r = *item.row; - r.path = item.path; - r.id = row_id_counter++; - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - log_debug("add row: {}", r.ToString()); - } + auto r = item.row; + r->path = item.path; + r->id = row_id_counter++; + IF_DEBUG_ENABLED { log_debug("add row: {}", r->ToString()); } result->push_back(r); } @@ -1211,9 +1198,9 @@ arrow::Result>> populate_rows_bfs( item.level + 1, item.row); next.path = item.path; - next.path.push_back(PathSegment{connections[0].target.value(), + next.path.push_back(PathSegment{connections[0].target.tag(), connections[0].target_id}); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("continue the path: {}", join_schema_path(next.path)); } queue.push(next); @@ -1224,8 +1211,8 @@ arrow::Result>> populate_rows_bfs( next_row); next.path = item.path; next.path.push_back( - PathSegment{conn.target.value(), conn.target_id}); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + PathSegment{conn.target.tag(), conn.target_id}); + IF_DEBUG_ENABLED { log_debug("create a new path {}, node={}", join_schema_path(next.path), conn.target_id); } @@ -1237,39 +1224,36 @@ arrow::Result>> populate_rows_bfs( } } RowNode tree; - tree.path_segment = PathSegment{"root", -1}; + tree.path_segment = PathSegment{0, -1}; for (const auto& r : *result) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - log_debug("bfs result: {}", r.ToString()); - } + IF_DEBUG_ENABLED { log_debug("bfs result: {}", r->ToString()); } tree.insert_row(r); } - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { - tree.print(); - } + IF_DEBUG_ENABLED { tree.print(); } auto merged = tree.merge_rows(); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { for (const auto& row : merged) { - log_debug("merge result: {}", row.ToString()); + log_debug("merge result: {}", row->ToString()); } } - return std::make_shared>(merged); + return std::make_shared, 4>>(merged); } // template -arrow::Result>> populate_batch_rows( - const llvm::DenseSet& node_ids, const SchemaRef& schema_ref, - const std::shared_ptr& output_schema, - const QueryState& query_state, const TraverseType join_type, - tbb::concurrent_unordered_set& global_visited) { - auto rows = std::make_shared>(); +arrow::Result, 4>>> +populate_batch_rows(const llvm::DenseSet& node_ids, + const SchemaRef& schema_ref, + const std::shared_ptr& output_schema, + const QueryState& query_state, const TraverseType join_type, + llvm::DenseSet& global_visited) { + auto rows = std::make_shared, 4>>(); rows->reserve(node_ids.size()); - std::set local_visited; + llvm::DenseSet local_visited; // For INNER join: only process nodes that have connections // For LEFT join: process all nodes from the "left" side for (const auto node_id : node_ids) { - auto key = schema_ref.value() + ":" + std::to_string(node_id); - if (!global_visited.insert(key).second) { + const uint64_t packed = hash_code_(schema_ref, node_id); + if (!global_visited.insert(packed).second) { // Skip if already processed in an earlier traversal continue; } @@ -1323,13 +1307,13 @@ std::vector> batch_node_ids( // process all schemas used in traverse // Phase 1: Process connected nodes // Phase 2: Handle outer joins for unmatched nodes -arrow::Result>> populate_rows( +arrow::Result>>> populate_rows( const ExecutionConfig& execution_config, const QueryState& query_state, const std::vector& traverses, const std::shared_ptr& output_schema) { - auto rows = std::make_shared>(); + auto rows = std::make_shared>>(); std::mutex rows_mtx; - tbb::concurrent_unordered_set global_visited; + llvm::DenseSet global_visited; // Map schemas to their join types std::unordered_map schema_join_types; @@ -1369,14 +1353,18 @@ arrow::Result>> populate_rows( } } - log_debug("Processing {} schemas with their respective join types", - ordered_schemas.size()); + IF_DEBUG_ENABLED { + log_debug("Processing {} schemas with their respective join types", + ordered_schemas.size()); + } // Process each schema in order for (const auto& schema_ref : ordered_schemas) { TraverseType join_type = schema_join_types[schema_ref.value()]; - log_debug("Processing schema '{}' with join type {}", schema_ref.value(), - static_cast(join_type)); + IF_DEBUG_ENABLED { + log_debug("Processing schema '{}' with join type {}", schema_ref.value(), + static_cast(join_type)); + } if (!query_state.ids.contains(schema_ref.value())) { log_warn("Schema '{}' not found in query state IDs", schema_ref.value()); @@ -1394,7 +1382,7 @@ arrow::Result>> populate_rows( batch_size = execution_config.calculate_batch_size(schema_nodes.size()); } auto batches = batch_node_ids(schema_nodes, batch_size); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug( "process concurrently. thread_count={}, batch_size={}, " "batches_count={}", @@ -1446,14 +1434,16 @@ arrow::Result>> populate_rows( std::make_move_iterator(res_value->end())); } - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Processing schema '{}' nodes: [{}]", schema_ref.value(), join_container(schema_nodes)); } } - log_debug("Generated {} total rows after processing all schemas", - rows->size()); + IF_DEBUG_ENABLED { + log_debug("Generated {} total rows after processing all schemas", + rows->size()); + } return rows; } @@ -1499,7 +1489,7 @@ arrow::Result> create_empty_table( } arrow::Result> create_table_from_rows( - const std::shared_ptr>& rows, + const std::shared_ptr>>& rows, const std::shared_ptr& schema = nullptr) { if (!rows || rows->empty()) { if (schema == nullptr) { @@ -1517,8 +1507,8 @@ arrow::Result> create_table_from_rows( // Get all field names from all rows to create a complete schema std::set all_field_names; for (const auto& row : *rows) { - for (const auto& field_name : row.cells | std::views::keys) { - all_field_names.insert(field_name); + for (const auto& entry : row->cells) { + all_field_names.insert(entry.first().str()); } } @@ -1529,9 +1519,10 @@ arrow::Result> create_table_from_rows( // Find first non-null value to determine field type std::shared_ptr field_type = nullptr; for (const auto& row : *rows) { - auto it = row.cells.find(field_name); - if (it != row.cells.end() && it->second) { - if (auto array_result = arrow::MakeArrayFromScalar(*(it->second), 1); + auto it = row->cells.find(field_name); + if (it != row->cells.end() && it->second.data != nullptr) { + if (auto array_result = arrow::MakeArrayFromScalar( + *(it->second.as_scalar().ValueOrDie()), 1); array_result.ok()) { field_type = array_result.ValueOrDie()->type(); break; @@ -1561,24 +1552,17 @@ arrow::Result> create_table_from_rows( for (const auto& row : *rows) { for (size_t i = 0; i < output_schema->num_fields(); i++) { const auto& field_name = output_schema->field(i)->name(); - auto it = row.cells.find(field_name); - - if (it != row.cells.end() && it->second) { - // We have a value for this field - auto array_result = arrow::MakeArrayFromScalar(*(it->second), 1); - if (array_result.ok()) { - auto array = array_result.ValueOrDie(); - auto scalar_result = array->GetScalar(0); - if (scalar_result.ok()) { - ARROW_RETURN_NOT_OK( - builders[i]->AppendScalar(*scalar_result.ValueOrDie())); - continue; - } + auto it = row->cells.find(field_name); + if (it != row->cells.end() && it->second.data) { + if (auto res = + builders[i]->AppendScalar(*it->second.as_scalar().ValueOrDie()); + !res.ok()) { + return res; } + } else { + // Fall back to NULL if we couldn't get or append the scalar + ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); } - - // Fall back to NULL if we couldn't get or append the scalar - ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); } } @@ -1669,7 +1653,9 @@ std::vector> get_where_to_inline( if (clauses[i]->type() == Clause::Type::WHERE) { auto where_expr = std::dynamic_pointer_cast(clauses[i]); if (where_expr->can_inline(target_var)) { - log_debug("inline where: '{}'", where_expr->toString()); + IF_DEBUG_ENABLED { + log_debug("inline where: '{}'", where_expr->toString()); + } inlined.push_back(where_expr); } } @@ -1683,7 +1669,7 @@ arrow::Result> inline_where( const std::vector>& where_exprs) { auto curr_table = std::move(table); for (const auto& exp : where_exprs) { - log_debug("inline where '{}'", exp->toString()); + IF_DEBUG_ENABLED { log_debug("inline where '{}'", exp->toString()); } auto result = filter(curr_table, *exp, true); if (!result.ok()) { log_error( @@ -1726,17 +1712,21 @@ arrow::Result> Database::query( const Query& query) const { QueryState query_state; auto result = std::make_shared(); - log_debug("Executing query starting from schema '{}'", - query.from().toString()); + IF_DEBUG_ENABLED { + log_debug("Executing query starting from schema '{}'", + query.from().toString()); + } query_state.node_manager = this->node_manager_; query_state.schema_registry = this->schema_registry_; query_state.from = query.from(); { - log_debug("processing 'from' {}", query.from().toString()); + IF_DEBUG_ENABLED { + log_debug("processing 'from' {}", query.from().toString()); + } // Precompute tag for FROM schema (alias-based hash) query_state.from = query.from(); - query_state.from.set_tag(QueryState::compute_alias_tag(query_state.from)); + query_state.from.set_tag(compute_tag(query_state.from)); ARROW_ASSIGN_OR_RAISE(auto source_schema, query_state.resolve_schema(query.from())); if (!this->schema_registry_->exists(source_schema)) { @@ -1770,7 +1760,9 @@ arrow::Result> Database::query( } } - log_debug("Processing {} query clauses", query.clauses().size()); + IF_DEBUG_ENABLED { + log_debug("Processing {} query clauses", query.clauses().size()); + } // Precompute 16-bit alias-based tags for all SchemaRefs // Also precompute fully-qualified field names per alias used in the query @@ -1781,7 +1773,9 @@ arrow::Result> Database::query( case Clause::Type::WHERE: { auto where = std::dynamic_pointer_cast(clause); if (where->inlined()) { - log_debug("where '{}' is inlined, skip", where->toString()); + IF_DEBUG_ENABLED { + log_debug("where '{}' is inlined, skip", where->toString()); + } continue; } auto variables = where->get_all_variables(); @@ -1792,7 +1786,9 @@ arrow::Result> Database::query( where->toString()); } if (variables.size() == 1) { - log_debug("Processing WHERE clause: '{}'", where->toString()); + IF_DEBUG_ENABLED { + log_debug("Processing WHERE clause: '{}'", where->toString()); + } std::unordered_map> new_front_ids; std::string variable = *variables.begin(); @@ -1810,8 +1806,10 @@ arrow::Result> Database::query( ARROW_RETURN_NOT_OK(query_state.update_table( filtered_table_result.ValueOrDie(), SchemaRef::parse(variable))); } else { - log_debug("Add compound WHERE expression: '{}' to post process", - where->toString()); + IF_DEBUG_ENABLED { + log_debug("Add compound WHERE expression: '{}' to post process", + where->toString()); + } post_where.emplace_back(where); } break; @@ -1820,10 +1818,8 @@ arrow::Result> Database::query( auto traverse = std::static_pointer_cast(clause); // Precompute and set tags for source/target refs (alias-based, // deterministic) - traverse->mutable_source().set_tag( - QueryState::compute_alias_tag(traverse->source())); - traverse->mutable_target().set_tag( - QueryState::compute_alias_tag(traverse->target())); + traverse->mutable_source().set_tag(compute_tag(traverse->source())); + traverse->mutable_target().set_tag(compute_tag(traverse->target())); ARROW_ASSIGN_OR_RAISE(auto source_schema, query_state.resolve_schema(traverse->source())); @@ -1847,14 +1843,14 @@ arrow::Result> Database::query( where_clauses.size(); } query_state.traversals.push_back(*traverse); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Processing TRAVERSE {}-({})->{}", traverse->source().toString(), traverse->edge_type(), traverse->target().toString()); } auto source = traverse->source(); if (!query_state.tables.contains(source.value())) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Source table '{}' not found. Loading", traverse->source().toString()); } @@ -1864,7 +1860,7 @@ arrow::Result> Database::query( query_state.update_table(source_table, traverse->source())); } - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Traversing from {} source nodes", query_state.ids[source.value()].size()); } @@ -1873,11 +1869,12 @@ arrow::Result> Database::query( llvm::DenseSet unmatched_source_ids; for (auto source_id : query_state.ids[source.value()]) { auto outgoing_edges = - edge_store_->get_outgoing_edges(source_id, traverse->edge_type()) + edge_store_ + ->get_outgoing_edges_view(source_id, traverse->edge_type()) .ValueOrDie(); // todo check result - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Node {} has {} outgoing edges of type '{}'", source_id, - outgoing_edges.size(), traverse->edge_type()); + outgoing_edges.count(), traverse->edge_type()); } bool source_had_match = false; @@ -1905,7 +1902,7 @@ arrow::Result> Database::query( } } if (passes_all_filters) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("found edge {}:{} -[{}]-> {}:{}", source.value(), source_id, traverse->edge_type(), traverse->target().value(), target_node->id); @@ -1932,7 +1929,7 @@ arrow::Result> Database::query( } } if (!source_had_match) { - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("no edge found from {}:{}", source.value(), source_id); } unmatched_source_ids.insert(source_id); @@ -1941,11 +1938,15 @@ arrow::Result> Database::query( if (traverse->traverse_type() == TraverseType::Inner && !unmatched_source_ids.empty()) { for (auto id : unmatched_source_ids) { - log_debug("remove unmatched node={}:{}", source.value(), id); + IF_DEBUG_ENABLED { + log_debug("remove unmatched node={}:{}", source.value(), id); + } query_state.remove_node(id, source); } - log_debug("rebuild table for schema {}:{}", source.value(), - query_state.aliases[source.value()]); + IF_DEBUG_ENABLED { + log_debug("rebuild table for schema {}:{}", source.value(), + query_state.aliases[source.value()]); + } auto table_result = filter_table_by_id(query_state.tables[source.value()], query_state.ids[source.value()]); @@ -1954,8 +1955,10 @@ arrow::Result> Database::query( } query_state.tables[source.value()] = table_result.ValueOrDie(); } - log_debug("found {} neighbors for {}", matched_target_ids.size(), - traverse->target().toString()); + IF_DEBUG_ENABLED { + log_debug("found {} neighbors for {}", matched_target_ids.size(), + traverse->target().toString()); + } if (traverse->traverse_type() == TraverseType::Inner) { // intersect @@ -1973,7 +1976,7 @@ arrow::Result> Database::query( } query_state.ids[traverse->target().value()] = intersect_ids; - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("intersect_ids count: {}", intersect_ids.size()); log_debug("{} intersect_ids: {}", traverse->target().toString(), join_container(intersect_ids)); @@ -1986,7 +1989,7 @@ arrow::Result> Database::query( auto target_ids = get_ids_from_table(get_table(target_schema).ValueOrDie()) .ValueOrDie(); - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug( "traverse type: '{}', matched_source_ids=[{}], " "target_ids=[{}]", @@ -2000,7 +2003,7 @@ arrow::Result> Database::query( std::vector> neighbors; for (auto id : query_state.ids[traverse->target().value()]) { - auto node_res = node_manager_->get_node(id); + auto const node_res = node_manager_->get_node(id); if (node_res.ok()) { neighbors.push_back(node_res.ValueOrDie()); } @@ -2026,9 +2029,15 @@ arrow::Result> Database::query( } } - if (Logger::get_instance().get_level() == LogLevel::DEBUG) { + IF_DEBUG_ENABLED { log_debug("Query processing complete, building result"); log_debug("Query state: {}", query_state.ToString()); + for (auto traversal : query_state.traversals) { + log_debug("schema tag {}->{}", traversal.source().value(), + traversal.source().tag()); + log_debug("schema tag {}->{}", traversal.target().value(), + traversal.target().tag()); + } } auto output_schema_res = build_denormalized_schema(query_state); @@ -2036,7 +2045,7 @@ arrow::Result> Database::query( return output_schema_res.status(); } const auto output_schema = output_schema_res.ValueOrDie(); - log_debug("output_schema={}", output_schema->ToString()); + IF_DEBUG_ENABLED { log_debug("output_schema={}", output_schema->ToString()); } auto row_res = populate_rows(query.execution_config(), query_state, query_state.traversals, output_schema); @@ -2053,7 +2062,7 @@ arrow::Result> Database::query( auto output_table = output_table_res.ValueOrDie(); for (const auto& expr : post_where) { result->mutable_execution_stats().num_where_clauses_post_processed++; - log_debug("post process where: {}", expr->toString()); + IF_DEBUG_ENABLED { log_debug("post process where: {}", expr->toString()); } output_table = filter(output_table, *expr, false).ValueOrDie(); } result->set_table(apply_select(query.select(), output_table)); diff --git a/src/edge_store.cpp b/src/edge_store.cpp index 5858818..2d65349 100644 --- a/src/edge_store.cpp +++ b/src/edge_store.cpp @@ -3,6 +3,37 @@ #include "logger.hpp" namespace tundradb { +// EdgeView::iterator implementation - optimized +void EdgeView::iterator::advance_to_valid() { + // Pre-check if type filter is empty to avoid string comparisons + const bool has_type_filter = !type_filter_.empty(); + + while (edge_ids_it_ != edge_ids_end_) { + tbb::concurrent_hash_map>::const_accessor + edge_acc; + + // Fast path: try to find edge (this is the main bottleneck) + if (store_->edges.find(edge_acc, edge_ids_it_->first)) { + auto edge = edge_acc->second; + + // Fast path: no type filter + if (!has_type_filter) { + current_edge_ = edge; + return; + } + + // Slow path: check type filter + if (edge->get_type() == type_filter_) { + current_edge_ = edge; + return; + } + } + + ++edge_ids_it_; + } + current_edge_.reset(); +} + arrow::Result> EdgeStore::create_edge( int64_t source_id, const std::string& type, int64_t target_id, std::unordered_map> properties) { @@ -124,13 +155,13 @@ arrow::Result>> EdgeStore::get_edges_from_map( } std::vector> result; - const auto edge_ids = acc->second.get_all(); - result.reserve(edge_ids->size()); + const auto edge_ids = acc->second.get_all_unsafe(); + result.reserve(edge_ids.size()); - for (const auto& edge_id : *edge_ids) { + for (const auto& edge_id : edge_ids) { tbb::concurrent_hash_map>::const_accessor edge_acc; - if (edges.find(edge_acc, edge_id)) { + if (edges.find(edge_acc, edge_id.first)) { if (auto edge = edge_acc->second; type.empty() || edge->get_type() == type) { result.push_back(edge); @@ -151,6 +182,28 @@ arrow::Result>> EdgeStore::get_incoming_edges( return get_edges_from_map(incoming_edges_, id, type); } +arrow::Result EdgeStore::get_outgoing_edges_view( + const int64_t id, const std::string& type) const { + tbb::concurrent_hash_map>::const_accessor acc; + if (!outgoing_edges_.find(acc, id)) { + // Return empty view - create a temporary empty ConcurrentSet + static const ConcurrentSet empty_set; + return EdgeView(this, empty_set, type); + } + return EdgeView(this, acc->second, type); +} + +arrow::Result EdgeStore::get_incoming_edges_view( + const int64_t id, const std::string& type) const { + tbb::concurrent_hash_map>::const_accessor acc; + if (!incoming_edges_.find(acc, id)) { + // Return empty view - create a temporary empty ConcurrentSet + static const ConcurrentSet empty_set; + return EdgeView(this, empty_set, type); + } + return EdgeView(this, acc->second, type); +} + arrow::Result>> EdgeStore::get_by_type( const std::string& type) const { tbb::concurrent_hash_map>::const_accessor diff --git a/tests/where_expression_test.cpp b/tests/where_expression_test.cpp index c3e7e28..f3c4c2f 100644 --- a/tests/where_expression_test.cpp +++ b/tests/where_expression_test.cpp @@ -151,6 +151,7 @@ TEST_F(WhereExpressionTest, SimpleWhereCondition) { // Test compound WHERE with AND - fluent API TEST_F(WhereExpressionTest, CompoundWhereAndFluent) { // Test: age > 30 AND city = "NYC" + Logger::get_instance().set_level(LogLevel::DEBUG); Query query = Query::from("u:User") .where("u.age", CompareOp::Gt, 30) .and_where("u.city", CompareOp::Eq, "NYC") @@ -559,6 +560,7 @@ TEST_F(WhereExpressionTest, TraversalWhereCombinations) { } TEST_F(WhereExpressionTest, TraversalWhereCombinations2) { + Logger::get_instance().set_level(LogLevel::DEBUG); Query query = Query::from("u:User") .traverse("u", "WORKS_AT", "c:Company") .where("u.age", CompareOp::Gte, 35)