Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ext/couchbase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcb_hdr_histogram.hxx"
#include "rcb_logger.hxx"
#include "rcb_multi.hxx"
#include "rcb_observability.hxx"
#include "rcb_query.hxx"
#include "rcb_range_scan.hxx"
#include "rcb_search.hxx"
Expand Down Expand Up @@ -66,5 +67,6 @@ Init_libcouchbase(void)
couchbase::ruby::init_extras(cBackend);
couchbase::ruby::init_logger_methods(cBackend);
couchbase::ruby::init_hdr_histogram(mCouchbase);
couchbase::ruby::init_observability(cBackend);
}
}
37 changes: 37 additions & 0 deletions ext/rcb_observability.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
* limitations under the License.
*/

#include "rcb_backend.hxx"
#include "rcb_utils.hxx"

#include <core/cluster.hxx>
#include <core/cluster_label_listener.hxx>
#include <core/tracing/wrapper_sdk_tracer.hxx>

#include <ruby.h>
Expand Down Expand Up @@ -76,4 +79,38 @@ cb_add_core_spans(VALUE observability_handler,
rb_funcall(observability_handler, add_retries_func, ULONG2NUM(retry_attempts));
}
}

namespace
{
VALUE
cb_Backend_cluster_labels(VALUE self)
{
VALUE res = rb_hash_new();
{
auto cluster = cb_backend_to_core_api_cluster(self);
auto labels = cluster.cluster_label_listener()->cluster_labels();

static const auto sym_cluster_name = rb_id2sym(rb_intern("cluster_name"));
static const auto sym_cluster_uuid = rb_id2sym(rb_intern("cluster_uuid"));

if (labels.cluster_name.has_value()) {
rb_hash_aset(res, sym_cluster_name, cb_str_new(labels.cluster_name.value()));
} else {
rb_hash_aset(res, sym_cluster_name, Qnil);
}
if (labels.cluster_uuid.has_value()) {
rb_hash_aset(res, sym_cluster_uuid, cb_str_new(labels.cluster_uuid.value()));
} else {
rb_hash_aset(res, sym_cluster_uuid, Qnil);
}
}
return res;
}
} // namespace

void
init_observability(VALUE cBackend)
{
rb_define_method(cBackend, "cluster_labels", cb_Backend_cluster_labels, 0);
}
} // namespace couchbase::ruby
3 changes: 3 additions & 0 deletions ext/rcb_observability.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ void
cb_add_core_spans(VALUE observability_handler,
std::shared_ptr<couchbase::core::tracing::wrapper_sdk_span> parent_span,
std::size_t retry_attempts);

void
init_observability(VALUE cBackend);
} // namespace couchbase::ruby
5 changes: 3 additions & 2 deletions lib/couchbase/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ def initialize(connection_string, *args)
end
end

@observability = Observability::Wrapper.new do |w|
@backend = Backend.new

@observability = Observability::Wrapper.new(backend: @backend) do |w|
w.tracer = if !open_options[:enable_tracing].nil? && !open_options[:enable_tracing]
Tracing::NoopTracer.new
elsif tracer.nil?
Expand Down Expand Up @@ -432,7 +434,6 @@ def initialize(connection_string, *args)
end
end

@backend = Backend.new
@backend.open(connection_string, credentials, open_options)
end

Expand Down
22 changes: 18 additions & 4 deletions lib/couchbase/utils/observability.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ class Wrapper
attr_accessor :tracer
attr_accessor :meter

def initialize
def initialize(backend:, tracer: nil, meter: nil)
@backend = backend
@tracer = tracer
@meter = meter

yield self if block_given?
end

def record_operation(op_name, parent_span, receiver, service = nil)
handler = Handler.new(op_name, parent_span, receiver, @tracer, @meter)
handler = Handler.new(@backend, op_name, parent_span, receiver, @tracer, @meter)
handler.add_service(service) unless service.nil?
begin
res = yield(handler)
Expand All @@ -57,9 +61,14 @@ def close
class Handler
attr_reader :op_span

def initialize(op_name, parent_span, receiver, tracer, meter)
def initialize(backend, op_name, parent_span, receiver, tracer, meter)
@tracer = tracer
@meter = meter

