Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions include/concurrency.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,61 @@ namespace tundradb {
*/
template <typename T>
class ConcurrentSet {
public:
/**
* @brief Unsafe view for direct iteration over the underlying
* concurrent_hash_map
*
* WARNING: This view is NOT thread-safe. Use only when you can guarantee
* no concurrent modifications are happening.
*/
class LockedView {
private:
const tbb::concurrent_hash_map<T, std::monostate>& data_;

public:
explicit LockedView(const tbb::concurrent_hash_map<T, std::monostate>& data)
: data_(data) {}

// Iterator that extracts keys from the concurrent_hash_map
class iterator {
private:
typename tbb::concurrent_hash_map<T, std::monostate>::const_iterator it_;

public:
using iterator_category = std::forward_iterator_tag;
using value_type = T;
using difference_type = std::ptrdiff_t;
using pointer = const T*;
using reference = const T&;

explicit iterator(
typename tbb::concurrent_hash_map<T, std::monostate>::const_iterator
it)
: it_(it) {}

reference operator*() const { return it_->first; }
pointer operator->() const { return &(it_->first); }

iterator& operator++() {
++it_;
return *this;
}
iterator operator++(int) {
iterator tmp = *this;
++(*this);
return tmp;
}

bool operator==(const iterator& other) const { return it_ == other.it_; }
bool operator!=(const iterator& other) const { return it_ != other.it_; }
};

iterator begin() const { return iterator(data_.begin()); }
iterator end() const { return iterator(data_.end()); }
size_t size() const { return data_.size(); }
};

private:
tbb::concurrent_hash_map<T, std::monostate> data_;
mutable std::shared_mutex mutex_; // Read-write mutex for synchronization
Expand Down Expand Up @@ -94,6 +149,18 @@ class ConcurrentSet {
return snapshot;
}

/**
 * @brief Expose the stored keys through a non-copying, unsynchronized view
 *
 * WARNING: NOT thread-safe! The view is built directly on top of the
 * underlying concurrent_hash_map and this call takes no lock, so it may only
 * be used while no other thread can modify the set. It exists so that
 * performance-critical code can skip the copy made by get_all().
 *
 * @return A LockedView offering begin()/end() iteration over the keys of the
 *         underlying concurrent_hash_map.
 */
LockedView get_all_unsafe() const {
  return LockedView{data_};
}

/**
* @brief Clear all elements from the set
*
Expand Down
5 changes: 5 additions & 0 deletions include/edge_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class EdgeStore {

std::vector<std::shared_ptr<Edge>> get(const std::set<int64_t> &ids) const;

// Template overload of get() for any iterable container of ids (including
// ConcurrentSet::LockedView), so callers do not need to materialize a
// std::set first.
//
// Only declared here; the definition is not visible in this part of the
// file. NOTE(review): presumably the container's elements must be usable as
// int64_t edge ids, like the std::set<int64_t> overload — confirm against
// the definition.
template <typename Container>
std::vector<std::shared_ptr<Edge>> get(const Container &ids) const;

int64_t get_count_by_type(const std::string &type) const {
if (auto res = get_by_type(type); res.ok()) {
return res.ValueOrDie().size();
Expand Down
6 changes: 6 additions & 0 deletions include/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ class Node {
return arrow::Status::NotImplemented("");
}

/**
 * @brief Build a non-owning ValueRef for one field of this node
 *
 * Asks the arena for the address of the field's value (using this node's
 * handle and layout) and pairs that address with the field's declared type.
 * No value is copied.
 *
 * @param field Field to look up; it is dereferenced, so it must not be null.
 * @return ValueRef holding the pointer returned by the arena and
 *         field->type().
 */
[[nodiscard]] ValueRef get_value_ref(
    const std::shared_ptr<Field> &field) const {
  const char *const value_ptr =
      arena_->get_field_value_ptr(*handle_, layout_, field);
  return ValueRef{value_ptr, field->type()};
}

[[deprecated]]
arrow::Result<Value> get_value(const std::string &field) const {
log_warn("get_value by string is deprecated");
Expand Down
174 changes: 174 additions & 0 deletions include/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,180 @@ class Value {
data_;
};

struct ValueRef {
const char* data;
ValueType type;

ValueRef() : data(nullptr), type(ValueType::NA) {}

explicit ValueRef(ValueType type) : data(nullptr), type(type) {}

ValueRef(const char* ptr, ValueType type) : data(ptr), type(type) {}

[[nodiscard]] int32_t as_int32() const {
return *reinterpret_cast<const int32_t*>(data);
}

[[nodiscard]] int64_t as_int64() const {
return *reinterpret_cast<const int64_t*>(data);
}

[[nodiscard]] double as_double() const {
return *reinterpret_cast<const double*>(data);
}

[[nodiscard]] float as_float() const {
return *reinterpret_cast<const float*>(data);
}

[[nodiscard]] bool as_bool() const {
return *reinterpret_cast<const bool*>(data);
}

[[nodiscard]] std::string as_string() const { return std::string(data); }

[[nodiscard]] const StringRef& as_string_ref() const {
return *reinterpret_cast<const StringRef*>(data);
}

arrow::Result<std::shared_ptr<arrow::Scalar>> as_scalar() const {
switch (type) {
case ValueType::INT32:
return arrow::MakeScalar(as_int32());
case ValueType::INT64:
return arrow::MakeScalar(as_int64());
case ValueType::DOUBLE:
return arrow::MakeScalar(as_double());
case ValueType::STRING:
return arrow::MakeScalar(as_string_ref().to_string());
case ValueType::BOOL:
return arrow::MakeScalar(as_bool());
case ValueType::NA:
return arrow::MakeNullScalar(arrow::null());
default:
return arrow::Status::NotImplemented(
"Unsupported Value type for Arrow scalar conversion: ",
to_string(type));
}
}

bool operator==(const ValueRef& other) const {
if (type != other.type) {
std::cout << "different types. this: " << to_string(type)
<< ", other: " << to_string(other.type) << std::endl;
return false;
}

// Both null
if (data == nullptr && other.data == nullptr) {
return true;
}

// One null, one not null
if (data == nullptr || other.data == nullptr) {
return false;
}

// Compare values based on type
switch (type) {
case ValueType::NA:
return true; // Both are NA

case ValueType::INT32:
return *reinterpret_cast<const int32_t*>(data) ==
*reinterpret_cast<const int32_t*>(other.data);

case ValueType::INT64:
return *reinterpret_cast<const int64_t*>(data) ==
*reinterpret_cast<const int64_t*>(other.data);

case ValueType::FLOAT:
return *reinterpret_cast<const float*>(data) ==
*reinterpret_cast<const float*>(other.data);

case ValueType::DOUBLE:
return *reinterpret_cast<const double*>(data) ==
*reinterpret_cast<const double*>(other.data);

case ValueType::BOOL:
return *reinterpret_cast<const bool*>(data) ==
*reinterpret_cast<const bool*>(other.data);

case ValueType::STRING: {
const StringRef& str1 = *reinterpret_cast<const StringRef*>(data);
const StringRef& str2 = *reinterpret_cast<const StringRef*>(other.data);

// Compare string lengths first
if (str1.length != str2.length) {
return false;
}

// Both null strings
if (str1.is_null() && str2.is_null()) {
return true;
}

// One null, one not
if (str1.is_null() || str2.is_null()) {
return false;
}

// Compare string content
return std::memcmp(str1.data, str2.data, str1.length) == 0;
}

default:
return false; // Unknown type
}
}

bool operator!=(const ValueRef& other) const { return !(*this == other); }

[[nodiscard]] bool equals(const ValueRef& other) const {
return *this == other;
}

// todo rename
std::string ToString() const {
if (data == nullptr) {
return "NULL";
}

switch (type) {
case ValueType::NA:
return "NULL";

case ValueType::INT32:
return std::to_string(as_int32());

case ValueType::INT64:
return std::to_string(as_int64());

case ValueType::FLOAT:
return std::to_string(as_float());

case ValueType::DOUBLE:
return std::to_string(as_double());

case ValueType::BOOL:
return as_bool() ? "true" : "false";

case ValueType::FIXED_STRING16:
case ValueType::FIXED_STRING32:
case ValueType::FIXED_STRING64:
case ValueType::STRING: {
const StringRef& str_ref = as_string_ref();
if (str_ref.is_null()) {
return "NULL";
}
return "\"" + str_ref.to_string() + "\"";
}
default:
return "UNKNOWN_TYPE";
}
}
};

// Stream operator for ValueType
inline std::ostream& operator<<(std::ostream& os, const ValueType type) {
return os << to_string(type);
Expand Down
Loading