Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 157 additions & 47 deletions ext/rcb_analytics.cxx

Large diffs are not rendered by default.

55 changes: 39 additions & 16 deletions ext/rcb_buckets.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <ruby.h>

#include "rcb_backend.hxx"
#include "rcb_observability.hxx"
#include "rcb_utils.hxx"

namespace couchbase::ruby
Expand Down Expand Up @@ -273,7 +274,10 @@ cb_generate_bucket_settings(VALUE bucket,
}

VALUE
cb_Backend_bucket_create(VALUE self, VALUE bucket_settings, VALUE options)
cb_Backend_bucket_create(VALUE self,
VALUE bucket_settings,
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -286,12 +290,15 @@ cb_Backend_bucket_create(VALUE self, VALUE bucket_settings, VALUE options)
core::operations::management::bucket_create_request req{};
cb_extract_timeout(req, options);
cb_generate_bucket_settings(bucket_settings, req.bucket, true);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_create_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
if (auto resp = cb_wait_for_future(f); resp.ctx.ec) {
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format("unable to create bucket \"{}\" on the cluster ({})",
req.bucket.name,
Expand All @@ -309,7 +316,10 @@ cb_Backend_bucket_create(VALUE self, VALUE bucket_settings, VALUE options)
}

VALUE
cb_Backend_bucket_update(VALUE self, VALUE bucket_settings, VALUE options)
cb_Backend_bucket_update(VALUE self,
VALUE bucket_settings,
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -321,12 +331,15 @@ cb_Backend_bucket_update(VALUE self, VALUE bucket_settings, VALUE options)
core::operations::management::bucket_update_request req{};
cb_extract_timeout(req, options);
cb_generate_bucket_settings(bucket_settings, req.bucket, false);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_update_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
if (auto resp = cb_wait_for_future(f); resp.ctx.ec) {
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format("unable to update bucket \"{}\" on the cluster ({})",
req.bucket.name,
Expand All @@ -343,7 +356,7 @@ cb_Backend_bucket_update(VALUE self, VALUE bucket_settings, VALUE options)
}

VALUE
cb_Backend_bucket_drop(VALUE self, VALUE bucket_name, VALUE options)
cb_Backend_bucket_drop(VALUE self, VALUE bucket_name, VALUE options, VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -355,12 +368,15 @@ cb_Backend_bucket_drop(VALUE self, VALUE bucket_name, VALUE options)
try {
core::operations::management::bucket_drop_request req{ cb_string_new(bucket_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_drop_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
if (auto resp = cb_wait_for_future(f); resp.ctx.ec) {
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format("unable to remove bucket \"{}\" on the cluster", req.name));
}
Expand All @@ -375,7 +391,7 @@ cb_Backend_bucket_drop(VALUE self, VALUE bucket_name, VALUE options)
}

VALUE
cb_Backend_bucket_flush(VALUE self, VALUE bucket_name, VALUE options)
cb_Backend_bucket_flush(VALUE self, VALUE bucket_name, VALUE options, VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -387,12 +403,15 @@ cb_Backend_bucket_flush(VALUE self, VALUE bucket_name, VALUE options)
try {
core::operations::management::bucket_flush_request req{ cb_string_new(bucket_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_flush_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
if (auto resp = cb_wait_for_future(f); resp.ctx.ec) {
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format("unable to flush bucket \"{}\" on the cluster", req.name));
}
Expand Down Expand Up @@ -562,7 +581,7 @@ cb_extract_bucket_settings(const core::management::cluster::bucket_settings& ent
}

VALUE
cb_Backend_bucket_get_all(VALUE self, VALUE options)
cb_Backend_bucket_get_all(VALUE self, VALUE options, VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -573,12 +592,14 @@ cb_Backend_bucket_get_all(VALUE self, VALUE options)
try {
core::operations::management::bucket_get_all_request req{};
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_get_all_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx, "unable to get list of the buckets of the cluster");
}
Expand All @@ -601,7 +622,7 @@ cb_Backend_bucket_get_all(VALUE self, VALUE options)
}

VALUE
cb_Backend_bucket_get(VALUE self, VALUE bucket_name, VALUE options)
cb_Backend_bucket_get(VALUE self, VALUE bucket_name, VALUE options, VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -613,12 +634,14 @@ cb_Backend_bucket_get(VALUE self, VALUE bucket_name, VALUE options)
try {
core::operations::management::bucket_get_request req{ cb_string_new(bucket_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::bucket_get_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format("unable to locate bucket \"{}\" on the cluster", req.name));
Expand All @@ -640,11 +663,11 @@ cb_Backend_bucket_get(VALUE self, VALUE bucket_name, VALUE options)
void
init_buckets(VALUE cBackend)
{
rb_define_method(cBackend, "bucket_create", cb_Backend_bucket_create, 2);
rb_define_method(cBackend, "bucket_update", cb_Backend_bucket_update, 2);
rb_define_method(cBackend, "bucket_drop", cb_Backend_bucket_drop, 2);
rb_define_method(cBackend, "bucket_flush", cb_Backend_bucket_flush, 2);
rb_define_method(cBackend, "bucket_get_all", cb_Backend_bucket_get_all, 1);
rb_define_method(cBackend, "bucket_get", cb_Backend_bucket_get, 2);
rb_define_method(cBackend, "bucket_create", cb_Backend_bucket_create, 3);
rb_define_method(cBackend, "bucket_update", cb_Backend_bucket_update, 3);
rb_define_method(cBackend, "bucket_drop", cb_Backend_bucket_drop, 3);
rb_define_method(cBackend, "bucket_flush", cb_Backend_bucket_flush, 3);
rb_define_method(cBackend, "bucket_get_all", cb_Backend_bucket_get_all, 2);
rb_define_method(cBackend, "bucket_get", cb_Backend_bucket_get, 3);
}
} // namespace couchbase::ruby
48 changes: 36 additions & 12 deletions ext/rcb_collections.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
#include <ruby.h>

#include "rcb_backend.hxx"
#include "rcb_observability.hxx"
#include "rcb_utils.hxx"

namespace couchbase::ruby
{
namespace
{
VALUE
cb_Backend_scope_get_all(VALUE self, VALUE bucket_name, VALUE options)
cb_Backend_scope_get_all(VALUE self, VALUE bucket_name, VALUE options, VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -50,12 +51,14 @@ cb_Backend_scope_get_all(VALUE self, VALUE bucket_name, VALUE options)
try {
core::operations::management::scope_get_all_request req{ cb_string_new(bucket_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::scope_get_all_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(
resp.ctx,
Expand Down Expand Up @@ -97,7 +100,11 @@ cb_Backend_scope_get_all(VALUE self, VALUE bucket_name, VALUE options)
}

VALUE
cb_Backend_scope_create(VALUE self, VALUE bucket_name, VALUE scope_name, VALUE options)
cb_Backend_scope_create(VALUE self,
VALUE bucket_name,
VALUE scope_name,
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -111,12 +118,14 @@ cb_Backend_scope_create(VALUE self, VALUE bucket_name, VALUE scope_name, VALUE o
core::operations::management::scope_create_request req{ cb_string_new(bucket_name),
cb_string_new(scope_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::scope_create_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(options, std::move(parent_span));

if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
Expand All @@ -135,7 +144,11 @@ cb_Backend_scope_create(VALUE self, VALUE bucket_name, VALUE scope_name, VALUE o
}

VALUE
cb_Backend_scope_drop(VALUE self, VALUE bucket_name, VALUE scope_name, VALUE options)
cb_Backend_scope_drop(VALUE self,
VALUE bucket_name,
VALUE scope_name,
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -149,12 +162,14 @@ cb_Backend_scope_drop(VALUE self, VALUE bucket_name, VALUE scope_name, VALUE opt
core::operations::management::scope_drop_request req{ cb_string_new(bucket_name),
cb_string_new(scope_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);
std::promise<core::operations::management::scope_drop_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(options, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format(R"(unable to drop the scope "{}" on the bucket "{}")",
Expand All @@ -177,7 +192,8 @@ cb_Backend_collection_create(VALUE self,
VALUE scope_name,
VALUE collection_name,
VALUE settings,
VALUE options)
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand Down Expand Up @@ -220,13 +236,15 @@ cb_Backend_collection_create(VALUE self,
req.history = RTEST(history);
}
}
auto parent_span = cb_create_parent_span(req, self);

std::promise<core::operations::management::collection_create_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format(R"(unable create the collection "{}.{}" on the bucket "{}")",
Expand All @@ -250,7 +268,8 @@ cb_Backend_collection_update(VALUE self,
VALUE scope_name,
VALUE collection_name,
VALUE settings,
VALUE options)
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand Down Expand Up @@ -293,13 +312,15 @@ cb_Backend_collection_update(VALUE self,
req.history = RTEST(history);
}
}
auto parent_span = cb_create_parent_span(req, self);

std::promise<core::operations::management::collection_update_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format(R"(unable update the collection "{}.{}" on the bucket "{}")",
Expand All @@ -322,7 +343,8 @@ cb_Backend_collection_drop(VALUE self,
VALUE bucket_name,
VALUE scope_name,
VALUE collection_name,
VALUE options)
VALUE options,
VALUE observability_handler)
{
auto cluster = cb_backend_to_core_api_cluster(self);

Expand All @@ -338,13 +360,15 @@ cb_Backend_collection_drop(VALUE self,
cb_string_new(scope_name),
cb_string_new(collection_name) };
cb_extract_timeout(req, options);
auto parent_span = cb_create_parent_span(req, self);

std::promise<core::operations::management::collection_drop_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
cb_add_core_spans(observability_handler, std::move(parent_span));
if (resp.ctx.ec) {
cb_throw_error(resp.ctx,
fmt::format(R"(unable to drop the collection "{}.{}" on the bucket "{}")",
Expand All @@ -367,11 +391,11 @@ cb_Backend_collection_drop(VALUE self,
void
init_collections(VALUE cBackend)
{
rb_define_method(cBackend, "scope_get_all", cb_Backend_scope_get_all, 2);
rb_define_method(cBackend, "scope_create", cb_Backend_scope_create, 3);
rb_define_method(cBackend, "scope_drop", cb_Backend_scope_drop, 3);
rb_define_method(cBackend, "collection_create", cb_Backend_collection_create, 5);
rb_define_method(cBackend, "collection_update", cb_Backend_collection_update, 5);
rb_define_method(cBackend, "collection_drop", cb_Backend_collection_drop, 4);
rb_define_method(cBackend, "scope_get_all", cb_Backend_scope_get_all, 3);
rb_define_method(cBackend, "scope_create", cb_Backend_scope_create, 4);
rb_define_method(cBackend, "scope_drop", cb_Backend_scope_drop, 4);
rb_define_method(cBackend, "collection_create", cb_Backend_collection_create, 6);
rb_define_method(cBackend, "collection_update", cb_Backend_collection_update, 6);
rb_define_method(cBackend, "collection_drop", cb_Backend_collection_drop, 5);
}
} // namespace couchbase::ruby
Loading
Loading