diff --git a/src/content/docs/agents/api-reference/run-workflows.mdx b/src/content/docs/agents/api-reference/run-workflows.mdx index 8653145a76ed314..e5a24ba26383713 100644 --- a/src/content/docs/agents/api-reference/run-workflows.mdx +++ b/src/content/docs/agents/api-reference/run-workflows.mdx @@ -106,3 +106,7 @@ You can also call a Workflow that is defined in a different Workers script from Refer to the [cross-script calls](/workflows/build/workers-api/#cross-script-calls) section of the Workflows documentation for more examples. + +## Complete example + +For a complete example showing how to build a Task Runner Agent with Workflow integration, including progress tracking and real-time updates, refer to [Build a Task Runner with Workflows](/agents/guides/task-runner-workflows/). diff --git a/src/content/docs/agents/guides/task-runner-workflows.mdx b/src/content/docs/agents/guides/task-runner-workflows.mdx new file mode 100644 index 000000000000000..b0cb2d18e1136de --- /dev/null +++ b/src/content/docs/agents/guides/task-runner-workflows.mdx @@ -0,0 +1,584 @@ +--- +pcx_content_type: how-to +title: Build a Task Runner with Workflows +sidebar: + order: 6 +description: Learn how to integrate Cloudflare Workflows with Agents to run long-running background tasks with automatic retries and progress tracking +--- + +import { PackageManagers, WranglerConfig, TypeScriptExample } from "~/components"; + +## Build a Task Runner with Workflows Integration + +This guide will show you how to integrate Cloudflare Workflows with Agents to handle long-running background tasks. You will learn two different patterns for running tasks: quick analysis that runs directly in the Agent, and deep analysis that uses durable Workflows for complex, multi-step operations. + +Your Task Runner Agent will be able to: + +- Execute quick tasks directly in the Agent (under 30 seconds) +- Dispatch long-running tasks to durable Workflows +- Track task progress with real-time updates via WebSockets +- Handle automatic retries and error recovery with Workflows +- Maintain persistent task state across sessions + +This pattern is ideal for scenarios like processing uploaded files, analyzing repositories, generating reports, or running any multi-step operation that requires durability and automatic retry logic. + +You can view the full code for this example [on GitHub](https://github.com/cloudflare/agents/tree/main/examples/task-runner). + +## Prerequisites + +Before you begin, you will need: + +- A [Cloudflare account](https://dash.cloudflare.com/sign-up) +- [Node.js](https://nodejs.org/) installed (v18 or later) +- An [OpenAI API key](https://platform.openai.com/api-keys) (or another LLM provider) + +## Understanding the Architecture + +The task runner uses two distinct approaches for background work: + +### Quick Analysis (Agent-based) + +Runs directly in the Agent using Durable Objects. Best for operations under 30 seconds that do not require durability across Worker restarts. + +### Deep Analysis (Workflow-based) + +Runs in the Cloudflare Workflow engine. Best for long-running operations (minutes to days) that need automatic retries, step-based checkpointing, and durability across restarts. + +| Feature | Quick Analysis (Agent) | Deep Analysis (Workflow) | +| ---------- | ---------------------- | ------------------------ | +| Duration | Seconds to minutes | Minutes to days | +| Execution | In Durable Object | Separate Workflow engine | +| Durability | Lost on DO eviction | Survives restarts | +| Retries | Manual | Automatic per-step | +| Sleep | Not durable | Durable (can wait hours) | +| Complexity | Simple | More setup required | + +## 1. Create your project + +1. Create a new project for your Task Runner Agent: + + + +2. Navigate into your project: + +```sh +cd task-runner +``` + +3. Install the required dependencies: + +```sh +npm install openai react react-dom +``` + +## 2. Set up your environment variables + +1. Create a `.dev.vars` file in your project root for local development secrets: + +```sh +touch .dev.vars +``` + +2. Add your credentials to `.dev.vars`: + +```sh +OPENAI_API_KEY="your-openai-api-key" +``` + +3. Configure your `wrangler.jsonc` to set up both the Agent and Workflow: + + + +```jsonc +{ + "name": "task-runner-example", + "main": "src/server.ts", + "compatibility_date": "2025-02-11", + "compatibility_flags": ["nodejs_compat", "enable_ctx_exports"], + "observability": { + "enabled": true + }, + "assets": { + "directory": "./dist", + "binding": "ASSETS" + }, + "durable_objects": { + "bindings": [ + { + "name": "task-runner", + "class_name": "TaskRunner" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": ["TaskRunner"] + } + ], + "workflows": [ + { + "name": "analysis-workflow", + "binding": "ANALYSIS_WORKFLOW", + "class_name": "AnalysisWorkflow" + } + ] +} +``` + + + +The key configuration elements are: + +- **durable_objects.bindings**: Defines the Agent Durable Object +- **workflows**: Defines the Workflow binding that the Agent can call +- **enable_ctx_exports**: Required for Workflow integration + +## 3. Implement the Agent + +Create your Agent at `src/server.ts`. The Agent will handle both quick and workflow-based analysis: + + + +```ts +import { + Agent, + callable +} from "@cloudflare/agents"; +import OpenAI from "openai"; + +interface Env { + "task-runner": DurableObjectNamespace; + ANALYSIS_WORKFLOW: Workflow; + OPENAI_API_KEY: string; +} + +interface TaskState { + id: string; + status: "pending" | "running" | "completed" | "failed"; + progress?: number; + result?: AnalysisResult; + error?: string; + workflowInstanceId?: string; + events: Array<{ type: string; data?: unknown; timestamp: number }>; +} + +export class TaskRunner extends Agent< + Env, + { tasks: Record } +> { + private openai: OpenAI | null = null; + + initialState = { tasks: {} as Record }; + + private getOpenAI(): OpenAI { + if (!this.openai) { + this.openai = new OpenAI({ + apiKey: this.env.OPENAI_API_KEY + }); + } + return this.openai; + } + + // Quick analysis - runs in Agent + @callable() + async quickAnalysis(input: { + repoUrl: string; + branch?: string; + }): Promise<{ id: string }> { + const taskId = `task_${crypto.randomUUID().slice(0, 12)}`; + const { repoUrl, branch = "main" } = input; + + // Track the task + const task: TaskState = { + id: taskId, + status: "running", + progress: 0, + events: [{ type: "started", timestamp: Date.now() }] + }; + this.setState({ + ...this.state, + tasks: { ...this.state.tasks, [taskId]: task } + }); + + // Run analysis in background + this.runQuickAnalysis(taskId, repoUrl, branch).catch((error) => { + console.error("[TaskRunner] Quick analysis failed:", error); + this.updateTask(taskId, { + status: "failed", + error: error instanceof Error ? error.message : String(error) + }); + }); + + return { id: taskId }; + } + + // Deep analysis - dispatches to Workflow + @callable() + async startAnalysis(input: { repoUrl: string; branch?: string }) { + const taskId = `task_${crypto.randomUUID().slice(0, 12)}`; + + // Create workflow instance + const instance = await this.env.ANALYSIS_WORKFLOW.create({ + id: taskId, + params: { + repoUrl: input.repoUrl, + branch: input.branch || "main", + // Pass agent info for callbacks + _agentBinding: "task-runner", + _agentName: this.name || "default" + } + }); + + // Track the task in state + const task: TaskState = { + id: taskId, + status: "pending", + workflowInstanceId: instance.id, + events: [{ type: "workflow-started", timestamp: Date.now() }] + }; + + this.setState({ + ...this.state, + tasks: { ...this.state.tasks, [taskId]: task } + }); + + return { id: taskId, workflowInstanceId: instance.id }; + } + + // Handle updates from the workflow + @callable() + async handleWorkflowUpdate(update: { + taskId: string; + status?: string; + progress?: number; + result?: AnalysisResult; + error?: string; + event?: { type: string; data?: unknown }; + }) { + const task = this.state.tasks[update.taskId]; + if (!task) { + console.warn(`[TaskRunner] Task ${update.taskId} not found`); + return; + } + + const events = task.events || []; + if (update.event) { + events.push({ ...update.event, timestamp: Date.now() }); + } + + this.setState({ + ...this.state, + tasks: { + ...this.state.tasks, + [update.taskId]: { + ...task, + status: (update.status as TaskState["status"]) || task.status, + progress: update.progress ?? task.progress, + result: update.result || task.result, + error: update.error || task.error, + events + } + } + }); + } + + private updateTask( + taskId: string, + updates: Partial> + ) { + const task = this.state.tasks[taskId]; + if (!task) return; + + this.setState({ + ...this.state, + tasks: { + ...this.state.tasks, + [taskId]: { ...task, ...updates } + } + }); + } + + private async runQuickAnalysis( + taskId: string, + repoUrl: string, + branch: string + ) { + // Implementation details... + // Fetch repository, analyze with AI, update progress + } +} +``` + + + +Key implementation details: + +- **@callable() decorator**: Exposes methods that can be called from the client +- **initialState**: Defines the initial state structure for task tracking +- **quickAnalysis**: Runs tasks directly in the Agent without durability guarantees +- **startAnalysis**: Creates a Workflow instance for durable execution +- **handleWorkflowUpdate**: Receives progress updates from the Workflow + +## 4. Implement the Workflow + +Create your Workflow at `src/workflows/analysis.ts`. The Workflow handles the actual long-running analysis: + + + +```ts +import { + WorkflowEntrypoint, + WorkflowStep, + WorkflowEvent +} from "cloudflare:workers"; + +interface Env { + "task-runner": DurableObjectNamespace; + OPENAI_API_KEY: string; +} + +interface Params { + repoUrl: string; + branch: string; + _agentBinding: string; + _agentName: string; +} + +export class AnalysisWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { repoUrl, branch, _agentBinding, _agentName } = event.payload; + + // Get Agent stub for sending updates + const agentId = this.env[_agentBinding].idFromName(_agentName); + const agent = this.env[_agentBinding].get(agentId); + + try { + // Step 1: Fetch repository data (durable step) + const files = await step.do("fetch-repo", async () => { + await this.sendUpdate(agent, event.id, { + status: "running", + progress: 10, + event: { type: "phase", data: { name: "fetching" } } + }); + + // Fetch files from GitHub API + const repoFiles = await this.fetchRepoTree(repoUrl, branch); + return repoFiles; + }); + + // Step 2: Read key files (durable step) + const keyFiles = await step.do("read-files", async () => { + await this.sendUpdate(agent, event.id, { + progress: 30, + event: { type: "phase", data: { name: "reading" } } + }); + + const contents = await this.fetchKeyFiles(repoUrl, branch, files); + return contents; + }); + + // Step 3: Analyze with AI (durable step with retry) + const analysis = await step.do( + "analyze", + { + retries: { limit: 3, backoff: "exponential" } + }, + async () => { + await this.sendUpdate(agent, event.id, { + progress: 50, + event: { type: "phase", data: { name: "analyzing" } } + }); + + const result = await this.analyzeWithAI(repoUrl, files, keyFiles); + return result; + } + ); + + // Step 4: Send completion + await this.sendUpdate(agent, event.id, { + status: "completed", + progress: 100, + result: { + repoUrl, + branch, + ...analysis, + fileCount: files.length, + analyzedAt: new Date().toISOString() + } + }); + + return analysis; + } catch (error) { + // Send error to Agent + await this.sendUpdate(agent, event.id, { + status: "failed", + error: error instanceof Error ? error.message : String(error) + }); + throw error; + } + } + + private async sendUpdate( + agent: DurableObjectStub, + taskId: string, + update: { + status?: string; + progress?: number; + result?: unknown; + error?: string; + event?: { type: string; data?: unknown }; + } + ) { + try { + await agent.fetch("https://agent/call", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + method: "handleWorkflowUpdate", + args: [{ taskId, ...update }] + }) + }); + } catch (error) { + console.error("[AnalysisWorkflow] Failed to send update:", error); + } + } + + private async fetchRepoTree(repoUrl: string, branch: string) { + // Implementation details... + } + + private async fetchKeyFiles( + repoUrl: string, + branch: string, + files: Array<{ path: string }> + ) { + // Implementation details... + } + + private async analyzeWithAI( + repoUrl: string, + files: unknown[], + keyFiles: unknown[] + ) { + // Implementation details... + } +} +``` + + + +Key Workflow features: + +- **step.do()**: Creates durable checkpoints that survive restarts +- **step.sleep()**: Allows durable delays (not shown in example, but available) +- **Retry configuration**: Automatic retries with exponential backoff +- **Agent callbacks**: Sends progress updates back to the Agent via RPC + +## 5. Build the React client + +Create a React client at `src/App.tsx` that connects to the Agent and displays real-time task progress: + +```tsx +import { useState, useCallback } from "react"; +import { useAgent } from "@cloudflare/agents/react"; + +function App() { + const [repoUrl, setRepoUrl] = useState("https://github.com/cloudflare/agents"); + const [branch, setBranch] = useState("main"); + + // Connect to agent with state updates + const agent = useAgent({ + agent: "task-runner", + name: "default", + onStateUpdate: (state) => { + setAgentState(state); + } + }); + + // Start quick analysis + const startQuickAnalysis = useCallback(async () => { + await agent.call("quickAnalysis", [{ repoUrl, branch }]); + }, [agent, repoUrl, branch]); + + // Start deep analysis with workflow + const startDeepAnalysis = useCallback(async () => { + await agent.call("startAnalysis", [{ repoUrl, branch }]); + }, [agent, repoUrl, branch]); + + return ( +
+

Repo Analyzer

+ setRepoUrl(e.target.value)} + placeholder="https://github.com/owner/repo" + /> + setBranch(e.target.value)} + placeholder="main" + /> + + + {/* Display tasks and progress */} +
+ ); +} +``` + +The `useAgent` hook: + +- Establishes a WebSocket connection to the Agent +- Receives real-time state updates as tasks progress +- Allows calling Agent methods like `quickAnalysis` and `startAnalysis` + +## 6. Deploy your Task Runner + +1. Run the development server to test locally: + +```sh +npm run dev +``` + +2. Deploy to Cloudflare: + +```sh +npm run deploy +``` + +3. Set your production secrets: + +```sh +npx wrangler secret put OPENAI_API_KEY +``` + +## When to use each pattern + +### Use Quick Analysis (Agent-based) when: + +- Tasks complete in under 30 seconds +- You do not need durability across Worker restarts +- The task is simple and does not require retry logic +- You want minimal setup and complexity + +### Use Deep Analysis (Workflow-based) when: + +- Tasks take minutes to days to complete +- You need automatic retries on failure +- You need step-based checkpointing +- You want to use durable sleep for rate limiting +- The task involves multiple API calls that could fail + +## Related resources + +- [Run Workflows API reference](/agents/api-reference/run-workflows/) +- [Cloudflare Workflows documentation](/workflows/) +- [Agent class reference](/agents/concepts/agent-class/) +- [Store and sync state](/agents/api-reference/store-and-sync-state/)