Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion bmq-sdk/src/main/java/com/bloomberg/bmq/SessionOptions.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Bloomberg Finance L.P.
* Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,10 @@
* HostHealthMonitor} interface responsible for notifying the session when the health of the
* host machine has changed. A {@code hostHealthMonitor} must be specified in oder for queues
* opened through the session to suspend on unhealthy hosts.
* <li>{@code userAgentPrefix}: String to include in the user agent for broker telemetry. This
* string must only contain printable characters and must be less than 128 characters long.
* This is provided for libraries that are wrapping this SDK. Applications directly using the
* SDK are encouraged <strong>NOT</strong> to set this value.
* </ul>
*
* <H2>Thread Safety</H2>
Expand Down Expand Up @@ -215,6 +219,8 @@ public int highWaterMark() {

private final HostHealthMonitor hostHealthMonitor;

private final String userAgentPrefix;

private SessionOptions() {
brokerUri = DEFAULT_URI;
startTimeout = DEFAULT_START_TIMEOUT;
Expand All @@ -226,6 +232,7 @@ private SessionOptions() {
configureQueueTimeout = QUEUE_OPERATION_TIMEOUT;
closeQueueTimeout = QUEUE_OPERATION_TIMEOUT;
hostHealthMonitor = null;
userAgentPrefix = "";
}

private SessionOptions(Builder builder) {
Expand All @@ -239,6 +246,7 @@ private SessionOptions(Builder builder) {
configureQueueTimeout = builder.configureQueueTimeout;
closeQueueTimeout = builder.closeQueueTimeout;
hostHealthMonitor = builder.hostHealthMonitor;
userAgentPrefix = builder.userAgentPrefix;
}

/**
Expand Down Expand Up @@ -383,6 +391,16 @@ public HostHealthMonitor hostHealthMonitor() {
return hostHealthMonitor;
}

/**
* Returns the string that will be prefixed to the user agent used by {@link
* com.bloomberg.bmq.Session} to identify itself to the broker.
*
* @return String user agent string prefix
*/
public String userAgentPrefix() {
return userAgentPrefix;
}

/** Helper class to create a {@code SesssionOptions} object with custom settings. */
public static class Builder {
private URI brokerUri;
Expand All @@ -397,6 +415,9 @@ public static class Builder {
private Duration closeQueueTimeout;

private HostHealthMonitor hostHealthMonitor;

private String userAgentPrefix;

/**
* Creates a {@code SesssionOptions} object based on this {@code Builder} properties.
*
Expand All @@ -417,6 +438,7 @@ private Builder(SessionOptions options) {
configureQueueTimeout = options.configureQueueTimeout;
closeQueueTimeout = options.closeQueueTimeout;
hostHealthMonitor = options.hostHealthMonitor;
userAgentPrefix = options.userAgentPrefix;
}

/**
Expand Down Expand Up @@ -558,5 +580,29 @@ public Builder setHostHealthMonitor(HostHealthMonitor value) {
hostHealthMonitor = Argument.expectNonNull(value, "host health monitor");
return this;
}

/**
* Sets the user agent prefix. This string is prefixed to a user agent constructed by the
* SDK. This is intended ONLY for other libraries that wrap this SDK to identify themselves
* for broker telemetry. Applications that are directly using this SDK are encouraged not to
* set this.
*
* @param value String user agent prefix
* @return Builder this object
* @throws NullPointerException if the specified value is null
* @throws IllegalArgumentException in case the specified value contains non-ASCII
* characters, contains non-printable characters (i.e., control characters), or is
* longer than 127 characters.
*/
public Builder setUserAgentPrefix(String value) {
Argument.expectNonNull(value, "user agent prefix");
Argument.expectCondition(
value.codePoints().allMatch(c -> c < 128 && !Character.isISOControl(c)),
"user agent prefix must be printable ASCII");
Argument.expectCondition(
value.length() < 128, "user agent prefix must be shorter than 128 characters");
userAgentPrefix = value;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Bloomberg Finance L.P.
* Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -373,9 +373,20 @@ private NegotiationMessageChoice createNegoMsg() {
clientIdentity.setClusterName("");
clientIdentity.setClusterNodeId(-1);
clientIdentity.setSdkLanguage(ClientLanguage.E_JAVA);
clientIdentity.setUserAgent(constructUserAgent());
return negoMsgChoice;
}

private String constructUserAgent() {
String prefix = "";
if (connectionOptions.userAgentPrefix().length() > 0) {
prefix = connectionOptions.userAgentPrefix() + " ";
}
String javaVersion = "java" + SystemUtil.getJavaVersionString();
String sdkVersion = VersionUtil.getJarVersion();
return prefix + "com.bloomberg.bmq(" + javaVersion + "):" + sdkVersion;
}

private WriteStatus negotiate() throws IOException {
if (negotiationMsg == null) {
negotiationMsg = createNegoMsg();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Bloomberg Finance L.P.
* Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@ public class ClientIdentity {
private String clusterName;
private Integer clusterNodeId;
private ClientLanguage sdkLanguage;
private String userAgent;

public ClientIdentity() {
init();
Expand All @@ -48,6 +49,7 @@ private void init() {
clusterName = "";
clusterNodeId = -1;
sdkLanguage = ClientLanguage.E_UNKNOWN;
userAgent = "";
}

public Integer protocolVersion() {
Expand Down Expand Up @@ -138,6 +140,14 @@ public void setSdkLanguage(ClientLanguage value) {
sdkLanguage = value;
}

public String userAgent() {
return userAgent;
}

public void setUserAgent(String value) {
userAgent = value;
}

public Object createNewInstance() {
return new ClientIdentity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public final DisconnectResponse disconnectResponse() {
return disconnectResponse;
}

public void init() {
public final void init() {
configureQueueStream = null;
configureQueueStreamResponse = null;
configureStream = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final BrokerResponse brokerResponse() {
return brokerResponse;
}

public void init() {
public final void init() {
clientIdentity = null;
brokerResponse = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Bloomberg Finance L.P.
* Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,12 +39,14 @@ public final class ConnectionOptions {
private int startNumRetries = DEFAULT_START_NUM_RETRIES;
private Duration startRetryInterval = DEFAULT_START_RETRY_INTERVAL;
private WriteBufferWaterMark writeWaterMark = new WriteBufferWaterMark();
private String userAgentPrefix = "";

public ConnectionOptions() {}

public ConnectionOptions(SessionOptions sesOpts) {
brokerUri = sesOpts.brokerUri();
writeWaterMark = sesOpts.writeBufferWaterMark();
userAgentPrefix = sesOpts.userAgentPrefix();
}

public ConnectionOptions setBrokerUri(URI value) {
Expand Down Expand Up @@ -80,6 +82,17 @@ public ConnectionOptions setWriteBufferWaterMark(WriteBufferWaterMark value) {
return this;
}

public ConnectionOptions setUserAgentPrefix(String value) {
Argument.expectNonNull(value, "user agent prefix");
Argument.expectCondition(
value.codePoints().allMatch(c -> c < 128 && !Character.isISOControl(c)),
"user agent prefix must be printable ASCII");
Argument.expectCondition(
value.length() < 128, "user agent prefix must be shorter than 128 characters");
userAgentPrefix = value;
return this;
}

public URI brokerUri() {
return brokerUri;
}
Expand All @@ -99,4 +112,8 @@ public Duration startRetryInterval() {
public WriteBufferWaterMark writeBufferWaterMark() {
return writeWaterMark;
}

public String userAgentPrefix() {
return userAgentPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckHeader implements Streamable {
public final class AckHeader implements Streamable {
// This class represents header for an 'ACK' event. An 'ACK' event is the
// event sent by the broker to a client in response to a post message on
// queue. Such event is optional, depending on flags used at queue open.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,23 @@ public AckMessageImpl(
setQueueId(queueId);
}

public void setStatus(ResultCodes.AckResult status) {
public final void setStatus(ResultCodes.AckResult status) {
int value = ResultCodeUtils.intFromAckResult(status);
statusAndCorrelationId =
(statusAndCorrelationId & CORRID_MASK) | (value << STATUS_START_IDX);
}

public void setCorrelationId(CorrelationIdImpl value) {
public final void setCorrelationId(CorrelationIdImpl value) {
int id = value.toInt();
statusAndCorrelationId = (statusAndCorrelationId & STATUS_MASK) | (id & CORRID_MASK);
correlationId = value;
}

public void setQueueId(int value) {
public final void setQueueId(int value) {
queueId = value;
}

public void setMessageGUID(final MessageGUID value) {
public final void setMessageGUID(final MessageGUID value) {
value.toBinary(messageGUID);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckMessageIterator extends MessageIterator implements Iterator<AckMessageImpl> {
public final class AckMessageIterator extends MessageIterator implements Iterator<AckMessageImpl> {

static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.bloomberg.bmq.impl.infr.proto;

public class BinaryMessageProperty extends MessageProperty {
public final class BinaryMessageProperty extends MessageProperty {

public BinaryMessageProperty() {
super(PropertyType.BINARY, MessagePropertyHeader.MAX_PROPERTY_VALUE_LENGTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.bloomberg.bmq.impl.infr.proto;

public class BoolMessageProperty extends MessageProperty {
public final class BoolMessageProperty extends MessageProperty {

public BoolMessageProperty() {
super(PropertyType.BOOL, Byte.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.bloomberg.bmq.impl.infr.proto;

public class ByteMessageProperty extends MessageProperty {
public final class ByteMessageProperty extends MessageProperty {

public ByteMessageProperty() {
super(PropertyType.BYTE, Byte.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.bloomberg.bmq.impl.infr.util.BitUtil;
import java.io.IOException;

public class ConfirmHeader implements Streamable {
public final class ConfirmHeader implements Streamable {
// This class represents header for a 'CONFIRM' event. A 'CONFIRM' event
// is the event sent by a client to the broker to signify it's done
// processing a specific message and the broker can dispose of it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfirmMessageIterator extends MessageIterator {
public final class ConfirmMessageIterator extends MessageIterator {

static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void setControlEventEncodingType(EncodingType type) {

byte typeAsByte = (byte) (type.toInt());
// Set those bits to represent 'type'
typeSpecificUpdate |= (typeAsByte << CONTROL_EVENT_ENCODING_START_IDX);
typeSpecificUpdate |= (byte) (typeAsByte << CONTROL_EVENT_ENCODING_START_IDX);

setTypeSpecific(typeSpecificUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.nio.ByteBuffer;

public class Int32MessageProperty extends MessageProperty {
public final class Int32MessageProperty extends MessageProperty {

public Int32MessageProperty() {
super(PropertyType.INT32, Integer.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.nio.ByteBuffer;

public class Int64MessageProperty extends MessageProperty {
public final class Int64MessageProperty extends MessageProperty {

public Int64MessageProperty() {
super(PropertyType.INT64, Long.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public <T extends InputStream & DataInput> int streamInOld(T input) throws IOExc
}

// Read padding bytes
final long numPaddingBytes = input.readByte();
final int numPaddingBytes = input.readByte();

// Skip padding bytes
if (input.skip(numPaddingBytes - 1) != numPaddingBytes - 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;

public class MessagePropertyHeader {
public final class MessagePropertyHeader {
// This class represents the header for a message property in a PUT or
// PUSH message.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OptionHeader implements Streamable {
public final class OptionHeader implements Streamable {
// This class represents the header for an option. In a typical
// implementation usage, every Option class will start by an
// 'OptionHeader' member.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushHeader {
public final class PushHeader {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// This struct represents the header for a 'PUSH' event. A 'PUSH' event is
Expand Down
Loading
Loading