Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/71.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added user agent prefix string to session negotation
1 change: 1 addition & 0 deletions src/blazingmq/_ext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Session:
timeouts: Timeouts = Timeouts(),
monitor_host_health: bool = False,
fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None,
user_agent_prefix: bytes,
) -> None: ...
def stop(self) -> None: ...
def open_queue_sync(
Expand Down
4 changes: 3 additions & 1 deletion src/blazingmq/_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ cdef class Session:
monitor_host_health: bool = False,
fake_host_health_monitor: FakeHostHealthMonitor = None,
_mock: Optional[object] = None,
user_agent_prefix: bytes,
) -> None:
cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp
cdef optional[int] c_num_processing_threads
Expand Down Expand Up @@ -241,7 +242,8 @@ cdef class Session:
fake_host_health_monitor_sp,
Error,
BrokerTimeoutError,
_mock)
_mock,
user_agent_prefix)
self._session.start(c_connect_timeout)
atexit.register(ensure_stop_session_impl, weakref.ref(self))

Expand Down
54 changes: 54 additions & 0 deletions src/blazingmq/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from __future__ import annotations

import string

from typing import Any
from typing import Callable
from typing import Dict
Expand All @@ -23,6 +25,7 @@
from typing import Union

from . import _six as six
from ._about import __version__
from ._enums import CompressionAlgorithmType
from ._enums import PropertyType
from ._ext import DEFAULT_CONSUMER_PRIORITY
Expand Down Expand Up @@ -72,6 +75,38 @@ def _validate_timeouts(timeouts: Timeouts) -> Timeouts:
)


def _validate_user_agent_prefix(user_agent_prefix: bytes) -> bytes:
"""Validate a user agent prefix bytestring for use by the Cython layer.

If the user agent prefix string is longer than 96 bytes or contains
non-printable characters, raise a `ValueError`. Otherwise, return the user
agent prefix string unchanged.
"""
if len(user_agent_prefix) > 96:
raise ValueError(
f"user_agent_prefix ({user_agent_prefix}) must be less than 96 "
f"bytes (is {len(user_agent_prefix)} bytes)"
)
elif any(c not in string.printable for c in user_agent_prefix):
raise ValueError(
f"user_agent_prefix ({user_agent_prefix}) must only contain "
f"printable characters"
)
else:
return _construct_user_agent_prefix(user_agent_prefix)


def _construct_user_agent_prefix(user_agent_prefix: bytes) -> bytes:
"""Construct the user agent prefix for use by the Cython layer.
"""
if user_agent_prefix != b"":
user_agent_prefix += b" "
python_version = platform.python_version().encode('ascii', errors='strict')
blazingmq_version = __version__.encode('ascii', errors='strict')
user_agent_prefix += b"blazingmq(python" + python_version + b"):" + blazingmq_version
return user_agent_prefix


def _convert_timeout(timeout: Optional[float]) -> Optional[float]:
"""Convert the timeout for use by the Cython layer.

Expand Down Expand Up @@ -288,6 +323,12 @@ class SessionOptions:
0, disable the recurring dump of stats (final stats are always
dumped at the end of the session). The default is 5min; the value
must be a multiple of 30s, in the range ``[0s - 60min]``.
user_agent_prefix:
Bytestring to include in the user agent for broker telemetry. This
string must only contain printable characters and must be less than
96 bytes long. This is provided for libraries that are wrapping
this SDK. Applications directly using the SDK are encouraged *NOT*
to set this value.
"""

