From b6cd550fbefcd8493e9f827ef4ac559a89ba297b Mon Sep 17 00:00:00 2001 From: "Patrick M. Niedzielski" Date: Fri, 6 Feb 2026 16:23:34 -0500 Subject: [PATCH] Feat: Add user agent prefix string The BlazingMQ broker and C++ SDK recently gained the ability to use a user agent string during session negotiation for more precise broker telemetry of sessions. One of the primary drivers of this new capability was to be able to distinguish Python `blazingmq` sessions, which wrap `libbmq`, from applications that directly use `libbmq`. This patch makes good on that promise by teaching `blazingmq` to set the a user agent prefix, and to expose a session option for any further libraries that may wrap the Python SDK. Applications that are implemented on top of this library will start sending a user agent string of the form `blazingmq(python3.9):1.3.0 libbmq:99.99.99`, containing the Python interpreter version of the application, the version of the `blazingmq` Python library, and the version of the `libbmq` C++ library on which `blazingmq` was built. Signed-off-by: Patrick M. Niedzielski --- news/71.feature.rst | 1 + src/blazingmq/_ext.pyi | 1 + src/blazingmq/_ext.pyx | 4 ++- src/blazingmq/_session.py | 54 ++++++++++++++++++++++++++++++++++++++ src/cpp/pybmq_session.cpp | 5 +++- src/cpp/pybmq_session.h | 3 ++- src/declarations/pybmq.pxd | 3 ++- 7 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 news/71.feature.rst diff --git a/news/71.feature.rst b/news/71.feature.rst new file mode 100644 index 0000000..40116dc --- /dev/null +++ b/news/71.feature.rst @@ -0,0 +1 @@ +Added user agent prefix string to session negotation diff --git a/src/blazingmq/_ext.pyi b/src/blazingmq/_ext.pyi index ca022e4..9739888 100644 --- a/src/blazingmq/_ext.pyi +++ b/src/blazingmq/_ext.pyi @@ -53,6 +53,7 @@ class Session: timeouts: Timeouts = Timeouts(), monitor_host_health: bool = False, fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None, + user_agent_prefix: bytes, ) -> None: ... def stop(self) -> None: ... def open_queue_sync( diff --git a/src/blazingmq/_ext.pyx b/src/blazingmq/_ext.pyx index 52f979d..7054bb3 100644 --- a/src/blazingmq/_ext.pyx +++ b/src/blazingmq/_ext.pyx @@ -174,6 +174,7 @@ cdef class Session: monitor_host_health: bool = False, fake_host_health_monitor: FakeHostHealthMonitor = None, _mock: Optional[object] = None, + user_agent_prefix: bytes, ) -> None: cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp cdef optional[int] c_num_processing_threads @@ -241,7 +242,8 @@ cdef class Session: fake_host_health_monitor_sp, Error, BrokerTimeoutError, - _mock) + _mock, + user_agent_prefix) self._session.start(c_connect_timeout) atexit.register(ensure_stop_session_impl, weakref.ref(self)) diff --git a/src/blazingmq/_session.py b/src/blazingmq/_session.py index e4ff31c..a7635fd 100644 --- a/src/blazingmq/_session.py +++ b/src/blazingmq/_session.py @@ -15,6 +15,8 @@ from __future__ import annotations +import string + from typing import Any from typing import Callable from typing import Dict @@ -23,6 +25,7 @@ from typing import Union from . import _six as six +from ._about import __version__ from ._enums import CompressionAlgorithmType from ._enums import PropertyType from ._ext import DEFAULT_CONSUMER_PRIORITY @@ -72,6 +75,38 @@ def _validate_timeouts(timeouts: Timeouts) -> Timeouts: ) +def _validate_user_agent_prefix(user_agent_prefix: bytes) -> bytes: + """Validate a user agent prefix bytestring for use by the Cython layer. + + If the user agent prefix string is longer than 96 bytes or contains + non-printable characters, raise a `ValueError`. Otherwise, return the user + agent prefix string unchanged. + """ + if len(user_agent_prefix) > 96: + raise ValueError( + f"user_agent_prefix ({user_agent_prefix}) must be less than 96 " + f"bytes (is {len(user_agent_prefix)} bytes)" + ) + elif any(c not in string.printable for c in user_agent_prefix): + raise ValueError( + f"user_agent_prefix ({user_agent_prefix}) must only contain " + f"printable characters" + ) + else: + return _construct_user_agent_prefix(user_agent_prefix) + + +def _construct_user_agent_prefix(user_agent_prefix: bytes) -> bytes: + """Construct the user agent prefix for use by the Cython layer. + """ + if user_agent_prefix != b"": + user_agent_prefix += b" " + python_version = platform.python_version().encode('ascii', errors='strict') + blazingmq_version = __version__.encode('ascii', errors='strict') + user_agent_prefix += b"blazingmq(python" + python_version + b"):" + blazingmq_version + return user_agent_prefix + + def _convert_timeout(timeout: Optional[float]) -> Optional[float]: """Convert the timeout for use by the Cython layer. @@ -288,6 +323,12 @@ class SessionOptions: 0, disable the recurring dump of stats (final stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range ``[0s - 60min]``. + user_agent_prefix: + Bytestring to include in the user agent for broker telemetry. This + string must only contain printable characters and must be less than + 96 bytes long. This is provided for libraries that are wrapping + this SDK. Applications directly using the SDK are encouraged *NOT* + to set this value. """ def __init__( @@ -300,6 +341,7 @@ def __init__( channel_high_watermark: Optional[int] = None, event_queue_watermarks: Optional[tuple[int, int]] = None, stats_dump_interval: Optional[float] = None, + user_agent_prefix: Optional[bytes] = None, ) -> None: self.message_compression_algorithm = message_compression_algorithm self.timeouts = timeouts @@ -309,6 +351,7 @@ def __init__( self.channel_high_watermark = channel_high_watermark self.event_queue_watermarks = event_queue_watermarks self.stats_dump_interval = stats_dump_interval + self.user_agent_prefix = user_agent_prefix def __eq__(self, other: object) -> bool: if not isinstance(other, SessionOptions): @@ -322,6 +365,7 @@ def __eq__(self, other: object) -> bool: and self.channel_high_watermark == other.channel_high_watermark and self.event_queue_watermarks == other.event_queue_watermarks and self.stats_dump_interval == other.stats_dump_interval + and self.user_agent_prefix == other.user_agent_prefix ) def __ne__(self, other: object) -> bool: @@ -337,6 +381,7 @@ def __repr__(self) -> str: "channel_high_watermark", "event_queue_watermarks", "stats_dump_interval", + "user_agent_prefix", ) params = [] @@ -399,6 +444,11 @@ class Session: stats are always dumped at the end of the session). The default is 5min; the value must be a multiple of 30s, in the range ``[0s - 60min]``. + user_agent_prefix: Bytestring to include in the user agent for broker + telemetry. This string must only contain printable characters and + must be less than 128 bytes long. This is provided for libraries + that are wrapping this SDK. Applications directly using the SDK + are encouraged *NOT* to set this value. Raises: `~blazingmq.Error`: If the session start request was not successful. @@ -423,6 +473,7 @@ def __init__( channel_high_watermark: Optional[int] = None, event_queue_watermarks: Optional[tuple[int, int]] = None, stats_dump_interval: Optional[float] = None, + user_agent_prefix: Optional[bytes] = None, ) -> None: if host_health_monitor is not None: if not isinstance(host_health_monitor, BasicHealthMonitor): @@ -459,6 +510,7 @@ def __init__( timeouts=_validate_timeouts(timeout), monitor_host_health=monitor_host_health, fake_host_health_monitor=fake_host_health_monitor, + user_agent_prefix=_validate_user_agent_prefix(user_agent_prefix), ) @classmethod @@ -510,6 +562,7 @@ def with_options( session_options.channel_high_watermark, session_options.event_queue_watermarks, session_options.stats_dump_interval, + session_options.user_agent_prefix, ) else: return cls( @@ -524,6 +577,7 @@ def with_options( session_options.channel_high_watermark, session_options.event_queue_watermarks, session_options.stats_dump_interval, + session_options.user_agent_prefix, ) def open_queue( diff --git a/src/cpp/pybmq_session.cpp b/src/cpp/pybmq_session.cpp index 80c8947..bebe0e2 100644 --- a/src/cpp/pybmq_session.cpp +++ b/src/cpp/pybmq_session.cpp @@ -94,7 +94,8 @@ Session::Session( bsl::shared_ptr fake_host_health_monitor_sp, PyObject* error, PyObject* broker_timeout_error, - PyObject* mock) + PyObject* mock, + const char* user_agent_prefix) : d_started_lock() , d_started(false) , d_message_compression_type(bmqt::CompressionAlgorithmType::e_NONE) @@ -168,6 +169,8 @@ Session::Session( options.setCloseQueueTimeout(close_queue_timeout); } + options.setUserAgentPrefix(user_agent_prefix); + bslma::ManagedPtr handler( new pybmq::SessionEventHandler( py_session_event_callback, diff --git a/src/cpp/pybmq_session.h b/src/cpp/pybmq_session.h index f37a407..d171aae 100644 --- a/src/cpp/pybmq_session.h +++ b/src/cpp/pybmq_session.h @@ -68,7 +68,8 @@ class Session bsl::shared_ptr fake_host_health_monitor, PyObject* d_error, PyObject* d_broker_timeout_error, - PyObject* mock); + PyObject* mock, + const char* user_agent_string); ~Session(); diff --git a/src/declarations/pybmq.pxd b/src/declarations/pybmq.pxd index 9ba293e..bf9a208 100644 --- a/src/declarations/pybmq.pxd +++ b/src/declarations/pybmq.pxd @@ -56,7 +56,8 @@ cdef extern from "pybmq_session.h" namespace "BloombergLP::pybmq" nogil: shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp, object error, object broker_timeout_error, - object mock) except+ + object mock, + const char* user_agent_prefix) except+ object start(TimeInterval) except+ object stop(bint) except+