Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions docs/streaming/dev-guide/part1.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ graph TB

| Developer provides: | ADK provides: | Live API provide: |
|---------------------|---------------|------------------|
| **Web / Mobile**: Frontend applications that users interact with, handling UI/UX, user input capture, and response display<br><br>**[WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) / [SSE](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) Server**: Real-time communication server (such as [FastAPI](https://fastapi.tiangolo.com/)) that manages client connections, handles streaming protocols, and routes messages between clients and ADK<br><br>**`Agent`**: Custom AI agent definition with specific instructions, tools, and behavior tailored to your application's needs | **[LiveRequestQueue](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/agents/live_request_queue.py)**: Message queue that buffers and sequences incoming user messages (text content, audio blobs, control signals) for orderly processing by the agent<br><br>**[Runner](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/runners.py)**: Execution engine that orchestrates agent sessions, manages conversation state, and provides the `run_live()` streaming interface<br><br>**[RunConfig](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/agents/run_config.py)**: Configuration for streaming behavior, modalities, and advanced features<br><br>**Internal components** (managed automatically, not directly used by developers): [LLM Flow](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/flows/llm_flows/base_llm_flow.py) for processing pipeline and [GeminiLlmConnection](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/models/gemini_llm_connection.py) for protocol translation | **[Gemini Live API](https://ai.google.dev/gemini-api/docs/live)** (via Google AI Studio) and **[Vertex AI Live API](https://cloud.google.com/vertex-ai/generative-ai/docs/live-api)** (via Google Cloud): Google's real-time language model services that process streaming input, generate responses, handle interruptions, support multimodal content (text, audio, video), and provide advanced AI capabilities like function calling and contextual understanding |
| **Web / Mobile**: Frontend applications that users interact with, handling UI/UX, user input capture, and response display<br><br>**[WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) / [SSE](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) Server**: Real-time communication server (such as [FastAPI](https://fastapi.tiangolo.com/)) that manages client connections, handles streaming protocols, and routes messages between clients and ADK<br><br>**`Agent`**: Custom AI agent definition with specific instructions, tools, and behavior tailored to your application's needs | **[LiveRequestQueue](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/agents/live_request_queue.py)**: Message queue that buffers and sequences incoming user messages (text content, audio blobs, control signals) for orderly processing by the agent<br><br>**[Runner](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/runners.py)**: Execution engine that orchestrates agent sessions, manages conversation state, and provides the `run_live()` streaming interface<br><br>**[RunConfig](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/agents/run_config.py)**: Configuration for streaming behavior, modalities, and advanced features<br><br>**Internal components** (managed automatically, not directly used by developers): [LLM Flow](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/flows/llm_flows/base_llm_flow.py) for processing pipeline and [GeminiLlmConnection](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/models/gemini_llm_connection.py) for protocol translation | **[Gemini Live API](https://ai.google.dev/gemini-api/docs/live)** (via Google AI Studio) and **[Vertex AI Live API](https://cloud.google.com/vertex-ai/generative-ai/docs/live-api)** (via Google Cloud): Google's real-time language model services that process streaming input, generate responses, handle interruptions, support multimodal content (text, audio, video), and provide advanced AI capabilities like function calling and contextual understanding |

This architecture demonstrates ADK's clear separation of concerns: your application handles user interaction and transport protocols, ADK manages the streaming orchestration and state, and Live API provide the AI intelligence. By abstracting away the complexity of LLM-side streaming connection management, event loops, and protocol translation, ADK enables you to focus on building agent behavior and user experiences rather than streaming infrastructure.

Expand Down Expand Up @@ -414,19 +414,19 @@ These components are created once when your application starts and shared across

The `Agent` is the core of your streaming application—it defines what your AI can do, how it should behave, and which AI model powers it. You configure your agent with a specific model, tools it can use (like Google Search or custom APIs), and instructions that shape its personality and behavior.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/google_search_agent/agent.py#L10-L15" target="_blank">agent.py:10-15</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/google_search_agent/agent.py#L10-L15" target="_blank">agent.py:10-15</a>'
"""Google Search Agent definition for ADK Bidi-streaming demo."""

import os
from google.adk.agents import Agent
from google.adk.tools import google_search

# Default models for Live API with native audio support:
# - Gemini Live API: gemini-2.5-flash-native-audio-preview-09-2025
# - Vertex AI Live API: gemini-live-2.5-flash-preview-native-audio-09-2025
# - Gemini Live API: gemini-2.5-flash-native-audio-preview-12-2025
# - Vertex AI Live API: gemini-live-2.5-flash-native-audio
agent = Agent(
name="google_search_agent",
model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-09-2025"),
model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-12-2025"),
tools=[google_search],
instruction="You are a helpful assistant that can search the web."
)
Expand All @@ -448,7 +448,7 @@ The ADK [Session](https://google.github.io/adk-docs/sessions/session/) manages c

To create a `Session`, or get an existing one for a specified `session_id`, every ADK application needs to have a [SessionService](https://google.github.io/adk-docs/sessions/session/#managing-sessions-with-a-sessionservice). For development purpose, ADK provides a simple `InMemorySessionService` that will lose the `Session` state when the application shuts down.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L37" target="_blank">main.py:37</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L37" target="_blank">main.py:37</a>'
from google.adk.sessions import InMemorySessionService

# Define your session service
Expand All @@ -463,28 +463,23 @@ For production applications, choose a persistent session service based on your i
- You're building single-server apps (SQLite) or multi-server deployments (PostgreSQL/MySQL)
- You want full control over data storage and backups
- Examples:
- SQLite: `DatabaseSessionService(db_url="sqlite+aiosqlite:///./sessions.db")`
- SQLite: `DatabaseSessionService(db_url="sqlite:///./sessions.db")`
- PostgreSQL: `DatabaseSessionService(db_url="postgresql://user:pass@host/db")`

<div class="admonition note">
<p class="admonition-title">Note</p>
<p>For SQLite, use <code>sqlite+aiosqlite</code> instead of <code>sqlite</code> because <code>DatabaseSessionService</code> requires an async database driver.</p>
</div>

**Use `VertexAiSessionService` if:**

- You're already using Google Cloud Platform
- You want managed storage with built-in scalability
- You need tight integration with Vertex AI features
- Example: `VertexAiSessionService(project="my-project")`

Both provide session persistence capabilities—choose based on your infrastructure and scale requirements. With persistent session services, the state of the `Session` will be preserved even after application shutdown. See the [ADK Session Management documentation](https://google.github.io/adk-docs/sessions/ for more details.
Both provide session persistence capabilities—choose based on your infrastructure and scale requirements. With persistent session services, the state of the `Session` will be preserved even after application shutdown. See the [ADK Session Management documentation](https://google.github.io/adk-docs/sessions/) for more details.

#### Define Your Runner

The [Runner](https://google.github.io/adk-docs/runtime/) provides the runtime for the `Agent`. It manages the conversation flow, coordinates tool execution, handles events, and integrates with session storage. You create one runner instance at application startup and reuse it for all streaming sessions.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L50" target="_blank">main.py:50,53</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L50" target="_blank">main.py:50,53</a>'
from google.adk.runners import Runner

APP_NAME = "bidi-demo"
Expand Down Expand Up @@ -540,7 +535,7 @@ This design enables scenarios like:

The recommended production pattern is to check if a session exists first, then create it only if needed. This approach safely handles both new sessions and conversation resumption:

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L127-L133" target="_blank">main.py:127-133</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L155-L161" target="_blank">main.py:155-161</a>'
# Get or create session (handles both new sessions and reconnections)
session = await session_service.get_session(
app_name=APP_NAME,
Expand All @@ -567,7 +562,7 @@ This pattern works correctly in all scenarios:

[RunConfig](part4.md) defines the streaming behavior for this specific session—which modalities to use (text or audio), whether to enable transcription, voice activity detection, proactivity, and other advanced features.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L98-L104" target="_blank">main.py:98-104</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L110-L124" target="_blank">main.py:110-124</a>'
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types

Expand All @@ -588,7 +583,7 @@ run_config = RunConfig(

`LiveRequestQueue` is the communication channel for sending messages to the agent during streaming. It's a thread-safe async queue that buffers user messages (text content, audio blobs, activity signals) for orderly processing.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L135" target="_blank">main.py:135</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L163" target="_blank">main.py:163</a>'
from google.adk.agents.live_request_queue import LiveRequestQueue

live_request_queue = LiveRequestQueue()
Expand All @@ -600,7 +595,7 @@ live_request_queue = LiveRequestQueue()

Never reuse a `LiveRequestQueue` across multiple streaming sessions. Each call to `run_live()` requires a fresh queue. Reusing queues can cause message ordering issues and state corruption.

The close signal persists in the queue (see [`live_request_queue.py:59-60`](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/agents/live_request_queue.py#L59-L60)) and terminates the sender loop (see [`base_llm_flow.py:264-266`](https://github.com/google/adk-python/blob/960b206752918d13f127a9d6ed8d21d34bcbc7fa/src/google/adk/flows/llm_flows/base_llm_flow.py#L264-L266)). Reusing a queue would carry over this signal and any remaining messages from the previous session.
The close signal persists in the queue (see [`live_request_queue.py:59-60`](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/agents/live_request_queue.py#L59-L60)) and terminates the sender loop (see [`base_llm_flow.py:264-266`](https://github.com/google/adk-python/blob/29c1115959b0084ac1169748863b35323da3cf50/src/google/adk/flows/llm_flows/base_llm_flow.py#L264-L266)). Reusing a queue would carry over this signal and any remaining messages from the previous session.

### Phase 3: Bidi-streaming with `run_live()` event loop

Expand All @@ -610,7 +605,7 @@ Once the streaming loop is running, you can send messages to the agent and recei

Use `LiveRequestQueue` methods to send different types of messages to the agent during the streaming session:

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L141-L189" target="_blank">main.py:141-189</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L169-L217" target="_blank">main.py:169-217</a>'
from google.genai import types

# Send text content
Expand All @@ -633,7 +628,7 @@ See [Part 2: Sending messages with LiveRequestQueue](part2.md) for detailed API

The `run_live()` async generator continuously yields `Event` objects as the agent processes input and generates responses. Each event represents a discrete occurrence—partial text generation, audio chunks, tool execution, transcription, interruption, or turn completion.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L191-L206" target="_blank">main.py:191-206</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L219-L234" target="_blank">main.py:219-234</a>'
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
Expand All @@ -656,7 +651,7 @@ When the streaming session should end (user disconnects, conversation completes,

Send a close signal through the queue to terminate the streaming loop:

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L225" target="_blank">main.py:225</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L253" target="_blank">main.py:253</a>'
live_request_queue.close()
```

Expand All @@ -668,7 +663,7 @@ Here's a complete FastAPI WebSocket application showing all four phases integrat

!!! note "Complete Demo Implementation"

For the production-ready implementation with multimodal support (text, audio, image), see the complete [`main.py`](https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py) file.
For the production-ready implementation with multimodal support (text, audio, image), see the complete [`main.py`](https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py) file.

**Complete Implementation:**

Expand Down Expand Up @@ -802,7 +797,7 @@ async def websocket_endpoint(websocket: WebSocket, user_id: str, session_id: str

The upstream task continuously receives messages from the WebSocket client and forwards them to the `LiveRequestQueue`. This enables the user to send messages to the agent at any time, even while the agent is generating a response.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L141-L189" target="_blank">main.py:141-189</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L169-L217" target="_blank">main.py:169-217</a>'
async def upstream_task() -> None:
"""Receives messages from WebSocket and sends to LiveRequestQueue."""
try:
Expand All @@ -818,7 +813,7 @@ async def upstream_task() -> None:

The downstream task continuously receives `Event` objects from `run_live()` and sends them to the WebSocket client. This streams the agent's responses, tool executions, transcriptions, and other events to the user in real-time.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L191-L206" target="_blank">main.py:191-206</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L219-L234" target="_blank">main.py:219-234</a>'
async def downstream_task() -> None:
"""Receives Events from run_live() and sends to WebSocket."""
async for event in runner.run_live(
Expand All @@ -836,7 +831,7 @@ async def downstream_task() -> None:

Both tasks run concurrently using `asyncio.gather()`, enabling true Bidi-streaming. The `try/finally` block ensures `LiveRequestQueue.close()` is called even if exceptions occur, minimizing the session resource usage.

```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/4274c70ae3f4c68595f543ee504474747ea9f0da/python/agents/bidi-demo/app/main.py#L210-L225" target="_blank">main.py:210-225</a>'
```python title='Demo implementation: <a href="https://github.com/google/adk-samples/blob/31847c0723fbf16ddf6eed411eb070d1c76afd1a/python/agents/bidi-demo/app/main.py#L238-L253" target="_blank">main.py:238-253</a>'
try:
await asyncio.gather(
upstream_task(),
Expand Down Expand Up @@ -916,4 +911,4 @@ In this introduction, you learned how ADK transforms complex real-time streaming

---

[Next: Part 2 - Sending Messages with LiveRequestQueue](part2.md) →
[Next: Part 2: Sending Messages with LiveRequestQueue](part2.md) →
Loading
Loading