diff --git a/cspell.json b/cspell.json index 7e29111..3598c67 100644 --- a/cspell.json +++ b/cspell.json @@ -1,4 +1,15 @@ { "language": "en", - "words": ["ackmode", "ampq", "prefetch", "amqpvalue", "RABBITMQ", "Sasl", "Vbin", "akey", "superstream"] + "words": [ + "ackmode", + "akey", + "ampq", + "amqpvalue", + "dedup", + "prefetch", + "RABBITMQ", + "Sasl", + "superstream", + "Vbin" + ] } diff --git a/src/client.ts b/src/client.ts index 1ae9377..b484917 100644 --- a/src/client.ts +++ b/src/client.ts @@ -46,10 +46,29 @@ import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from import { PublishConfirmResponse } from "./responses/publish_confirm_response" import { PublishErrorResponse } from "./responses/publish_error_response" +/** + * Callback invoked when a connection is closed + * @param hadError - True if the connection closed due to an error + */ export type ConnectionClosedListener = (hadError: boolean) => void + +/** + * Callback invoked when a publish is confirmed by the server + * @param confirm - The publish confirmation response + * @param connectionId - The ID of the connection that sent the confirmation + */ export type ConnectionPublishConfirmListener = (confirm: PublishConfirmResponse, connectionId: string) => void + +/** + * Callback invoked when a publish error occurs + * @param confirm - The publish error response + * @param connectionId - The ID of the connection that reported the error + */ export type ConnectionPublishErrorListener = (confirm: PublishErrorResponse, connectionId: string) => void +/** + * Parameters for closing connections + */ export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean } type ConsumerMappedValue = { connection: Connection; consumer: StreamConsumer; params: DeclareConsumerParams } @@ -66,6 +85,33 @@ type DeliverData = { subscriptionId: number consumerId: string } + +/** + * Main RabbitMQ Stream client for managing connections, publishers, and consumers. + * + * The Client class serves as the primary entry point for interacting with RabbitMQ streams. + * It manages: + * - A locator connection for metadata queries and topology management + * - A connection pool for sharing TCP connections among publishers and consumers + * - Publisher and consumer lifecycle + * - Stream creation and deletion + * + * @example + * ```typescript + * const client = await connect({ + * hostname: 'localhost', + * port: 5552, + * username: 'guest', + * password: 'guest', + * vhost: '/' + * }); + * + * await client.createStream({ stream: 'my-stream' }); + * const publisher = await client.declarePublisher({ stream: 'my-stream' }); + * await publisher.send(Buffer.from('Hello World')); + * await client.close(); + * ``` + */ export class Client { public readonly id: string = randomUUID() private consumers = new Map() @@ -92,6 +138,10 @@ export class Client { this.locatorConnection.registerCompression(compression) } + /** + * Start the client by establishing the locator connection + * @returns A promise that resolves to the client instance + */ public start(): Promise { return this.locatorConnection.start().then( (_res) => { @@ -104,6 +154,17 @@ export class Client { ) } + /** + * Close the client and all associated publishers, consumers, and connections + * @param params - Optional closing parameters with code and reason + * @returns A promise that resolves when the client is fully closed + * @example + * ```typescript + * await client.close(); + * // or with custom parameters + * await client.close({ closingCode: 1, closingReason: 'Shutting down' }); + * ``` + */ public async close(params: ClosingParams = { closingCode: 0, closingReason: "" }) { this.logger.info(`${this.id} Closing client...`) if (this.publisherCounts()) { @@ -117,6 +178,12 @@ export class Client { await this.locatorConnection.close({ ...params, manuallyClose: true }) } + /** + * Query metadata for one or more streams + * @param params - Parameters containing the list of stream names to query + * @returns A promise that resolves to an array of stream metadata + * @throws {Error} If the query returns an error code + */ public async queryMetadata(params: QueryMetadataParams): Promise { const { streams } = params const res = await this.locatorConnection.sendAndWait(new MetadataRequest({ streams })) @@ -129,6 +196,12 @@ export class Client { return streamInfos } + /** + * Query the partitions of a super stream + * @param params - Parameters containing the super stream name + * @returns A promise that resolves to an array of partition stream names + * @throws {Error} If the query returns an error code + */ public async queryPartitions(params: QueryPartitionsParams): Promise { const { superStream } = params const res = await this.locatorConnection.sendAndWait(new PartitionsQuery({ superStream })) @@ -139,6 +212,29 @@ export class Client { return res.streams } + /** + * Declare a publisher for a stream + * + * Publishers are used to send messages to a stream. If a publisherRef is provided, + * deduplication is enabled and the publisher will use monotonically increasing publishing IDs. + * + * @param params - Publisher configuration including stream name and optional publisherRef + * @param filter - Optional filter function for server-side message filtering + * @returns A promise that resolves to a Publisher instance + * @throws {Error} If the declare command fails or filtering is not supported by the broker + * @example + * ```typescript + * // Simple publisher + * const publisher = await client.declarePublisher({ stream: 'my-stream' }); + * await publisher.send(Buffer.from('Hello')); + * + * // Publisher with deduplication + * const dedupPublisher = await client.declarePublisher({ + * stream: 'my-stream', + * publisherRef: 'unique-publisher-ref' + * }); + * ``` + */ public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { const connection = await this.getConnection(params.stream, "publisher", params.connectionClosedListener) const publisherId = connection.getNextPublisherId() @@ -187,6 +283,42 @@ export class Client { return res.ok } + /** + * Declare a consumer for a stream + * + * Consumers receive messages from a stream starting at a specified offset. + * They can be configured with various options including single active consumer mode, + * filtering, and custom credit policies. + * + * @param params - Consumer configuration including stream, offset, and optional settings + * @param handle - Message handler function that processes received messages + * @param superStreamConsumer - Optional super stream consumer for internal use + * @returns A promise that resolves to a Consumer instance + * @throws {Error} If the consumer cannot be declared or filtering is not supported + * @example + * ```typescript + * // Basic consumer + * const consumer = await client.declareConsumer( + * { stream: 'my-stream', offset: Offset.first() }, + * (message) => console.log(message.content.toString()) + * ); + * + * // Single active consumer with offset tracking + * const sacConsumer = await client.declareConsumer( + * { + * stream: 'my-stream', + * offset: Offset.first(), + * singleActive: true, + * consumerRef: 'my-consumer-group', + * consumerUpdateListener: async (ref, stream) => { + * const offset = await client.queryOffset({ reference: ref, stream }); + * return Offset.offset(offset); + * } + * }, + * (message) => console.log(message.content.toString()) + * ); + * ``` + */ public async declareConsumer( params: DeclareConsumerParams, handle: ConsumerFunc, @@ -301,6 +433,27 @@ export class Client { return Array.from(this.consumers.values()) } + /** + * Create a new stream on the RabbitMQ server + * + * @param params - Stream configuration including name and optional arguments (max-length-bytes, max-age, etc.) + * @returns A promise that resolves to true when the stream is created or already exists + * @throws {Error} If the create command fails with an error other than "already exists" + * @example + * ```typescript + * // Simple stream + * await client.createStream({ stream: 'my-stream' }); + * + * // Stream with retention policy + * await client.createStream({ + * stream: 'my-stream', + * arguments: { + * 'max-length-bytes': 10_000_000_000, // 10GB + * 'max-age': '7D' // 7 days + * } + * }); + * ``` + */ public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise { this.logger.debug(`Create Stream...`) const res = await this.locatorConnection.sendAndWait(new CreateStreamRequest(params)) @@ -315,6 +468,13 @@ export class Client { return res.ok } + /** + * Delete a stream from the RabbitMQ server + * + * @param params - Parameters containing the stream name to delete + * @returns A promise that resolves to true when the stream is deleted + * @throws {Error} If the delete command fails + */ public async deleteStream(params: { stream: string }): Promise { this.logger.debug(`Delete Stream...`) const res = await this.locatorConnection.sendAndWait(new DeleteStreamRequest(params.stream)) @@ -398,6 +558,28 @@ export class Client { return res } + /** + * Restart the client after a connection failure + * + * This method re-establishes all connections (locator, publishers, and consumers) + * and re-declares all publishers and consumers. Useful for automatic reconnection + * after network failures. + * + * @returns A promise that resolves when all connections are restarted + * @example + * ```typescript + * const client = await connect({ + * // ...connection params + * listeners: { + * connection_closed: (hadError) => { + * client.restart() + * .then(() => console.log('Reconnected')) + * .catch(err => console.error('Reconnection failed', err)); + * } + * } + * }); + * ``` + */ public async restart() { this.logger.info(`Restarting client connection ${this.locatorConnection.connectionId}`) const uniqueConnectionIds = new Set() @@ -732,6 +914,23 @@ export class Client { this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) } + /** + * Create and connect a new Client instance + * + * @param params - Connection parameters including hostname, port, credentials, and optional settings + * @param logger - Optional logger instance for debugging + * @returns A promise that resolves to a connected Client instance + * @example + * ```typescript + * const client = await Client.connect({ + * hostname: 'localhost', + * port: 5552, + * username: 'guest', + * password: 'guest', + * vhost: '/' + * }); + * ``` + */ static async connect(params: ClientParams, logger?: Logger): Promise { return new Client(logger ?? new NullLogger(), { ...params, @@ -740,6 +939,9 @@ export class Client { } } +/** + * Listener callbacks for client events + */ export type ClientListenersParams = { metadata_update?: MetadataUpdateListener publish_confirm?: ConnectionPublishConfirmListener @@ -747,6 +949,9 @@ export type ClientListenersParams = { connection_closed?: ConnectionClosedListener } +/** + * TLS/SSL connection parameters for secure connections + */ export interface SSLConnectionParams { key?: string cert?: string @@ -754,6 +959,9 @@ export interface SSLConnectionParams { rejectUnauthorized?: boolean } +/** + * Configuration for load balancer/address resolver mode + */ export type AddressResolverParams = | { enabled: true @@ -761,6 +969,9 @@ export type AddressResolverParams = } | { enabled: false } +/** + * Configuration parameters for connecting to RabbitMQ + */ export interface ClientParams { hostname: string port: number @@ -780,29 +991,54 @@ export interface ClientParams { connectionName?: string } +/** + * Parameters for declaring a publisher + */ export interface DeclarePublisherParams { + /** Name of the stream to publish to */ stream: string + /** Optional reference for deduplication - if provided, enables publishing ID tracking */ publisherRef?: string + /** Optional maximum chunk length for batching */ maxChunkLength?: number + /** Optional callback when the publisher's connection closes */ connectionClosedListener?: ConnectionClosedListener } +/** + * Routing strategy for super stream publishers + */ export type RoutingStrategy = "key" | "hash" +/** + * Parameters for declaring a super stream publisher + */ export interface DeclareSuperStreamPublisherParams { superStream: string publisherRef?: string routingStrategy?: RoutingStrategy } +/** + * Function to filter messages on the client side + */ export type MessageFilter = (msg: Message) => boolean +/** + * Configuration for message filtering (both server-side and client-side) + */ export interface ConsumerFilter { + /** Filter values for server-side bloom filter */ values: string[] + /** Optional client-side post-filter function */ postFilterFunc: MessageFilter + /** Whether to match messages without filter tags */ matchUnfiltered: boolean } +/** + * Parameters for declaring a consumer + */ export interface DeclareConsumerParams { stream: string consumerRef?: string @@ -848,6 +1084,59 @@ export interface QueryPartitionsParams { superStream: string } +/** + * Connect to RabbitMQ and create a new Client instance + * + * This is the main entry point for creating a connection to RabbitMQ streams. + * It establishes a locator connection for metadata queries and sets up connection pooling. + * + * @param params - Connection parameters including hostname, port, credentials, and optional settings + * @param logger - Optional logger instance for debugging and monitoring + * @returns A promise that resolves to a connected Client instance + * @throws {Error} If connection fails (invalid credentials, network issues, etc.) + * @example + * ```typescript + * // Basic connection + * const client = await connect({ + * hostname: 'localhost', + * port: 5552, + * username: 'guest', + * password: 'guest', + * vhost: '/' + * }); + * + * // Connection with TLS + * const secureClient = await connect({ + * hostname: 'rabbitmq.example.com', + * port: 5551, + * username: 'user', + * password: 'pass', + * vhost: '/', + * ssl: { + * cert: fs.readFileSync('client-cert.pem'), + * key: fs.readFileSync('client-key.pem'), + * ca: fs.readFileSync('ca-cert.pem') + * } + * }); + * + * // Connection with listeners + * const client = await connect({ + * hostname: 'localhost', + * port: 5552, + * username: 'guest', + * password: 'guest', + * vhost: '/', + * listeners: { + * connection_closed: (hadError) => { + * console.log('Connection closed', hadError); + * }, + * publish_confirm: (confirm, connId) => { + * console.log('Message confirmed', confirm.publishingId); + * } + * } + * }); + * ``` + */ export function connect(params: ClientParams, logger?: Logger): Promise { return Client.connect(params, logger) } diff --git a/src/connection.ts b/src/connection.ts index 5f8b3ee..97d0c17 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -58,27 +58,47 @@ import EventEmitter from "events" import { MetadataUpdateResponse } from "./responses/metadata_update_response" import { MetadataInfo } from "./responses/raw_response" +/** + * Callback invoked when a connection closes + */ export type ConnectionClosedListener = (hadError: boolean) => void +/** + * Extended listener parameters for connections, including delivery and consumer update listeners + */ export type ConnectionListenersParams = ClientListenersParams & { deliverV1?: DeliverListener deliverV2?: DeliverV2Listener consumer_update_query?: ConsumerUpdateQueryListener } +/** + * Parameters for creating a connection + */ export type ConnectionParams = ClientParams & { listeners?: ConnectionListenersParams connectionId: string } +/** + * Information about the current state of a connection + */ export type ConnectionInfo = { + /** The host the connection is connected to */ host: string + /** The port the connection is connected to */ port: number + /** Unique identifier for this connection */ id: string + /** Whether the connection handshake has completed */ ready: boolean + /** The virtual host */ vhost: string + /** Whether the socket is readable */ readable?: boolean + /** Whether the socket is writable */ writable?: boolean + /** Local port number of the connection */ localPort?: number } @@ -91,6 +111,20 @@ type ListenerEntry = { stream: string } +/** + * Represents a TCP/TLS connection to a RabbitMQ node + * + * The Connection class handles: + * - Low-level socket management (TCP/TLS) + * - Stream protocol handshake (peer properties, SASL auth, tuning, open) + * - Request/response correlation and routing + * - Heartbeat management + * - Message encoding/decoding via ResponseDecoder + * - Publisher and consumer ID generation + * + * Connections can be pooled and shared among multiple publishers/consumers + * based on stream name, vhost, host, and purpose (publisher/consumer). + */ export class Connection { public readonly hostname: string public readonly vhost: string @@ -200,6 +234,14 @@ export class Connection { this.socket.removeAllListeners("close") } + /** + * Restart the connection after a failure + * + * This recreates the socket and re-establishes the connection including + * the full handshake process (peer properties, auth, tuning, open). + * + * @returns A promise that resolves when the connection is restarted + */ public async restart() { this.unregisterSocketListeners() this.socket = this.createSocket() @@ -207,15 +249,34 @@ export class Connection { this.logSocket("restarted") } + /** + * Create and connect a new Connection instance + * + * @param params - Connection parameters + * @param logger - Logger instance + * @returns A promise that resolves to a connected Connection + */ public static connect(params: ConnectionParams, logger: Logger): Promise { const connection = Connection.create(params, logger) return connection.start() } + /** + * Create a new Connection instance without connecting + * + * @param params - Connection parameters + * @param logger - Logger instance + * @returns A new Connection instance + */ public static create(params: ConnectionParams, logger: Logger): Connection { return new Connection(params, logger) } + /** + * Start the connection by registering listeners and initiating the socket connection + * + * @returns A promise that resolves to the connection once handshake completes + */ public start(): Promise { this.registerListeners(this.params.listeners) return this.registerSocketListeners() @@ -345,6 +406,16 @@ export class Connection { return checkServerDeclaredVersions(this.serverVersions, this.logger, this.peerProperties.version) } + /** + * Send a request and wait for its response + * + * This method sends a request with a correlation ID and waits for the matching response. + * The correlation ID is used to match requests with their responses. + * + * @param cmd - The request to send + * @returns A promise that resolves to the response + * @throws {Error} If the socket write fails or the response indicates an error + */ public sendAndWait(cmd: Request): Promise { return new Promise((res, rej) => { const correlationId = this.incCorrelationId() @@ -385,6 +456,11 @@ export class Connection { }) } + /** + * Get information about the current connection state + * + * @returns Connection information including host, port, ready state, and socket state + */ public getConnectionInfo(): ConnectionInfo { return { host: this.serverEndpoint.host, @@ -422,6 +498,14 @@ export class Connection { return res } + /** + * Send a request without waiting for a response + * + * Used for fire-and-forget commands like credit requests or heartbeats. + * + * @param cmd - The request to send + * @returns A promise that resolves when the request is sent + */ public send(cmd: Request): Promise { return new Promise((res, rej) => { const bufferSizeParams = this.getBufferSizeParams() @@ -536,6 +620,14 @@ export class Connection { return Math.min(this.frameMax, tuneResponseFrameMax) } + /** + * Close the connection + * + * Stops heartbeat, sends close request, and ends the socket. + * + * @param params - Optional closing parameters with code and reason + * @returns A promise that resolves when the connection is closed + */ public async close(params: ClosingParams = { closingCode: 0, closingReason: "" }): Promise { this.logger.info(`Closing connection...`) this.logger.info(`Stopping heartbeat...`) @@ -547,6 +639,15 @@ export class Connection { this.socket.end() } + /** + * Query the last publishing ID for a publisher reference + * + * Used for deduplication to get the last published sequence number for a publisher. + * + * @param params - Stream name and publisher reference + * @returns A promise that resolves to the last publishing sequence number + * @throws {Error} If the query fails + */ public async queryPublisherSequence(params: { stream: string; publisherRef: string }): Promise { const res = await this.sendAndWait(new QueryPublisherRequest(params)) if (!res.ok) { @@ -561,10 +662,23 @@ export class Connection { return res.sequence } + /** + * Store an offset on the server for a consumer reference + * + * @param params - Reference name, stream name, and offset value to store + * @returns A promise that resolves when the offset is stored + */ public storeOffset(params: StoreOffsetParams): Promise { return this.send(new StoreOffsetRequest(params)) } + /** + * Query a stored offset from the server for a consumer reference + * + * @param params - Reference name and stream name + * @returns A promise that resolves to the stored offset value + * @throws {Error} If the query fails + */ public async queryOffset(params: QueryOffsetParams): Promise { this.logger.debug(`Query Offset...`) const res = await this.sendAndWait(new QueryOffsetRequest(params)) diff --git a/src/consumer.ts b/src/consumer.ts index 3476b61..42cabe4 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -5,15 +5,40 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" +/** + * Message handler function for processing consumed messages + */ export type ConsumerFunc = (message: Message) => Promise | void + +/** + * Listener invoked when a single active consumer becomes active + * + * Returns the offset from which the consumer should start consuming. + * Typically used to restore the last processed offset from a database. + */ export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise + +/** + * Compute an extended consumer ID that includes the connection ID + * + * @param consumerId - The numeric consumer ID + * @param connectionId - The connection ID + * @returns An extended consumer ID in the format "consumerId@connectionId" + */ export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => { return `${consumerId}@${connectionId}` } +/** + * Interface for consuming messages from a RabbitMQ stream + * + * Consumers receive messages from a stream starting at a specified offset. + * They support features like offset tracking, single active consumer mode, + * and credit-based flow control. + */ export interface Consumer { /** - * Close the publisher + * Close the consumer and release the connection */ close(): Promise @@ -55,6 +80,41 @@ export interface Consumer { readonly extendedId: string } +/** + * Implementation of a stream consumer + * + * StreamConsumer handles message consumption from a RabbitMQ stream with features: + * - Automatic offset tracking + * - Credit-based flow control for back-pressure + * - Single active consumer support for high availability + * - Server-side and client-side offset storage + * - Message filtering + * + * The consumer uses a credit policy to control how many message chunks are buffered. + * The default policy processes chunks sequentially to maintain message order. + * + * @example + * ```typescript + * // Basic consumer + * const consumer = await client.declareConsumer( + * { stream: 'my-stream', offset: Offset.first() }, + * (message) => console.log(message.content.toString()) + * ); + * + * // Consumer with offset tracking + * const consumer = await client.declareConsumer( + * { + * stream: 'my-stream', + * offset: Offset.first(), + * consumerRef: 'my-consumer' + * }, + * async (message) => { + * await processMessage(message); + * await consumer.storeOffset(); // Store current offset + * } + * ); + * ``` + */ export class StreamConsumer implements Consumer { private connection: Connection private stream: string diff --git a/src/publisher.ts b/src/publisher.ts index 8fee00f..1aec9ce 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -13,10 +13,26 @@ import { PublishErrorResponse } from "./responses/publish_error_response" import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util" import { MetadataUpdateListener } from "./response_decoder" +/** + * Application-specific properties attached to a message (key-value pairs) + */ export type MessageApplicationProperties = Record + +/** + * Message annotations for internal messaging properties + */ export type MessageAnnotations = Record + +/** + * Allowed value types for message annotations + */ export type MessageAnnotationsValue = string | number | AmqpByte +/** + * Represents a single byte value for AMQP encoding + * + * Used when you need to explicitly specify a byte-sized value in message annotations. + */ export class AmqpByte { private value: number @@ -32,6 +48,11 @@ export class AmqpByte { } } +/** + * AMQP 1.0 message properties + * + * Standard message properties as defined in the AMQP 1.0 specification. + */ export interface MessageProperties { contentType?: string contentEncoding?: string @@ -48,6 +69,11 @@ export interface MessageProperties { replyToGroupId?: string } +/** + * AMQP 1.0 message header + * + * Standard header fields as defined in the AMQP 1.0 specification. + */ export interface MessageHeader { durable?: boolean priority?: number @@ -56,6 +82,11 @@ export interface MessageHeader { deliveryCount?: number } +/** + * A message in a RabbitMQ stream + * + * Represents a complete message with content and optional AMQP 1.0 properties. + */ export interface Message { content: Buffer messageProperties?: MessageProperties @@ -66,6 +97,11 @@ export interface Message { offset?: bigint } +/** + * Options for sending a message + * + * Properties and metadata that can be attached when sending a message. + */ export interface MessageOptions { messageProperties?: MessageProperties applicationProperties?: Record @@ -73,10 +109,26 @@ export interface MessageOptions { publishingId?: bigint } +/** + * Compute an extended publisher ID that includes the connection ID + * + * @param publisherId - The numeric publisher ID + * @param connectionId - The connection ID + * @returns An extended publisher ID in the format "publisherId@connectionId" + */ export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => { return `${publisherId}@${connectionId}` } +/** + * Interface for publishing messages to a RabbitMQ stream + * + * Publishers send messages to streams with features: + * - Automatic batching and flushing + * - Deduplication via publishing IDs + * - Sub-batch entry publishing with optional compression + * - Server-side filtering support + */ export interface Publisher { /** * Sends a message in the stream @@ -153,9 +205,61 @@ export interface Publisher { readonly extendedId: string } +/** + * Function to extract a filter value from a message for server-side filtering + * + * Returns a string tag that will be used for bloom filter-based routing. + */ export type FilterFunc = (msg: Message) => string | undefined + +/** + * Callback for publish confirmations and errors + */ type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void + +/** + * Result of a send operation + */ export type SendResult = { sent: boolean; publishingId: bigint; publisherId: number; connectionId: string } + +/** + * Implementation of a stream publisher + * + * StreamPublisher handles message publishing to a RabbitMQ stream with features: + * - Automatic message batching based on maxChunkLength + * - Deduplication via monotonically increasing publishing IDs + * - Server-side filtering via filter functions + * - Sub-batch entry publishing with optional compression (Gzip, etc.) + * - Frame size validation to prevent oversized messages + * + * Messages are queued internally and sent in batches for optimal performance. + * The publisher automatically schedules flushes when the queue reaches maxChunkLength. + * + * @example + * ```typescript + * // Basic publisher + * const publisher = await client.declarePublisher({ stream: 'my-stream' }); + * await publisher.send(Buffer.from('Hello World')); + * await publisher.close(); + * + * // Publisher with deduplication + * const dedupPublisher = await client.declarePublisher({ + * stream: 'my-stream', + * publisherRef: 'my-publisher-ref' + * }); + * await dedupPublisher.send(Buffer.from('Message 1')); + * await dedupPublisher.send(Buffer.from('Message 2')); + * + * // Publisher with filtering + * const filterPublisher = await client.declarePublisher( + * { stream: 'my-stream' }, + * (msg) => msg.applicationProperties?.region + * ); + * await filterPublisher.send(Buffer.from('Data'), { + * applicationProperties: { region: 'us-east-1' } + * }); + * ``` + */ export class StreamPublisher implements Publisher { private connection: Connection private stream: string