From 41b0354005ca1b620640c096a195855b248313fe Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Tue, 9 Dec 2025 17:21:45 +0000 Subject: [PATCH 1/5] ait/token-streaming: add message per token page --- src/data/nav/aitransport.ts | 9 +++++++++ .../features/token-streaming/message-per-token.mdx | 4 ++++ 2 files changed, 13 insertions(+) create mode 100644 src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts index 53699c56e5..a0cea2f5cc 100644 --- a/src/data/nav/aitransport.ts +++ b/src/data/nav/aitransport.ts @@ -18,6 +18,15 @@ export default { }, ], }, + { + name: 'Token streaming', + pages: [ + { + name: 'Message per token', + link: '/docs/ai-transport/features/token-streaming/message-per-token', + }, + ], + }, ], api: [], } satisfies NavProduct; diff --git a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx new file mode 100644 index 0000000000..ce3b34eba0 --- /dev/null +++ b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx @@ -0,0 +1,4 @@ +--- +title: Message per token +meta_description: "Stream individual tokens from AI models as separate messages over Ably." +--- From cdcbecb90b0b7c4f778044d5d8cbaa18a5c51994 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Tue, 9 Dec 2025 18:11:47 +0000 Subject: [PATCH 2/5] ait/message-per-token: add intro Add intro describing the pattern, its properties, and use cases. --- .../features/token-streaming/message-per-token.mdx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx index ce3b34eba0..1b7f6b9920 100644 --- a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx @@ -2,3 +2,12 @@ title: Message per token meta_description: "Stream individual tokens from AI models as separate messages over Ably." --- + +Token streaming with message-per-token is a pattern where every token generated by your model is published as its own Ably message. Each token then appears as one message in the channel history. + +This pattern is useful when clients only care about the most recent part of a response and you are happy to treat the channel history as a short sliding window rather than a full conversation log. For example: + +- **Backend-stored responses**: The backend writes complete responses to a database and clients load those full responses from there, while Ably is used only to deliver live tokens for the current in-progress response. +- **Live transcription, captioning, or translation**: A viewer who joins a live stream only needs the last few tokens for the current "frame" of subtitles, not the entire transcript so far. +- **Code assistance in an editor**: Streamed tokens become part of the file on disk as they are accepted, so past tokens do not need to be replayed from Ably. +- **Autocomplete**: A fresh response is streamed for each change a user makes to a document, with only the latest suggestion being relevant. 
From 8ae4a5bb6adf2c18bd80cdd90cdce3d7b86944c4 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Tue, 9 Dec 2025 18:59:37 +0000 Subject: [PATCH 3/5] ait/message-per-token: add token publishing Includes continuous token streams, correlating tokens for distinct responses, and explicit start/end events. --- .../token-streaming/message-per-token.mdx | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx index 1b7f6b9920..1571d0473e 100644 --- a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx @@ -11,3 +11,137 @@ This pattern is useful when clients only care about the most recent part of a re - **Live transcription, captioning, or translation**: A viewer who joins a live stream only needs the last few tokens for the current "frame" of subtitles, not the entire transcript so far. - **Code assistance in an editor**: Streamed tokens become part of the file on disk as they are accepted, so past tokens do not need to be replayed from Ably. - **Autocomplete**: A fresh response is streamed for each change a user makes to a document, with only the latest suggestion being relevant. + + +To get started with token streaming, all you need to do is: + +* [Use a channel](#use) +* [Publish tokens from your server](#publish) +* [Subscribe to the token stream](#subscribe) + +## Use a channel + +[Channels](/docs/channels) separate message traffic into different topics. For token streaming, each conversation or session typically has its own channel. + +Use the [`get()`](/docs/api/realtime-sdk/channels#get) method to create or retrieve a channel instance: + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); +``` + + +## Publish tokens from your server + +Publishing tokens to a channel is how your AI agent communicates responses to clients. Subscribers receive tokens in realtime as they're published. + + + +Initialize an Ably Realtime client on your server: + + +```javascript +import Ably from 'ably'; + +const realtime = new Ably.Realtime({ key: 'YOUR_API_KEY' }); +``` + + +### Continuous token stream + +For simple streaming scenarios such as live transcription, where all tokens are part of a continuous stream, simply publish each token as a message on the channel: + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +// Example: stream returns events like { type: 'token', text: 'Hello' } +for await (const event of stream) { + if (event.type === 'token') { + await channel.publish('token', event.text); + } +} +``` + + +### Token stream with distinct responses + +For applications with multiple, distinct responses, such as chat conversations, include a `responseId` in message [extras](/docs/messages#properties) to correlate tokens together that belong to the same response: + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +// Example: stream returns events like { type: 'token', text: 'Hello', responseId: 'resp_abc123' } +for await (const event of stream) { + if (event.type === 'token') { + await channel.publish({ + name: 'token', + data: event.text, + extras: { + headers: { + responseId: event.responseId + } + } + }); + } +} +``` + + +Clients use the `responseId` to group tokens belonging to the same response. 
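
As an illustration of the subscriber side, a minimal sketch for the case where only the most recent response matters (as in the autocomplete use case above) resets its buffer whenever a new `responseId` arrives. The `render()` call is a hypothetical UI update function:

```javascript
const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}');

// Only the latest response is kept; earlier responses are discarded
let currentResponseId = null;
let currentText = '';

await channel.subscribe('token', (message) => {
  const responseId = message.extras?.headers?.responseId;

  // A new responseId marks the start of a fresh response
  if (responseId !== currentResponseId) {
    currentResponseId = responseId;
    currentText = '';
  }

  currentText += message.data;
  render(currentText); // hypothetical function that updates the UI
});
```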
+ +### Token stream with explicit start/end events + +In some cases, your AI model response stream may include explicit events to mark response boundaries: + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +// Example: stream returns events like: +// { type: 'start', responseId: 'resp_abc123', metadata: { model: 'llama-3' } } +// { type: 'token', responseId: 'resp_abc123', text: 'Hello' } +// { type: 'end', responseId: 'resp_abc123' } + +for await (const event of stream) { + if (event.type === 'start') { + // Publish response start + await channel.publish({ + name: 'response.start', + extras: { + headers: { + responseId: event.responseId, + model: event.metadata?.model + } + } + }); + } else if (event.type === 'token') { + // Publish tokens + await channel.publish({ + name: 'token', + data: event.text, + extras: { + headers: { + responseId: event.responseId + } + } + }); + } else if (event.type === 'end') { + // Publish response complete + await channel.publish({ + name: 'response.complete', + extras: { + headers: { + responseId: event.responseId + } + } + }); + } +} +``` + + +This pattern provides explicit boundaries, making it easier for clients to manage response state. From c20195c7cb735de40d85aa1528b57f7280427906 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Tue, 9 Dec 2025 21:34:29 +0000 Subject: [PATCH 4/5] ait/message-per-token: token streaming patterns Splits each token streaming approach into distinct patterns and shows both the publish and subscribe side behaviour alongside one another. --- .../token-streaming/message-per-token.mdx | 171 +++++++++++++----- 1 file changed, 128 insertions(+), 43 deletions(-) diff --git a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx index 1571d0473e..f0e7d12442 100644 --- a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx @@ -3,7 +3,7 @@ title: Message per token meta_description: "Stream individual tokens from AI models as separate messages over Ably." --- -Token streaming with message-per-token is a pattern where every token generated by your model is published as its own Ably message. Each token then appears as one message in the channel history. +Token streaming with message-per-token is a pattern where every token generated by your model is published as its own Ably message. Each token then appears as one message in the channel history. This uses [Ably Pub/Sub](/docs/basics) for realtime communication between agents and clients. This pattern is useful when clients only care about the most recent part of a response and you are happy to treat the channel history as a short sliding window rather than a full conversation log. For example: @@ -12,14 +12,9 @@ This pattern is useful when clients only care about the most recent part of a re - **Code assistance in an editor**: Streamed tokens become part of the file on disk as they are accepted, so past tokens do not need to be replayed from Ably. - **Autocomplete**: A fresh response is streamed for each change a user makes to a document, with only the latest suggestion being relevant. 
+## Publishing tokens -To get started with token streaming, all you need to do is: - -* [Use a channel](#use) -* [Publish tokens from your server](#publish) -* [Subscribe to the token stream](#subscribe) - -## Use a channel +Publish tokens from a [Realtime](/docs/api/realtime-sdk) client, which maintains a persistent connection to the Ably service. This allows you to publish at very high message rates with the lowest possible latencies, while preserving guarantees around message delivery order. For more information, see [Realtime and REST](/docs/basics#realtime-and-rest). [Channels](/docs/channels) separate message traffic into different topics. For token streaming, each conversation or session typically has its own channel. @@ -31,27 +26,37 @@ const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); ``` -## Publish tokens from your server - -Publishing tokens to a channel is how your AI agent communicates responses to clients. Subscribers receive tokens in realtime as they're published. - - - -Initialize an Ably Realtime client on your server: +When publishing tokens, don't await the `channel.publish()` call. Ably rolls up acknowledgments and debounces them for efficiency, which means awaiting each publish would unnecessarily slow down your token stream. Messages are still published in the order that `publish()` is called, so delivery order is not affected. ```javascript -import Ably from 'ably'; +// ✅ Do this - publish without await for maximum throughput +for await (const event of stream) { + if (event.type === 'token') { + channel.publish('token', event.text); + } +} -const realtime = new Ably.Realtime({ key: 'YOUR_API_KEY' }); +// ❌ Don't do this - awaiting each publish reduces throughput +for await (const event of stream) { + if (event.type === 'token') { + await channel.publish('token', event.text); + } +} ``` -### Continuous token stream +This approach maximizes throughput while maintaining ordering guarantees, allowing you to stream tokens as fast as your AI model generates them. -For simple streaming scenarios such as live transcription, where all tokens are part of a continuous stream, simply publish each token as a message on the channel: +## Streaming patterns + +Ably is a pub/sub messaging platform, so you can structure your messages however works best for your application. Below are common patterns for streaming tokens, each showing both agent-side publishing and client-side subscription. Choose the approach that fits your use case, or create your own variation. + +### Continuous token stream + +For simple streaming scenarios such as live transcription, where all tokens are part of a continuous stream, simply publish each token as a message. + +#### Publish tokens ```javascript @@ -60,15 +65,33 @@ const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); // Example: stream returns events like { type: 'token', text: 'Hello' } for await (const event of stream) { if (event.type === 'token') { - await channel.publish('token', event.text); + channel.publish('token', event.text); } } ``` -### Token stream with distinct responses +#### Subscribe to tokens + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +// Subscribe to token messages +await channel.subscribe('token', (message) => { + const token = message.data; + console.log(token); // log each token as it arrives +}); +``` + + +This pattern is simple and works well when you're displaying a single, continuous stream of tokens. 
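
The publish calls in this pattern are deliberately not awaited. If you still want to surface publish failures without slowing the stream down, one option (a sketch assuming the promise-based ably-js API, where `publish()` returns a promise) is to attach a rejection handler to each call:

```javascript
for await (const event of stream) {
  if (event.type === 'token') {
    // Fire-and-forget publish, but log any failure when it surfaces
    channel.publish('token', event.text).catch((err) => {
      console.error('Failed to publish token:', err);
    });
  }
}
```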
-For applications with multiple, distinct responses, such as chat conversations, include a `responseId` in message [extras](/docs/messages#properties) to correlate tokens together that belong to the same response: +### Token stream with multiple responses + +For applications with multiple responses, such as chat conversations, include a `responseId` in message [extras](/docs/messages#properties) to correlate tokens together that belong to the same response. + +#### Publish tokens ```javascript @@ -77,7 +100,7 @@ const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); // Example: stream returns events like { type: 'token', text: 'Hello', responseId: 'resp_abc123' } for await (const event of stream) { if (event.type === 'token') { - await channel.publish({ + channel.publish({ name: 'token', data: event.text, extras: { @@ -91,36 +114,66 @@ for await (const event of stream) { ``` -Clients use the `responseId` to group tokens belonging to the same response. +#### Subscribe to tokens + +Use the `responseId` header in message extras to correlate tokens. The `responseId` allows you to group tokens belonging to the same response and correctly handle token delivery for multiple responses, even when delivered concurrently. + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +// Track responses by ID +const responses = new Map(); + +await channel.subscribe('token', (message) => { + const token = message.data; + const responseId = message.extras?.headers?.responseId; + + if (!responseId) { + console.warn('Token missing responseId'); + return; + } + + // Create an empty response + if (!responses.has(responseId)) { + responses.set(responseId, ''); + } + + // Append token to response + responses.set(responseId, responses.get(responseId) + token); +}); +``` + + +### Token stream with explicit start/stop events -### Token stream with explicit start/end events +In some cases, your AI model response stream may include explicit events to mark response boundaries. You can indicate the event type, such as a response start/stop event, using the Ably message name. 
-In some cases, your AI model response stream may include explicit events to mark response boundaries: +#### Publish tokens ```javascript const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); // Example: stream returns events like: -// { type: 'start', responseId: 'resp_abc123', metadata: { model: 'llama-3' } } -// { type: 'token', responseId: 'resp_abc123', text: 'Hello' } -// { type: 'end', responseId: 'resp_abc123' } +// { type: 'message_start', responseId: 'resp_abc123' } +// { type: 'message_delta', responseId: 'resp_abc123', text: 'Hello' } +// { type: 'message_stop', responseId: 'resp_abc123' } for await (const event of stream) { - if (event.type === 'start') { + if (event.type === 'message_start') { // Publish response start - await channel.publish({ - name: 'response.start', + channel.publish({ + name: 'start', extras: { headers: { - responseId: event.responseId, - model: event.metadata?.model + responseId: event.responseId } } }); - } else if (event.type === 'token') { + } else if (event.type === 'message_delta') { // Publish tokens - await channel.publish({ + channel.publish({ name: 'token', data: event.text, extras: { @@ -129,10 +182,10 @@ for await (const event of stream) { } } }); - } else if (event.type === 'end') { - // Publish response complete - await channel.publish({ - name: 'response.complete', + } else if (event.type === 'message_stop') { + // Publish response stop + channel.publish({ + name: 'stop', extras: { headers: { responseId: event.responseId @@ -144,4 +197,36 @@ for await (const event of stream) { ``` -This pattern provides explicit boundaries, making it easier for clients to manage response state. +#### Subscribe to tokens + +Handle each event type to manage response lifecycle: + + +```javascript +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}'); + +const responses = new Map(); + +// Handle response start +await channel.subscribe('start', (message) => { + const responseId = message.extras?.headers?.responseId; + responses.set(responseId, ''); +}); + +// Handle tokens +await channel.subscribe('token', (message) => { + const responseId = message.extras?.headers?.responseId; + const token = message.data; + + const currentText = responses.get(responseId) || ''; + responses.set(responseId, currentText + token); +}); + +// Handle response stop +await channel.subscribe('stop', (message) => { + const responseId = message.extras?.headers?.responseId; + const finalText = responses.get(responseId); + console.log('Response complete:', finalText); +}); +``` + From 52e32d8044edcfce8652c7c40cf40c6bd0490661 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Wed, 10 Dec 2025 09:51:35 +0000 Subject: [PATCH 5/5] ait/message-per-token: client hydration patterns Includes hydration with rewind and hydration with persisted history + untilAttach. Describes the pattern for handling in-progress live responses with complete responses loaded from the database. 
--- .../token-streaming/message-per-token.mdx | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx index f0e7d12442..7e0f48e794 100644 --- a/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/features/token-streaming/message-per-token.mdx @@ -230,3 +230,200 @@ await channel.subscribe('stop', (message) => { }); ``` + +## Client hydration + +When clients connect or reconnect, such as after a page refresh, they often need to catch up on tokens that were published while they were offline or before they joined. Ably provides several approaches to hydrate client state depending on your application's requirements. + + + +### Using rewind for recent history + +The simplest approach is to use Ably's [rewind](/docs/channels/options/rewind) channel option to automatically retrieve recent tokens when attaching to a channel: + + +```javascript +// Use rewind to receive recent historical messages +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}', { + params: { rewind: '2m' } // or rewind: 100 for message count +}); + +// Subscribe to receive both recent historical and live messages, +// which are delivered in order to the subscription +await channel.subscribe('token', (message) => { + const token = message.data; + + // Process tokens from both recent history and live stream + console.log('Token received:', token); +}); +``` + + +Rewind supports two formats: + +- **Time-based**: Use a time interval like `'30s'` or `'2m'` to retrieve messages from that time period +- **Count-based**: Use a number like `50` or `100` to retrieve the most recent N messages (maximum 100) + + + +By default, rewind is limited to the last 2 minutes of messages. This is usually sufficient for scenarios where clients need only recent context, such as for continuous token streaming, or when the response stream from a given model request does not exceed 2 minutes. If you need more than 2 minutes of history, see [Using history for longer persistence](#history). + +### Using history for longer persistence + +For applications that need to retrieve tokens beyond the 2-minute rewind window, enable [persistence](/docs/storage-history/storage#all-message-persistence) on your channel. Use [channel history](/docs/storage-history/history) with the [`untilAttach` option](/docs/storage-history/history#continuous-history) to paginate back through history to obtain historical tokens, while preserving continuity with the delivery of live tokens: + + +```javascript +// Use a channel in a namespace called 'persisted', which has persistence enabled +const channel = realtime.channels.get('persisted:{{RANDOM_CHANNEL_NAME}}'); + +let response = ''; + +// Subscribe to live messages (implicitly attaches the channel) +await channel.subscribe('token', (message) => { + // Append the token to the end of the response + response += message.data; +}); + +// Fetch history up until the point of attachment +let page = await channel.history({ untilAttach: true }); + +// Paginate backwards through history +while (page) { + // Messages are newest-first, so prepend them to response + for (const message of page.items) { + response = message.data + response; + } + + // Move to next page if available + page = page.hasNext() ? 
await page.next() : null; +} +``` + + +### Hydrating an in-progress live response + +A common pattern is to persist complete model responses in your database while using Ably for live token delivery of the in-progress response. + +The client loads completed responses from your database, then reaches back into Ably channel history until it encounters a token for a response it's already loaded. + +You can retrieve partial history using either the [rewind](#rewind) or [history](#history) pattern. + +#### Hydrate using rewind + +Load completed responses from your database, then use rewind to catch up on any in-progress responses, skipping any tokens that belong to a response that was already loaded: + + +```javascript +// Load completed responses from database +const completedResponses = await loadResponsesFromDatabase(); + +// Use rewind to receive recent historical messages +const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}', { + params: { rewind: '2m' } +}); + +// Track in progress responses by ID +const inProgressResponses = new Map(); + +// Subscribe to receive both recent historical and live messages, +// which are delivered in order to the subscription +await channel.subscribe('token', (message) => { + const token = message.data; + const responseId = message.extras?.headers?.responseId; + + if (!responseId) { + console.warn('Token missing responseId'); + return; + } + + // Skip tokens for responses already hydrated from database + if (completedResponses.has(responseId)) { + return; + } + + // Create an empty in-progress response + if (!inProgressResponses.has(responseId)) { + inProgressResponses.set(responseId, ''); + } + + // Append tokens for new responses + inProgressResponses.set(responseId, inProgressResponses.get(responseId) + token); +}); +``` + + +#### Hydrate using history + +Load completed responses from your database, then paginate backwards through history to catch up on in-progress responses until you reach a token that belongs to a response you've already loaded: + + +```javascript +// Load completed responses from database +const completedResponses = await loadResponsesFromDatabase(); + +// Use a channel in a namespace called 'persisted', which has persistence enabled +const channel = realtime.channels.get('persisted:{{RANDOM_CHANNEL_NAME}}'); + +// Track in progress responses by ID +const inProgressResponses = new Map(); + +// Subscribe to live tokens (implicitly attaches) +await channel.subscribe('token', (message) => { + const token = message.data; + const responseId = message.extras?.headers?.responseId; + + if (!responseId) { + console.warn('Token missing responseId'); + return; + } + + // Skip tokens for responses already hydrated from database + if (completedResponses.has(responseId)) { + return; + } + + // Create an empty in-progress response + if (!inProgressResponses.has(responseId)) { + inProgressResponses.set(responseId, ''); + } + + // Append live tokens for in-progress responses + inProgressResponses.set(responseId, inProgressResponses.get(responseId) + token); +}); + +// Paginate backwards through history until we encounter a hydrated response +let page = await channel.history({ untilAttach: true }); + +// Paginate backwards through history +let done = false; +while (page && !done) { + // Messages are newest-first, so prepend them to response + for (const message of page.items) { + const token = message.data; + const responseId = message.extras?.headers?.responseId; + + // Stop when we reach a response already loaded from database + if 
(completedResponses.has(responseId)) { + done = true; + break; + } + + // Create an empty in-progress response + if (!inProgressResponses.has(responseId)) { + inProgressResponses.set(responseId, ''); + } + + // Prepend historical tokens for in-progress responses + inProgressResponses.set(responseId, token + inProgressResponses.get(responseId)); + } + + // Move to next page if available + page = page.hasNext() ? await page.next() : null; +} +``` +
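
The hydration examples above assume a `loadResponsesFromDatabase()` helper that is specific to your application. As a rough, illustrative sketch (the data-access call is a hypothetical placeholder for your own storage layer), it only needs to return something with fast `responseId` lookups, such as a `Map`:

```javascript
// Illustrative only: your data layer will differ. The important property is
// that the returned value supports has() and get() by responseId, as used above.
async function loadResponsesFromDatabase() {
  // Hypothetical data-access call, e.g. returning
  // [{ responseId: 'resp_abc123', text: 'Hello world' }, ...]
  const rows = await fetchCompletedResponses();

  return new Map(rows.map((row) => [row.responseId, row.text]));
}
```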