def __init__(
Expand All @@ -300,6 +341,7 @@ def __init__(
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int, int]] = None,
stats_dump_interval: Optional[float] = None,
user_agent_prefix: Optional[bytes] = None,
) -> None:
self.message_compression_algorithm = message_compression_algorithm
self.timeouts = timeouts
Expand All @@ -309,6 +351,7 @@ def __init__(
self.channel_high_watermark = channel_high_watermark
self.event_queue_watermarks = event_queue_watermarks
self.stats_dump_interval = stats_dump_interval
self.user_agent_prefix = user_agent_prefix

def __eq__(self, other: object) -> bool:
if not isinstance(other, SessionOptions):
Expand All @@ -322,6 +365,7 @@ def __eq__(self, other: object) -> bool:
and self.channel_high_watermark == other.channel_high_watermark
and self.event_queue_watermarks == other.event_queue_watermarks
and self.stats_dump_interval == other.stats_dump_interval
and self.user_agent_prefix == other.user_agent_prefix
)

def __ne__(self, other: object) -> bool:
Expand All @@ -337,6 +381,7 @@ def __repr__(self) -> str:
"channel_high_watermark",
"event_queue_watermarks",
"stats_dump_interval",
"user_agent_prefix",
)

params = []
Expand Down Expand Up @@ -399,6 +444,11 @@ class Session:
stats are always dumped at the end of the session). The default is
5min; the value must be a multiple of 30s, in the range
``[0s - 60min]``.
user_agent_prefix: Bytestring to include in the user agent for broker
telemetry. This string must only contain printable characters and
must be less than 128 bytes long. This is provided for libraries
that are wrapping this SDK. Applications directly using the SDK
are encouraged *NOT* to set this value.

Raises:
`~blazingmq.Error`: If the session start request was not successful.
Expand All @@ -423,6 +473,7 @@ def __init__(
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int, int]] = None,
stats_dump_interval: Optional[float] = None,
user_agent_prefix: Optional[bytes] = None,
) -> None:
if host_health_monitor is not None:
if not isinstance(host_health_monitor, BasicHealthMonitor):
Expand Down Expand Up @@ -459,6 +510,7 @@ def __init__(
timeouts=_validate_timeouts(timeout),
monitor_host_health=monitor_host_health,
fake_host_health_monitor=fake_host_health_monitor,
user_agent_prefix=_validate_user_agent_prefix(user_agent_prefix),
)

@classmethod
Expand Down Expand Up @@ -510,6 +562,7 @@ def with_options(
session_options.channel_high_watermark,
session_options.event_queue_watermarks,
session_options.stats_dump_interval,
session_options.user_agent_prefix,
)
else:
return cls(
Expand All @@ -524,6 +577,7 @@ def with_options(
session_options.channel_high_watermark,
session_options.event_queue_watermarks,
session_options.stats_dump_interval,
session_options.user_agent_prefix,
)

def open_queue(
Expand Down
5 changes: 4 additions & 1 deletion src/cpp/pybmq_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ Session::Session(
bsl::shared_ptr<bmqa::ManualHostHealthMonitor> fake_host_health_monitor_sp,
PyObject* error,
PyObject* broker_timeout_error,
PyObject* mock)
PyObject* mock,
const char* user_agent_prefix)
: d_started_lock()
, d_started(false)
, d_message_compression_type(bmqt::CompressionAlgorithmType::e_NONE)
Expand Down Expand Up @@ -168,6 +169,8 @@ Session::Session(
options.setCloseQueueTimeout(close_queue_timeout);
}

options.setUserAgentPrefix(user_agent_prefix);

bslma::ManagedPtr<bmqa::SessionEventHandler> handler(
new pybmq::SessionEventHandler(
py_session_event_callback,
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/pybmq_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class Session
bsl::shared_ptr<bmqa::ManualHostHealthMonitor> fake_host_health_monitor,
PyObject* d_error,
PyObject* d_broker_timeout_error,
PyObject* mock);
PyObject* mock,
const char* user_agent_string);

~Session();

Expand Down
3 changes: 2 additions & 1 deletion src/declarations/pybmq.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ cdef extern from "pybmq_session.h" namespace "BloombergLP::pybmq" nogil:
shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp,
object error,
object broker_timeout_error,
object mock) except+
object mock,
const char* user_agent_prefix) except+

object start(TimeInterval) except+
object stop(bint) except+
Expand Down
Loading