cluster_labels = backend.cluster_labels
@cluster_name = cluster_labels[:cluster_name]
@cluster_uuid = cluster_labels[:cluster_uuid]

@op_span = create_span(op_name, parent_span)
@meter_attributes = create_meter_attributes
@start_time = Time.now
Expand Down Expand Up @@ -195,14 +204,19 @@ def convert_backend_timestamp(backend_timestamp)
end

def create_meter_attributes
{
attrs = {
ATTR_SYSTEM_NAME => ATTR_VALUE_SYSTEM_NAME,
}
attrs[ATTR_CLUSTER_NAME] = @cluster_name unless @cluster_name.nil?
attrs[ATTR_CLUSTER_UUID] = @cluster_uuid unless @cluster_uuid.nil?
attrs
end

def create_span(name, parent, start_timestamp: nil)
span = @tracer.request_span(name, parent: parent, start_timestamp: start_timestamp)
span.set_attribute(ATTR_SYSTEM_NAME, ATTR_VALUE_SYSTEM_NAME)
span.set_attribute(ATTR_CLUSTER_NAME, @cluster_name) unless @cluster_name.nil?
span.set_attribute(ATTR_CLUSTER_UUID, @cluster_uuid) unless @cluster_uuid.nil?
span
end

Expand Down
7 changes: 7 additions & 0 deletions test/metrics_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_get_and_replace
end

assert_operation_metrics(
env,
10,
operation_name: "get",
service: "kv",
Expand All @@ -51,6 +52,7 @@ def test_get_and_replace
collection_name: "_default",
)
assert_operation_metrics(
env,
10,
operation_name: "replace",
service: "kv",
Expand All @@ -66,6 +68,7 @@ def test_get_document_not_found
end

assert_operation_metrics(
env,
1,
operation_name: "get",
service: "kv",
Expand All @@ -80,6 +83,7 @@ def test_upsert
@collection.upsert(uniq_id(:foo), {foo: "bar"})

assert_operation_metrics(
env,
1,
operation_name: "upsert",
service: "kv",
Expand All @@ -95,6 +99,7 @@ def test_cluster_level_query
@cluster.query("SELECT 1=1")

assert_operation_metrics(
env,
1,
operation_name: "query",
service: "query",
Expand All @@ -107,6 +112,7 @@ def test_scope_level_query
@bucket.default_scope.query("SELECT 1=1")

assert_operation_metrics(
env,
1,
operation_name: "query",
service: "query",
Expand All @@ -123,6 +129,7 @@ def test_query_parsing_failure
end

assert_operation_metrics(
env,
1,
operation_name: "query",
service: "query",
Expand Down
8 changes: 8 additions & 0 deletions test/query_index_manager_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def test_get_all_indexes

assert_equal 1, get_all_indexes_spans.size
assert_http_span(
env,
get_all_indexes_spans.first,
"manager_query_get_all_indexes",
parent: @parent_span,
Expand All @@ -79,6 +80,7 @@ def test_get_all_indexes
assert_equal 2, create_index_spans.size
create_index_spans.each do |span|
assert_http_span(
env,
span,
"manager_query_create_index",
parent: @parent_span,
Expand Down Expand Up @@ -129,6 +131,7 @@ def test_query_indexes
assert_equal 4, get_all_indexes_root_spans.size
get_all_indexes_root_spans.each do |span|
assert_http_span(
env,
span,
"manager_query_get_all_indexes",
parent: @parent_span,
Expand All @@ -141,6 +144,7 @@ def test_query_indexes

assert_equal 1, create_primary_index_spans.size
assert_http_span(
env,
create_primary_index_spans.first,
"manager_query_create_primary_index",
parent: @parent_span,
Expand All @@ -153,6 +157,7 @@ def test_query_indexes
assert_equal 2, create_index_spans.size
create_index_spans.each do |span|
assert_http_span(
env,
span,
"manager_query_create_index",
parent: @parent_span,
Expand All @@ -165,6 +170,7 @@ def test_query_indexes

assert_equal 1, build_deferred_indexes_spans.size
assert_http_span(
env,
build_deferred_indexes_spans.first,
"manager_query_build_deferred_indexes",
parent: @parent_span,
Expand All @@ -177,6 +183,7 @@ def test_query_indexes
assert_equal 2, watch_indexes_spans.size
watch_indexes_spans.each do |span|
assert_http_span(
env,
span,
"manager_query_watch_indexes",
parent: @parent_span,
Expand All @@ -187,6 +194,7 @@ def test_query_indexes
assert_predicate span.children.size, :positive?
span.children.each do |child_span|
assert_http_span(
env,
child_span,
"manager_query_get_all_indexes",
parent: span,
Expand Down
31 changes: 30 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def supports_multiple_xattr_keys_mutation?
def supports_server_group_replica_reads?
@version >= Gem::Version.create("7.6.2")
end

def supports_cluster_labels?
@version >= Gem::Version.create("7.6.4")
end
end

require "couchbase"
Expand Down Expand Up @@ -177,7 +181,7 @@ def bucket
end

def management_endpoint
@management_endpoint = ENV.fetch("TEST_MANAGEMENT_ENDPOINT") do
@management_endpoint ||= ENV.fetch("TEST_MANAGEMENT_ENDPOINT") do
if connection_string
parsed = Couchbase::Backend.parse_connection_string(connection_string)
first_node_address = parsed[:nodes].first[:address]
Expand Down Expand Up @@ -216,6 +220,31 @@ def consistency
TestUtilities::MockConsistencyHelper.new
end
end

def cluster_name
fetch_cluster_labels if @cluster_labels.nil?
@cluster_labels[:cluster_name]
end

def cluster_uuid
fetch_cluster_labels if @cluster_labels.nil?
@cluster_labels[:cluster_uuid]
end

private

def fetch_cluster_labels
uri = URI("#{management_endpoint}/pools/default/nodeServices")
req = Net::HTTP::Get.new(uri)
req.basic_auth(username, password)
resp = Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(req) }
body = JSON.parse(resp.body)

@cluster_labels = {
cluster_name: body["clusterName"],
cluster_uuid: body["clusterUUID"],
}
end
end

module TestUtilities
Expand Down
14 changes: 7 additions & 7 deletions test/tracing_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_get
spans = @tracer.spans("get")

assert_equal 1, spans.size
assert_kv_span spans[0], "get", parent
assert_kv_span env, spans[0], "get", parent
end

def test_upsert
Expand All @@ -53,8 +53,8 @@ def test_upsert
spans = @tracer.spans("upsert")

assert_equal 1, spans.size
assert_kv_span spans[0], "upsert", parent
assert_has_request_encoding_span spans[0]
assert_kv_span env, spans[0], "upsert", parent
assert_has_request_encoding_span env, spans[0]
end

def test_replace
Expand All @@ -67,8 +67,8 @@ def test_replace
spans = @tracer.spans("replace")

assert_equal 1, spans.size
assert_kv_span spans[0], "replace", parent
assert_has_request_encoding_span spans[0]
assert_kv_span env, spans[0], "replace", parent
assert_has_request_encoding_span env, spans[0]
end

def test_replace_durable
Expand All @@ -84,8 +84,8 @@ def test_replace_durable
spans = @tracer.spans("replace")

assert_equal 1, spans.size
assert_kv_span spans[0], "replace", parent
assert_has_request_encoding_span spans[0]
assert_kv_span env, spans[0], "replace", parent
assert_has_request_encoding_span env, spans[0]
assert_equal "persist_majority", spans[0].attributes["couchbase.durability"]
end
end
Expand Down
6 changes: 6 additions & 0 deletions test/utils/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require_relative "metrics/test_value_recorder"

def assert_operation_metrics(
env,
count,
operation_name:,
service: nil,
Expand All @@ -31,6 +32,11 @@ def assert_operation_metrics(
"db.operation.name" => operation_name,
}

if env.server_version.supports_cluster_labels?
attributes["couchbase.cluster.name"] = env.cluster_name
attributes["couchbase.cluster.uuid"] = env.cluster_uuid
end

attributes["couchbase.service"] = service unless service.nil?
attributes["db.namespace"] = bucket_name unless bucket_name.nil?
attributes["couchbase.scope.name"] = scope_name unless scope_name.nil?
Expand Down
Loading
Loading