From d634eac305fb2926e93ebf4ba1653cf292f5b6dd Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 22 Dec 2025 22:42:06 -0500 Subject: [PATCH 1/4] initial implementation for aiobotocore instrumentation --- .../pyproject.toml | 1 + .../instrumentation/botocore/__init__.py | 278 +++++++++++------- .../botocore/extensions/__init__.py | 29 +- .../botocore/extensions/registry.py | 202 +++++++++++++ .../instrumentation/botocore/package.py | 1 + .../instrumentation/botocore/utils.py | 16 + 6 files changed, 402 insertions(+), 125 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index cc4f480c45..6ade1d7fb4 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -38,6 +38,7 @@ instruments = [ [project.entry-points.opentelemetry_instrumentor] botocore = "opentelemetry.instrumentation.botocore:BotocoreInstrumentor" +aiobotocore = "opentelemetry.instrumentation.botocore.aio:AiobotocoreInstrumentor" [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-botocore" diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index a5c4bd1c19..edfcfc9c87 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -97,25 +97,24 @@ def response_hook(span, service_name, operation_name, result): """ import logging -from typing import Any, Callable, Collection, Dict, Optional, Tuple +from typing import Any, Collection, Dict, Optional, Tuple from botocore.client import BaseClient from botocore.endpoint import Endpoint from botocore.exceptions import ClientError from wrapt import wrap_function_wrapper -from opentelemetry._logs import get_logger from opentelemetry.instrumentation.botocore.extensions import ( - _find_extension, - _has_extension, + _BOTOCORE_EXTENSIONS, _AIOBOTOCORE_EXTENSIONS, ) +from opentelemetry.instrumentation.botocore.extensions.registry import ExtensionRegistry from opentelemetry.instrumentation.botocore.extensions.types import ( _AwsSdkCallContext, _AwsSdkExtension, _BotocoreInstrumentorContext, ) -from opentelemetry.instrumentation.botocore.package import _instruments -from opentelemetry.instrumentation.botocore.utils import get_server_attributes +from opentelemetry.instrumentation.botocore.package import _instruments, _aiobotocore_instruments +from opentelemetry.instrumentation.botocore.utils import get_server_attributes, _safe_invoke from opentelemetry.instrumentation.botocore.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import ( @@ -123,13 +122,11 @@ def response_hook(span, service_name, operation_name, result): suppress_http_instrumentation, unwrap, ) -from opentelemetry.metrics import Instrument, Meter, get_meter -from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator +from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator, TRACE_HEADER_KEY from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( CLOUD_REGION, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import get_tracer from opentelemetry.trace.span import Span logger = logging.getLogger(__name__) @@ -145,6 +142,7 @@ def __init__(self): super().__init__() self.request_hook = None self.response_hook = None + self.extension_registry = None self.propagator = AwsXRayPropagator() def instrumentation_dependencies(self) -> Collection[str]: @@ -152,16 +150,6 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): # pylint: disable=attribute-defined-outside-init - - # tracers are lazy initialized per-extension in _get_tracer - self._tracers = {} - # loggers are lazy initialized per-extension in _get_logger - self._loggers = {} - # meters are lazy initialized per-extension in _get_meter - self._meters = {} - # metrics are lazy initialized per-extension in _get_metrics - self._metrics: Dict[str, Dict[str, Instrument]] = {} - self.request_hook = kwargs.get("request_hook") self.response_hook = kwargs.get("response_hook") @@ -169,9 +157,12 @@ def _instrument(self, **kwargs): if propagator is not None: self.propagator = propagator - self.tracer_provider = kwargs.get("tracer_provider") - self.logger_provider = kwargs.get("logger_provider") - self.meter_provider = kwargs.get("meter_provider") + self.extension_registry = ExtensionRegistry( + _BOTOCORE_EXTENSIONS, + kwargs.get("tracer_provider"), + kwargs.get("logger_provider"), + kwargs.get("meter_provider"), + ) wrap_function_wrapper( "botocore.client", @@ -185,84 +176,161 @@ def _instrument(self, **kwargs): self._patched_endpoint_prepare_request, ) - @staticmethod - def _get_instrumentation_name(extension: _AwsSdkExtension) -> str: - has_extension = _has_extension(extension._call_context) - return ( - f"{__name__}.{extension._call_context.service}" - if has_extension - else __name__ + def _uninstrument(self, **kwargs): + unwrap(BaseClient, "_make_api_call") + unwrap(Endpoint, "prepare_request") + + # pylint: disable=unused-argument + def _patched_endpoint_prepare_request( + self, wrapped, instance, args, kwargs + ): + request = args[0] + headers = request.headers + + + # There may be situations where both Botocore and Aiobotocore are + # instrumented at the same time. To avoid double-injection of headers, + # we add a check to see if the header is already present. If it is, + # we skip injection. + if TRACE_HEADER_KEY in headers: + return wrapped(*args, **kwargs) + + # Only the x-ray header is propagated by AWS services. Using any + # other propagator will lose the trace context. + self.propagator.inject(headers) + + return wrapped(*args, **kwargs) + + # pylint: disable=too-many-branches + def _patched_api_call(self, original_func, instance, args, kwargs): + if not is_instrumentation_enabled(): + return original_func(*args, **kwargs) + + call_context = _determine_call_context(instance, args) + if call_context is None: + return original_func(*args, **kwargs) + + extension = self.extension_registry.get_extension(call_context) + if not extension.should_trace_service_call(): + return original_func(*args, **kwargs) + + attributes = { + SpanAttributes.RPC_SYSTEM: "aws-api", + SpanAttributes.RPC_SERVICE: call_context.service_id, + SpanAttributes.RPC_METHOD: call_context.operation, + CLOUD_REGION: call_context.region, + **get_server_attributes(call_context.endpoint_url), + } + + _safe_invoke(extension.extract_attributes, attributes) + end_span_on_exit = extension.should_end_span_on_exit() + + tracer = self.extension_registry.get_tracer(extension) + meter = self.extension_registry.get_meter(extension) + metrics = self.extension_registry.get_metrics(extension, meter) + instrumentor_ctx = _BotocoreInstrumentorContext( + logger=self.extension_registry.get_logger(extension), + metrics=metrics, ) + with tracer.start_as_current_span( + call_context.span_name, + kind=call_context.span_kind, + attributes=attributes, + # tracing streaming services require to close the span manually + # at a later time after the stream has been consumed + end_on_exit=end_span_on_exit, + ) as span: + _safe_invoke(extension.before_service_call, span, instrumentor_ctx) + self._call_request_hook(span, call_context) - def _get_tracer(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a tracer per extension""" + try: + with suppress_http_instrumentation(): + result = None + try: + result = original_func(*args, **kwargs) + except ClientError as error: + result = getattr(error, "response", None) + _apply_response_attributes(span, result) + _safe_invoke( + extension.on_error, span, error, instrumentor_ctx + ) + raise + _apply_response_attributes(span, result) + _safe_invoke( + extension.on_success, span, result, instrumentor_ctx + ) + finally: + _safe_invoke(extension.after_service_call, instrumentor_ctx) + self._call_response_hook(span, call_context, result) - instrumentation_name = self._get_instrumentation_name(extension) - tracer = self._tracers.get(instrumentation_name) - if tracer: - return tracer + return result - schema_version = extension.tracer_schema_version() - self._tracers[instrumentation_name] = get_tracer( - instrumentation_name, - __version__, - self.tracer_provider, - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + def _call_request_hook(self, span: Span, call_context: _AwsSdkCallContext): + if not callable(self.request_hook): + return + self.request_hook( + span, + call_context.service, + call_context.operation, + call_context.params, ) - return self._tracers[instrumentation_name] - - def _get_logger(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a logger per extension""" - - instrumentation_name = self._get_instrumentation_name(extension) - instrumentation_logger = self._loggers.get(instrumentation_name) - if instrumentation_logger: - return instrumentation_logger - - schema_version = extension.event_logger_schema_version() - self._loggers[instrumentation_name] = get_logger( - instrumentation_name, - "", - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", - logger_provider=self.logger_provider, + + def _call_response_hook( + self, span: Span, call_context: _AwsSdkCallContext, result + ): + if not callable(self.response_hook): + return + self.response_hook( + span, call_context.service, call_context.operation, result ) - return self._loggers[instrumentation_name] - def _get_meter(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a meter per extension""" +class AiobotocoreInstrumentor(BaseInstrumentor): + """An instrumentor for Aiobotocore. - instrumentation_name = self._get_instrumentation_name(extension) - meter = self._meters.get(instrumentation_name) - if meter: - return meter + See `BaseInstrumentor` + """ - schema_version = extension.meter_schema_version() - self._meters[instrumentation_name] = get_meter( - instrumentation_name, - "", - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", - meter_provider=self.meter_provider, - ) + def __init__(self): + super().__init__() + self.request_hook = None + self.response_hook = None + self.extension_registry = None + self.propagator = AwsXRayPropagator() - return self._meters[instrumentation_name] + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + _aiobotocore_instruments - def _get_metrics( - self, extension: _AwsSdkExtension, meter: Meter - ) -> Dict[str, Instrument]: - """This is a multiplexer for lazy initialization of metrics required by extensions""" - instrumentation_name = self._get_instrumentation_name(extension) - metrics = self._metrics.get(instrumentation_name) - if metrics is not None: - return metrics + def _instrument(self, **kwargs): + # pylint: disable=attribute-defined-outside-init + self.request_hook = kwargs.get("request_hook") + self.response_hook = kwargs.get("response_hook") - self._metrics.setdefault(instrumentation_name, {}) - metrics = self._metrics[instrumentation_name] - _safe_invoke(extension.setup_metrics, meter, metrics) - return metrics + propagator = kwargs.get("propagator") + if propagator is not None: + self.propagator = propagator + + self.extension_registry = ExtensionRegistry( + _AIOBOTOCORE_EXTENSIONS, + kwargs.get("tracer_provider"), + kwargs.get("logger_provider"), + kwargs.get("meter_provider"), + ) + + wrap_function_wrapper( + "aiobotocore.client", + "AioBaseClient._make_api_call", + self._patched_api_call, + ) + + wrap_function_wrapper( + "botocore.endpoint", + "Endpoint.prepare_request", + self._patched_endpoint_prepare_request, + ) def _uninstrument(self, **kwargs): - unwrap(BaseClient, "_make_api_call") + unwrap("aiobotocore.client.AioBaseClient", "_make_api_call") unwrap(Endpoint, "prepare_request") # pylint: disable=unused-argument @@ -272,6 +340,13 @@ def _patched_endpoint_prepare_request( request = args[0] headers = request.headers + # There may be situations where both Botocore and Aiobotocore are + # instrumented at the same time. To avoid double-injection of headers, + # we add a check to see if the header is already present. If it is, + # we skip injection. + if TRACE_HEADER_KEY in headers: + return wrapped(*args, **kwargs) + # Only the x-ray header is propagated by AWS services. Using any # other propagator will lose the trace context. self.propagator.inject(headers) @@ -279,17 +354,17 @@ def _patched_endpoint_prepare_request( return wrapped(*args, **kwargs) # pylint: disable=too-many-branches - def _patched_api_call(self, original_func, instance, args, kwargs): + async def _patched_api_call(self, original_func, instance, args, kwargs): if not is_instrumentation_enabled(): - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) call_context = _determine_call_context(instance, args) if call_context is None: - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) - extension = _find_extension(call_context) + extension = self.extension_registry.get_extension(call_context) if not extension.should_trace_service_call(): - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) attributes = { SpanAttributes.RPC_SYSTEM: "aws-api", @@ -302,11 +377,11 @@ def _patched_api_call(self, original_func, instance, args, kwargs): _safe_invoke(extension.extract_attributes, attributes) end_span_on_exit = extension.should_end_span_on_exit() - tracer = self._get_tracer(extension) - meter = self._get_meter(extension) - metrics = self._get_metrics(extension, meter) + tracer = self.extension_registry.get_tracer(extension) + meter = self.extension_registry.get_meter(extension) + metrics = self.extension_registry.get_metrics(extension, meter) instrumentor_ctx = _BotocoreInstrumentorContext( - logger=self._get_logger(extension), + logger=self.extension_registry.get_logger(extension), metrics=metrics, ) with tracer.start_as_current_span( @@ -324,7 +399,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs): with suppress_http_instrumentation(): result = None try: - result = original_func(*args, **kwargs) + result = await original_func(*args, **kwargs) except ClientError as error: result = getattr(error, "response", None) _apply_response_attributes(span, result) @@ -353,7 +428,7 @@ def _call_request_hook(self, span: Span, call_context: _AwsSdkCallContext): ) def _call_response_hook( - self, span: Span, call_context: _AwsSdkCallContext, result + self, span: Span, call_context: _AwsSdkCallContext, result ): if not callable(self.response_hook): return @@ -411,14 +486,3 @@ def _determine_call_context( # extracting essential attributes ('service' and 'operation') failed. logger.error("Error when initializing call context", exc_info=ex) return None - - -def _safe_invoke(function: Callable, *args): - function_name = "" - try: - function_name = function.__name__ - function(*args) - except Exception as ex: # pylint:disable=broad-except - logger.error( - "Error when invoking function '%s'", function_name, exc_info=ex - ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index dd8ba24e9f..4cf96fee19 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -27,11 +27,10 @@ def _lazy_load(module, cls): def loader(): imported_mod = importlib.import_module(module, __name__) return getattr(imported_mod, cls, None) - return loader -_KNOWN_EXTENSIONS = { +_BOTOCORE_EXTENSIONS = { "bedrock-runtime": _lazy_load(".bedrock", "_BedrockRuntimeExtension"), "dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"), "lambda": _lazy_load(".lmbd", "_LambdaExtension"), @@ -43,19 +42,13 @@ def loader(): "sqs": _lazy_load(".sqs", "_SqsExtension"), } - -def _has_extension(call_context: _AwsSdkCallContext) -> bool: - return call_context.service in _KNOWN_EXTENSIONS - - -def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension: - try: - loader = _KNOWN_EXTENSIONS.get(call_context.service) - if loader is None: - return _AwsSdkExtension(call_context) - - extension_cls = loader() - return extension_cls(call_context) - except Exception as ex: # pylint: disable=broad-except - _logger.error("Error when loading extension: %s", ex) - return _AwsSdkExtension(call_context) +_AIOBOTOCORE_EXTENSIONS = { + "dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"), + "lambda": _lazy_load(".lmbd", "_LambdaExtension"), + "secretsmanager": _lazy_load( + ".secretsmanager", "_SecretsManagerExtension" + ), + "stepfunctions": _lazy_load(".sfns", "_StepFunctionsExtension"), + "sns": _lazy_load(".sns", "_SnsExtension"), + "sqs": _lazy_load(".sqs", "_SqsExtension"), +} diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py new file mode 100644 index 0000000000..eb54a07e4d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py @@ -0,0 +1,202 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Optional, TYPE_CHECKING, Mapping, Callable +import logging + +from opentelemetry.instrumentation.botocore import __name__ as package_name, __version__, _safe_invoke + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer, TracerProvider, get_tracer + from opentelemetry.metrics import Meter, Instrument, MeterProvider, get_meter + from opentelemetry._logs import Logger, LoggerProvider, get_logger + from opentelemetry.instrumentation.botocore.extensions import _AwsSdkExtension, _AwsSdkCallContext + + +_logger = logging.getLogger(__name__) + + +class ExtensionRegistry: + """ + Registry for AWS SDK extensions that manages extension lookup and + associated OpenTelemetry instrumentation components (tracers, loggers, meters, metrics). + """ + + def __init__( + self, + extensions: Mapping[str, Callable[[], type[_AwsSdkExtension]]], + tracer_provider: Optional[TracerProvider] = None, + logger_provider: Optional[LoggerProvider] = None, + meter_provider: Optional[MeterProvider] = None, + ): + self._extensions: Mapping[str, Callable[[], type[_AwsSdkExtension]]] = extensions + self._tracer_provider: TracerProvider = tracer_provider + self._logger_provider: LoggerProvider = logger_provider + self._meter_provider: MeterProvider = meter_provider + self._tracers: dict[str, Tracer] = {} + self._loggers: dict[str, Logger] = {} + self._meters: dict[str, Meter] = {} + self._metrics: dict[str, dict[str, Instrument]] = {} + + def get_extension(self, call_context: _AwsSdkCallContext) -> _AwsSdkExtension: + """ + Get the appropriate extension for a given call context. + + Args: + call_context: The AWS SDK call context + + Returns: + The matching extension for the service/operation + """ + try: + loader: Callable[[], type[_AwsSdkExtension]] = self._extensions.get(call_context.service) + if loader is None: + return _AwsSdkExtension(call_context) + extension_cls = loader() + return extension_cls(call_context) + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error when loading extension: %s", exc) + return _AwsSdkExtension(call_context) + + def has_extension(self, call_context: _AwsSdkCallContext) -> bool: + """ + Check if a dedicated extension exists for the given call context. + + Args: + call_context: The AWS SDK call context + + Returns: + True if a service-specific extension exists, False otherwise + """ + return call_context.service in self._extensions + + def get_instrumentation_name(self, extension: _AwsSdkExtension) -> str: + """ + Get the instrumentation name for an extension. + + Service-specific extensions get a namespaced name (e.g., 'module.s3'), + while the default extension uses just the module name. + + Args: + extension: The AWS SDK extension + + Returns: + The instrumentation name string + """ + if self.has_extension(extension._call_context): + return f"{package_name}.{extension._call_context.service}" + return package_name + + def get_tracer(self, extension: _AwsSdkExtension) -> Tracer: + """ + Get or create a tracer for the given extension. + + Tracers are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Tracer instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._tracers: + return self._tracers[instrumentation_name] + + schema_version: str = extension.tracer_schema_version() + tracer: Tracer = get_tracer( + instrumentation_name, + __version__, + self._tracer_provider, + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + ) + self._tracers[instrumentation_name] = tracer + return tracer + + def get_logger(self, extension: _AwsSdkExtension): + """ + Get or create a logger for the given extension. + + Loggers are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Logger instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._loggers: + return self._loggers[instrumentation_name] + + schema_version: str = extension.event_logger_schema_version() + logger: Logger = get_logger( + instrumentation_name, + "", + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + logger_provider=self._logger_provider, + ) + self._loggers[instrumentation_name] = logger + return logger + + def get_meter(self, extension: _AwsSdkExtension) -> Meter: + """ + Get or create a meter for the given extension. + + Meters are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Meter instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._meters: + return self._meters[instrumentation_name] + + schema_version: str = extension.meter_schema_version() + meter = get_meter( + instrumentation_name, + "", + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + meter_provider=self._meter_provider, + ) + self._meters[instrumentation_name] = meter + return meter + + def get_metrics( + self, extension: _AwsSdkExtension + ) -> dict[str, Instrument]: + """ + Get or create metrics for the given extension. + + Metrics are lazily initialized by calling the extension's setup_metrics method. + + Args: + extension: The AWS SDK extension + + Returns: + A dictionary mapping metric names to Instrument instances + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._metrics: + return self._metrics[instrumentation_name] + + meter: Meter = self.get_meter(extension) + metrics: dict[str, Instrument] = {} + _safe_invoke(extension.setup_metrics, meter, metrics) + self._metrics[instrumentation_name] = metrics + return metrics diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py index a06b9c206b..051e4db308 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py @@ -14,3 +14,4 @@ _instruments = ("botocore ~= 1.0",) +_aiobotocore_instruments = ("aiobotocore ~= 2.0",) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py index 4309e6e9bd..8ac9938435 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py @@ -13,6 +13,8 @@ # limitations under the License. from __future__ import annotations +import logging +from typing import Callable from urllib.parse import urlparse from opentelemetry.semconv._incubating.attributes import ( @@ -21,6 +23,9 @@ from opentelemetry.util.types import AttributeValue +_logger = logging.getLogger(__name__) + + def get_server_attributes(endpoint_url: str) -> dict[str, AttributeValue]: """Extract server.* attributes from AWS endpoint URL.""" parsed = urlparse(endpoint_url) @@ -29,3 +34,14 @@ def get_server_attributes(endpoint_url: str) -> dict[str, AttributeValue]: attributes[ServerAttributes.SERVER_ADDRESS] = parsed.hostname attributes[ServerAttributes.SERVER_PORT] = parsed.port or 443 return attributes + + +def _safe_invoke(function: Callable, *args): + function_name = "" + try: + function_name = function.__name__ + function(*args) + except Exception as ex: # pylint:disable=broad-except + _logger.error( + "Error when invoking function '%s'", function_name, exc_info=ex + ) From f625693824cd4be15ca8a1ad7d84ec32f05bbcb8 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 22 Dec 2025 23:00:48 -0500 Subject: [PATCH 2/4] fix failing tests --- .../pyproject.toml | 2 +- .../instrumentation/botocore/__init__.py | 39 +++++++++++------ .../botocore/extensions/__init__.py | 6 +-- .../botocore/extensions/registry.py | 43 +++++++++++++------ .../instrumentation/botocore/utils.py | 1 - 5 files changed, 56 insertions(+), 35 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index 6ade1d7fb4..42450e1868 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -38,7 +38,7 @@ instruments = [ [project.entry-points.opentelemetry_instrumentor] botocore = "opentelemetry.instrumentation.botocore:BotocoreInstrumentor" -aiobotocore = "opentelemetry.instrumentation.botocore.aio:AiobotocoreInstrumentor" +aiobotocore = "opentelemetry.instrumentation.botocore:AiobotocoreInstrumentor" [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-botocore" diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index edfcfc9c87..17051a9f29 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -105,24 +105,34 @@ def response_hook(span, service_name, operation_name, result): from wrapt import wrap_function_wrapper from opentelemetry.instrumentation.botocore.extensions import ( - _BOTOCORE_EXTENSIONS, _AIOBOTOCORE_EXTENSIONS, + _AIOBOTOCORE_EXTENSIONS, + _BOTOCORE_EXTENSIONS, +) +from opentelemetry.instrumentation.botocore.extensions.registry import ( + ExtensionRegistry, ) -from opentelemetry.instrumentation.botocore.extensions.registry import ExtensionRegistry from opentelemetry.instrumentation.botocore.extensions.types import ( _AwsSdkCallContext, - _AwsSdkExtension, _BotocoreInstrumentorContext, ) -from opentelemetry.instrumentation.botocore.package import _instruments, _aiobotocore_instruments -from opentelemetry.instrumentation.botocore.utils import get_server_attributes, _safe_invoke -from opentelemetry.instrumentation.botocore.version import __version__ +from opentelemetry.instrumentation.botocore.package import ( + _aiobotocore_instruments, + _instruments, +) +from opentelemetry.instrumentation.botocore.utils import ( + _safe_invoke, + get_server_attributes, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import ( is_instrumentation_enabled, suppress_http_instrumentation, unwrap, ) -from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator, TRACE_HEADER_KEY +from opentelemetry.propagators.aws.aws_xray_propagator import ( + TRACE_HEADER_KEY, + AwsXRayPropagator, +) from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( CLOUD_REGION, ) @@ -142,7 +152,9 @@ def __init__(self): super().__init__() self.request_hook = None self.response_hook = None - self.extension_registry = None + self.extension_registry = ExtensionRegistry( + __name__, _BOTOCORE_EXTENSIONS, None, None, None + ) self.propagator = AwsXRayPropagator() def instrumentation_dependencies(self) -> Collection[str]: @@ -158,6 +170,7 @@ def _instrument(self, **kwargs): self.propagator = propagator self.extension_registry = ExtensionRegistry( + __name__, _BOTOCORE_EXTENSIONS, kwargs.get("tracer_provider"), kwargs.get("logger_provider"), @@ -187,7 +200,6 @@ def _patched_endpoint_prepare_request( request = args[0] headers = request.headers - # There may be situations where both Botocore and Aiobotocore are # instrumented at the same time. To avoid double-injection of headers, # we add a check to see if the header is already present. If it is, @@ -226,8 +238,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs): end_span_on_exit = extension.should_end_span_on_exit() tracer = self.extension_registry.get_tracer(extension) - meter = self.extension_registry.get_meter(extension) - metrics = self.extension_registry.get_metrics(extension, meter) + metrics = self.extension_registry.get_metrics(extension) instrumentor_ctx = _BotocoreInstrumentorContext( logger=self.extension_registry.get_logger(extension), metrics=metrics, @@ -311,6 +322,7 @@ def _instrument(self, **kwargs): self.propagator = propagator self.extension_registry = ExtensionRegistry( + __name__, _AIOBOTOCORE_EXTENSIONS, kwargs.get("tracer_provider"), kwargs.get("logger_provider"), @@ -378,8 +390,7 @@ async def _patched_api_call(self, original_func, instance, args, kwargs): end_span_on_exit = extension.should_end_span_on_exit() tracer = self.extension_registry.get_tracer(extension) - meter = self.extension_registry.get_meter(extension) - metrics = self.extension_registry.get_metrics(extension, meter) + metrics = self.extension_registry.get_metrics(extension) instrumentor_ctx = _BotocoreInstrumentorContext( logger=self.extension_registry.get_logger(extension), metrics=metrics, @@ -428,7 +439,7 @@ def _call_request_hook(self, span: Span, call_context: _AwsSdkCallContext): ) def _call_response_hook( - self, span: Span, call_context: _AwsSdkCallContext, result + self, span: Span, call_context: _AwsSdkCallContext, result ): if not callable(self.response_hook): return diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index 4cf96fee19..48b78e8efd 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -15,11 +15,6 @@ import importlib import logging -from opentelemetry.instrumentation.botocore.extensions.types import ( - _AwsSdkCallContext, - _AwsSdkExtension, -) - _logger = logging.getLogger(__name__) @@ -27,6 +22,7 @@ def _lazy_load(module, cls): def loader(): imported_mod = importlib.import_module(module, __name__) return getattr(imported_mod, cls, None) + return loader diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py index eb54a07e4d..5754e4d148 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py @@ -13,16 +13,23 @@ # limitations under the License. from __future__ import annotations -from typing import Optional, TYPE_CHECKING, Mapping, Callable import logging - -from opentelemetry.instrumentation.botocore import __name__ as package_name, __version__, _safe_invoke +from typing import TYPE_CHECKING, Callable, Mapping, Optional + +from opentelemetry._logs import get_logger +from opentelemetry.instrumentation.botocore.extensions import ( + _AwsSdkCallContext, + _AwsSdkExtension, +) +from opentelemetry.instrumentation.botocore.utils import _safe_invoke +from opentelemetry.instrumentation.botocore.version import __version__ +from opentelemetry.metrics import get_meter +from opentelemetry.trace import get_tracer if TYPE_CHECKING: - from opentelemetry.trace import Tracer, TracerProvider, get_tracer - from opentelemetry.metrics import Meter, Instrument, MeterProvider, get_meter - from opentelemetry._logs import Logger, LoggerProvider, get_logger - from opentelemetry.instrumentation.botocore.extensions import _AwsSdkExtension, _AwsSdkCallContext + from opentelemetry._logs import Logger, LoggerProvider + from opentelemetry.metrics import Instrument, Meter, MeterProvider + from opentelemetry.trace import Tracer, TracerProvider _logger = logging.getLogger(__name__) @@ -30,18 +37,22 @@ class ExtensionRegistry: """ - Registry for AWS SDK extensions that manages extension lookup and + Registry for AWS SDK extensions that manages extension lookup and associated OpenTelemetry instrumentation components (tracers, loggers, meters, metrics). """ def __init__( self, + package_name: str, extensions: Mapping[str, Callable[[], type[_AwsSdkExtension]]], tracer_provider: Optional[TracerProvider] = None, logger_provider: Optional[LoggerProvider] = None, meter_provider: Optional[MeterProvider] = None, ): - self._extensions: Mapping[str, Callable[[], type[_AwsSdkExtension]]] = extensions + self._package_name = package_name + self._extensions: Mapping[ + str, Callable[[], type[_AwsSdkExtension]] + ] = extensions self._tracer_provider: TracerProvider = tracer_provider self._logger_provider: LoggerProvider = logger_provider self._meter_provider: MeterProvider = meter_provider @@ -50,7 +61,9 @@ def __init__( self._meters: dict[str, Meter] = {} self._metrics: dict[str, dict[str, Instrument]] = {} - def get_extension(self, call_context: _AwsSdkCallContext) -> _AwsSdkExtension: + def get_extension( + self, call_context: _AwsSdkCallContext + ) -> _AwsSdkExtension: """ Get the appropriate extension for a given call context. @@ -61,12 +74,14 @@ def get_extension(self, call_context: _AwsSdkCallContext) -> _AwsSdkExtension: The matching extension for the service/operation """ try: - loader: Callable[[], type[_AwsSdkExtension]] = self._extensions.get(call_context.service) + loader: Callable[[], type[_AwsSdkExtension]] = ( + self._extensions.get(call_context.service) + ) if loader is None: return _AwsSdkExtension(call_context) extension_cls = loader() return extension_cls(call_context) - except Exception as exc: # pylint: disable=broad-except + except Exception as exc: # pylint: disable=broad-except _logger.error("Error when loading extension: %s", exc) return _AwsSdkExtension(call_context) @@ -96,8 +111,8 @@ def get_instrumentation_name(self, extension: _AwsSdkExtension) -> str: The instrumentation name string """ if self.has_extension(extension._call_context): - return f"{package_name}.{extension._call_context.service}" - return package_name + return f"{self._package_name}.{extension._call_context.service}" + return self._package_name def get_tracer(self, extension: _AwsSdkExtension) -> Tracer: """ diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py index 8ac9938435..29e8d12b88 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py @@ -22,7 +22,6 @@ ) from opentelemetry.util.types import AttributeValue - _logger = logging.getLogger(__name__) From 9f0efaeb8491fe4fa44e6f7d563ebb585b592d37 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 24 Dec 2025 00:38:29 -0500 Subject: [PATCH 3/4] add basic set of tests for aiobotocore instrumentation --- .../pyproject.toml | 1 + .../botocore/extensions/registry.py | 2 +- .../test-requirements-0.txt | 5 +- .../test-requirements-1.txt | 5 +- .../tests/test_aiobotocore_instrumentation.py | 407 ++++++++++++++++++ 5 files changed, 415 insertions(+), 5 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index 42450e1868..d55da30f5c 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ [project.optional-dependencies] instruments = [ "botocore ~= 1.0", + "aiobotocore ~= 2.0", ] [project.entry-points.opentelemetry_instrumentor] diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py index 5754e4d148..f3429cf23c 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py @@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Callable, Mapping, Optional from opentelemetry._logs import get_logger -from opentelemetry.instrumentation.botocore.extensions import ( +from opentelemetry.instrumentation.botocore.extensions.types import ( _AwsSdkCallContext, _AwsSdkExtension, ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt index f6243241bd..69dc82bdfa 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt +++ b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt @@ -1,7 +1,8 @@ asgiref==3.8.1 aws-xray-sdk==2.12.1 -boto3==1.28.80 -botocore==1.31.80 +boto3==1.29.4 +botocore==1.32.4 +aiobotocore==2.8.0 certifi==2024.7.4 cffi==1.17.0 charset-normalizer==3.3.2 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt index 5c7cb24a0c..518da691cc 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt +++ b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt @@ -1,7 +1,8 @@ asgiref==3.8.1 aws-xray-sdk==2.12.1 -boto3==1.35.56 -botocore==1.35.56 +boto3==1.35.16 +botocore==1.35.16 +aiobotocore==2.15.0 certifi==2024.7.4 cffi==1.17.0 charset-normalizer==3.3.2 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py new file mode 100644 index 0000000000..58ffec922b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py @@ -0,0 +1,407 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=too-many-public-methods +import asyncio +import json +from typing import Any +from unittest.mock import Mock, patch + +import aiobotocore.session +import botocore.stub +from botocore.exceptions import ClientError + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.botocore import AiobotocoreInstrumentor +from opentelemetry.instrumentation.utils import suppress_instrumentation +from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( + CLOUD_REGION, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + +_REQUEST_ID_REGEX_MATCH = r"[A-Za-z0-9]{52}" + + +class TestAiobotocoreInstrumentor(TestBase): + """Aiobotocore integration testsuite""" + + def setUp(self): + super().setUp() + AiobotocoreInstrumentor().instrument() + self.session = aiobotocore.session.get_session() + self.session.set_credentials( + access_key="access-key", + secret_key="secret-key", + ) + self.region = "us-west-2" + + def tearDown(self): + super().tearDown() + AiobotocoreInstrumentor().uninstrument() + + def _default_span_attributes(self, service: str, operation: str): + return { + SpanAttributes.RPC_SYSTEM: "aws-api", + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: operation, + CLOUD_REGION: self.region, + "retry_attempts": 0, + SpanAttributes.HTTP_STATUS_CODE: 200, + SpanAttributes.SERVER_ADDRESS: f"{service.lower()}.{self.region}.amazonaws.com", + SpanAttributes.SERVER_PORT: 443, + } + + def assert_only_span(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + return spans[0] + + def assert_span( + self, + service: str, + operation: str, + request_id=None, + attributes=None, + ): + span = self.assert_only_span() + expected = self._default_span_attributes(service, operation) + if attributes: + expected.update(attributes) + + span_attributes_request_id = "aws.request_id" + if request_id is _REQUEST_ID_REGEX_MATCH: + actual_request_id = span.attributes[span_attributes_request_id] + self.assertRegex(actual_request_id, _REQUEST_ID_REGEX_MATCH) + expected[span_attributes_request_id] = actual_request_id + elif request_id is not None: + expected[span_attributes_request_id] = request_id + + self.assertSpanHasAttributes(span, expected) + self.assertEqual(f"{service}.{operation}", span.name) + return span + + def _make_client(self, service: str): + return self.session.create_client(service, region_name=self.region) + + def _make_response_meta(self, request_id: str) -> dict[str, Any]: + return { + "RequestId": request_id, + "RetryAttempts": 0, + "HTTPStatusCode": 200, + } + + def test_traced_client_ec2(self): + """Test basic EC2 client tracing with stubbed response.""" + request_id = "fdcdcab1-ae5c-489e-9c33-4637c5dda355" + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.describe_instances() + + asyncio.run(_test()) + self.assert_span("EC2", "DescribeInstances", request_id=request_id) + + def test_traced_client_s3(self): + """Test S3 client tracing with stubbed response.""" + request_id = "s3-request-id-12345" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.list_buckets() + + asyncio.run(_test()) + self.assert_span("S3", "ListBuckets", request_id=request_id) + + def test_no_op_tracer_provider(self): + """Test that no spans are created when using NoOpTracerProvider.""" + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument( + tracer_provider=trace_api.NoOpTracerProvider() + ) + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + await client.describe_instances() + + asyncio.run(_test()) + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_not_recording(self): + """Test behavior when span is not recording.""" + mock_tracer = Mock() + mock_span = Mock() + mock_span.is_recording.return_value = False + mock_tracer.start_span.return_value = mock_span + + async def _test(): + with patch("opentelemetry.trace.get_tracer") as tracer: + tracer.return_value = mock_tracer + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + await client.describe_instances() + self.assertFalse(mock_span.is_recording()) + self.assertTrue(mock_span.is_recording.called) + self.assertFalse(mock_span.set_attribute.called) + self.assertFalse(mock_span.set_status.called) + + asyncio.run(_test()) + + def test_client_error(self): + """Test that ClientError is properly traced with error status.""" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_client_error( + "get_object", + service_error_code="NoSuchKey", + service_message="The specified key does not exist.", + http_status_code=404, + ) + with self.assertRaises(ClientError): + await client.get_object( + Bucket="test-bucket", Key="test-key" + ) + + asyncio.run(_test()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + self.assertIs(span.status.status_code, trace_api.StatusCode.ERROR) + self.assertEqual("S3.GetObject", span.name) + + # Verify exception event was recorded + self.assertEqual(1, len(span.events)) + event = span.events[0] + self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_TYPE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_MESSAGE, event.attributes) + + def test_suppress_instrumentation(self): + """Test that instrumentation can be suppressed.""" + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id-2" + ), + }, + ) + with suppress_instrumentation(): + await client.describe_instances() + await client.describe_instances() + + asyncio.run(_test()) + self.assertEqual(0, len(self.get_finished_spans())) + + def test_request_hook(self): + """Test that request hook is called with correct parameters.""" + request_hook_service_attribute_name = "request_hook.service_name" + request_hook_operation_attribute_name = "request_hook.operation_name" + request_hook_api_params_attribute_name = "request_hook.api_params" + + def request_hook(span, service_name, operation_name, api_params): + hook_attributes = { + request_hook_service_attribute_name: service_name, + request_hook_operation_attribute_name: operation_name, + request_hook_api_params_attribute_name: json.dumps(api_params), + } + span.set_attributes(hook_attributes) + + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument(request_hook=request_hook) + + request_id = "hook-test-request-id" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_objects_v2", + { + "Contents": [], + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + expected_params={"Bucket": "test-bucket"}, + ) + await client.list_objects_v2(Bucket="test-bucket") + + asyncio.run(_test()) + + self.assert_span( + "S3", + "ListObjectsV2", + request_id=request_id, + attributes={ + request_hook_service_attribute_name: "s3", + request_hook_operation_attribute_name: "ListObjectsV2", + request_hook_api_params_attribute_name: json.dumps( + {"Bucket": "test-bucket"} + ), + }, + ) + + def test_response_hook(self): + """Test that response hook is called with correct parameters.""" + response_hook_service_attribute_name = "response_hook.service_name" + response_hook_operation_attribute_name = "response_hook.operation_name" + response_hook_bucket_count_attribute_name = ( + "response_hook.bucket_count" + ) + + def response_hook(span, service_name, operation_name, result): + hook_attributes = { + response_hook_service_attribute_name: service_name, + response_hook_operation_attribute_name: operation_name, + response_hook_bucket_count_attribute_name: len( + result["Buckets"] + ), + } + span.set_attributes(hook_attributes) + + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument(response_hook=response_hook) + + request_id = "response-hook-test-id" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [ + {"Name": "bucket1"}, + {"Name": "bucket2"}, + ], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.list_buckets() + + asyncio.run(_test()) + + self.assert_span( + "S3", + "ListBuckets", + request_id=request_id, + attributes={ + response_hook_service_attribute_name: "s3", + response_hook_operation_attribute_name: "ListBuckets", + response_hook_bucket_count_attribute_name: 2, + }, + ) + + def test_multiple_operations(self): + """Test tracing multiple sequential operations.""" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + "req-1" + ), + }, + ) + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + "req-2" + ), + }, + ) + await client.list_buckets() + await client.list_buckets() + + asyncio.run(_test()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(2, len(spans)) + + for span in spans: + self.assertEqual("S3.ListBuckets", span.name) + + self.assertEqual("req-1", spans[0].attributes["aws.request_id"]) + self.assertEqual("req-2", spans[1].attributes["aws.request_id"]) From 72d3e9eba3bd14861f20e1c663d63ea0f19ba624 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 24 Dec 2025 00:44:32 -0500 Subject: [PATCH 4/4] fix lint error --- .../opentelemetry-instrumentation-botocore/pyproject.toml | 1 - .../tests/test_aiobotocore_instrumentation.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index d55da30f5c..42450e1868 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -34,7 +34,6 @@ dependencies = [ [project.optional-dependencies] instruments = [ "botocore ~= 1.0", - "aiobotocore ~= 2.0", ] [project.entry-points.opentelemetry_instrumentor] diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py index 58ffec922b..970c845270 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py @@ -95,7 +95,8 @@ def assert_span( def _make_client(self, service: str): return self.session.create_client(service, region_name=self.region) - def _make_response_meta(self, request_id: str) -> dict[str, Any]: + @staticmethod + def _make_response_meta(request_id: str) -> dict[str, Any]: return { "RequestId": request_id, "RetryAttempts": 0,