diff --git a/src/application/Code51Exception.ts b/src/application/Code51Exception.ts new file mode 100644 index 00000000..b7a7518d --- /dev/null +++ b/src/application/Code51Exception.ts @@ -0,0 +1,62 @@ +"use strict" + +import { ResponseCode } from "../util" + +export type TResponseCode = (typeof ResponseCode)[keyof typeof ResponseCode] + +/** + * Provides distinct domain exception for the package. Contains the optional + * RabbitMQ Stream protocol response code for more convenient processing. + * + * @param message A custom error message. + * @param rmqStreamResponseCode The above mentioned response code. + * + * @see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes + * + * @example Selectively manage the exception type and react differently. + * + * ```typescript + * let result: any; + * + * const isRethrowable = (error_: Error) => { + * const isGenericError = error_ instanceof Code51Exception; + * const isNonManagedResponseCode = (error_ as Code51Exception).code !== ResponseCode.NoOffset; + * + * return isGenericError && isNonManagedResponseCode; + * }; + * + * try { + * result = consumer.queryOffset(); + * // ... process result + * } catch (error_) { + * if (isRethrowable(error_)) { throw error_; } + * + * const error = error_ as Code51Exception; + * if (error.code === ResponseCode.NoOffset) { return null; } + * + * return result; + * } + * ``` + * + */ +export default class Code51Exception extends Error { + readonly #code?: TResponseCode + + constructor(message: string, rmqStreamResponseCode?: TResponseCode) { + super(message) + + Object.setPrototypeOf(this, new.target.prototype) + + this.name = this.constructor.name + this.#code = rmqStreamResponseCode ?? undefined + + // Maintains proper stack trace for where our error was thrown (only available on V8) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, this.constructor) + } + } + + public get code(): TResponseCode | undefined { + return this.#code + } +} diff --git a/src/client.ts b/src/client.ts index 1ae9377a..51a532b6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -803,6 +803,51 @@ export interface ConsumerFilter { matchUnfiltered: boolean } +/** + * The parameters to declare a stream consumer. + * + * @param stream A stream name from which the consumer will be consuming messages. + * + * @param consumerRef A named consumer's name (identifier) for server-side offset tracking and single + * active consumer behavior. + * + * @see https://github.com/coders51/rabbitmq-stream-js-client/blob/main/example/src/offset_tracking_example.js + * @see https://github.com/coders51/rabbitmq-stream-js-client/blob/main/example/src/single_active_consumer_update_example.js + * + * @see {@link QueryOffsetParams.reference} + * @see {@link StoreOffsetParams.reference} + * + * @param offset The value object {@link Offset} representing the possible values for the + * RabbitMQ stream offset. Tells the client from where in the stream you want to start consuming. + * Could be "first", "last", "next", a specific numeric offset, or timestamp depending on the Offset type. + * + * @see https://github.com/WhereJuly/rabbitmq-stream-js-client?tab=readme-ov-file#basic-consuming + * @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking#the-different-offset-specifications-in-a-stream + * + * @param connectionClosedListener A callback {@link ConnectionClosedListener} invoked if the connection + * is closed (with parameter indicating error or not) to let you react to the consumer’s connection + * being closed. + * + * @param consumerUpdateListener {@link ConsumerUpdateListener} + * + * @param singleActive A flag to indicate "single active consumer" mode. + * Single active consumer provides exclusive consumption and consumption continuity on a stream. + * @see https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams + * @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#single-active-consumer + * + * @param filter A filter object {@link ConsumerFilter} specifying consumer filtering criteria. + * @see https://www.rabbitmq.com/docs/stream-filtering#filter-stages-overview + * @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#filtering + * + * @param creditPolicy {@link ConsumerCreditPolicy} determines if the consumer requests more message chunks from the broker + * while still processing the current chunk. By default only one chunk is processed ensuring + * the messages will be processed in order. + * + * @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#custom-policy + * + * @param consumerTag A simpler alias/identifier if you need to tag the consumer on the broker side + * for monitoring or naming. + */ export interface DeclareConsumerParams { stream: string consumerRef?: string @@ -829,12 +874,30 @@ export interface SubscribeParams { offset: Offset } +/** + * The parameters to store the concrete named consumer offset at the given stream. + * + * @member reference See {@link QueryOffsetParams.reference} + * @member stream See {@link QueryOffsetParams.stream} + * @member offsetValue The BigInt offset value. + * + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/BigInt + */ export interface StoreOffsetParams { reference: string stream: string offsetValue: bigint } +/** + * Query offset parameters. + * + * @see https://www.rabbitmq.com/tutorials/tutorial-two-javascript-stream#server-side-offset-tracking + * + * @member reference The named consumer's name. Identifies the concrete stable consumer to + * persistently track the dedicated offset. Also named as `consumerRef` elsewhere. + * @member stream A stream name. + */ export interface QueryOffsetParams { reference: string stream: string diff --git a/src/connection.ts b/src/connection.ts index 5f8b3eef..39c96bef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -57,6 +57,7 @@ import { coerce, lt } from "semver" import EventEmitter from "events" import { MetadataUpdateResponse } from "./responses/metadata_update_response" import { MetadataInfo } from "./responses/raw_response" +import Code51Exception, { TResponseCode } from "./application/Code51Exception" export type ConnectionClosedListener = (hadError: boolean) => void @@ -561,15 +562,70 @@ export class Connection { return res.sequence } + /** + * Store the provided offset at the RabbitMQ server in the given stream for the given consumer. + * + * @param StoreOffsetParams The stream name, consumer name and the offset value. + * + * The offset is stored on the given stream as the additional service message not visible to + * a stream consumers but counted for in the RabbitMQ UI. + * + * For streams with millions of messages per second it is recommended to store the offset + * once per a number of messages to not litter the stream and potentially worsen the performance + * when very high throughput is required. + * + * @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking#the-dark-side-of-server-side-offset-tracking + * @see https://www.rabbitmq.com/docs/streams#offset-tracking + */ public storeOffset(params: StoreOffsetParams): Promise { return this.send(new StoreOffsetRequest(params)) } + /** + * Return the server-side saved offset or throws {@link Code51Exception} with the + * RabbitMQ response code. + * + * @see https://www.rabbitmq.com/tutorials/tutorial-two-javascript-stream + * @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking + * + * @param QueryOffsetParams The stream name and the named consumer identifier object. + * + * @see {@link Connection.connect} + * @see {@link Connection.storeOffset} + * + * @example Consumer reads previously saved server-tracked offset to start consuming + * from the desired offset. + * + * On consumer side create the client, detect the server-side saved offset to detect + * the desired offset to consume from. Then declare a consumer with the desired offset + * and the consumed message handler. + * + * When consuming, the consumer message handler may save the offset server-side + * for the `client.queryOffset()` to be able to further read it. + * + * ```typescript + * // ... create the RabbitMQ client here + * // Detect the decider starting offset for the next stream operation. + * const offset = await client.queryOffset({ reference: 'consumer-x', stream: 'stream-a' }) + * const startWithOffset = offset ? rmqLibrary.Offset.offset(offset + 1n) : + * rmqLibrary.Offset.(); + * const consumer = await client.declareConsumer({stream: 'stream-b', offset: startWithOffset}, + * async (this: StreamConsumer, message: Message) => { await this.storeOffset(message.offset); }); + * // Note the offset is saved by the message handler on the server. + * ``` + * + * @throws {@link Code51Exception} if the server-side offset cannot be retrieved. The exception + * contains the `code` field that equals the RabbitMQ stream protocol response code value. + * + * @see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes + */ public async queryOffset(params: QueryOffsetParams): Promise { this.logger.debug(`Query Offset...`) const res = await this.sendAndWait(new QueryOffsetRequest(params)) if (!res.ok) { - throw new Error(`Query offset command returned error with code ${res.code}`) + const code = res.code as TResponseCode + + throw new Code51Exception(`Query offset command returned error with code ${res.code}`, code) } this.logger.debug(`Query Offset response: ${res.ok} with params: '${inspect(params)}'`) return res.offsetValue diff --git a/src/util.ts b/src/util.ts index 1a4e8567..0ca566e7 100644 --- a/src/util.ts +++ b/src/util.ts @@ -48,9 +48,23 @@ export const wait = async (ms: number) => { }) } +/** + * The RabbitMQ Stream protocol response codes. + * + * Only codes actually used by this package are defined here however any official code + * can be returned by the actually connected RabbitMQ instance. + * + * @see https://github.com/rabbitmq/rabbitmq-server/blob/v3.9.x/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes + * + * @see {@link connection.ts/Connection.queryOffset} + * @see {@link application/Code51Exception} + */ export const ResponseCode = { StreamDoesNotExist: 2, SubscriptionIdDoesNotExist: 4, + + // Used in src/connection.ts/Connection.queryOffset method + NoOffset: 19, } as const export const isString = (value: unknown): boolean => typeof value === "string" diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 000b415f..2d3359c0 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -13,6 +13,8 @@ import { username, wait, } from "../support/util" +import Code51Exception from "../../src/application/Code51Exception" +import { ResponseCode } from "../../src/util" describe("offset", () => { const rabbit = new Rabbit(username, password) @@ -267,7 +269,7 @@ describe("offset", () => { }) }).timeout(10000) - it("declaring a consumer without consumerRef and querying for the offset should rise an error", async () => { + it("declaring a consumer without consumerRef and querying for the offset should raise an error", async () => { const consumer = await client.declareConsumer( { stream: testStreamName, offset: Offset.first() }, (_message: Message) => { @@ -292,5 +294,24 @@ describe("offset", () => { await wait(200) await expectToThrowAsync(() => consumer.queryOffset(), Error, `This socket has been ended by the other party`) }) + + it("query offset is able to raise Code51Exception with ResponseCode.NoOffset code value set if there is no offset", async () => { + const params = { stream: testStreamName, consumerRef: "my_consumer", offset: Offset.first() } + const handler = (_message: Message) => { return } // prettier-ignore + const consumer = await client.declareConsumer(params, handler) + + try { + await consumer.queryOffset() + + throw new Error("Expected Code51Exception to be thrown") + } catch (error) { + const actual = error as Code51Exception + + expect(actual).instanceOf(Error) + expect(actual).instanceOf(Code51Exception) + expect(actual.code).equals(ResponseCode.NoOffset) + expect(actual.message).contain("error with code 19") + } + }) }) }) diff --git a/test/unit/application/Code51Exception.test.ts b/test/unit/application/Code51Exception.test.ts new file mode 100644 index 00000000..0b16050f --- /dev/null +++ b/test/unit/application/Code51Exception.test.ts @@ -0,0 +1,38 @@ +import { expect } from "chai" + +import Code51Exception from "../../../src/application/Code51Exception" +import { ResponseCode } from "../../../src/util" + +describe("[unit] Code51Exception Test", () => { + it("+constructor() #1: Should create Code51Exception expected default object", () => { + const expected = "A message" + const actual = new Code51Exception(expected) + + expect(actual).instanceOf(Code51Exception) + expect(actual.message).eql(expected) + expect(actual.code).eql(undefined) + }) + + it("+constructor() #2: Should create Code51Exception expected object with expected response code", () => { + const expected = ResponseCode.NoOffset + const actual = new Code51Exception("a message", expected) + + expect(actual.code).eql(expected) + }) + + it("Should throw expected Code51Exception exception", () => { + const expected = { message: "A message", code: ResponseCode.SubscriptionIdDoesNotExist } + + try { + throw new Code51Exception(expected.message, expected.code) + } catch (error_) { + const actual = error_ as Code51Exception + + expect(actual).instanceOf(Code51Exception) + expect(actual.message).eql(expected.message) + expect(actual.code).eql(expected.code) + } + }) +}) + +// Assert: proper stack trace