diff --git a/openai_agents/agent_patterns/README.md b/openai_agents/agent_patterns/README.md index 26867f10..33784747 100644 --- a/openai_agents/agent_patterns/README.md +++ b/openai_agents/agent_patterns/README.md @@ -4,6 +4,8 @@ Common agentic patterns extended with Temporal's durable execution capabilities. *Adapted from [OpenAI Agents SDK agent patterns](https://github.com/openai/openai-agents-python/tree/main/examples/agent_patterns)* +Before running these examples, be sure to review the [prerequisites and background on the integration](../README.md). + ## Running the Examples First, start the worker (supports all patterns): @@ -13,56 +15,83 @@ uv run openai_agents/agent_patterns/run_worker.py Then run individual examples in separate terminals: -## Deterministic Flows - -**TODO** - -A common tactic is to break down a task into a series of smaller steps. Each task can be performed by an agent, and the output of one agent is used as input to the next. For example, if your task was to generate a story, you could break it down into the following steps: - -1. Generate an outline -2. Generate the story -3. Generate the ending - -Each of these steps can be performed by an agent. The output of one agent is used as input to the next. - -## Handoffs and Routing - -**TODO** - -In many situations, you have specialized sub-agents that handle specific tasks. You can use handoffs to route the task to the right agent. +### Deterministic Flows +Sequential agent execution with validation gates - demonstrates breaking complex tasks into smaller steps: +```bash +uv run openai_agents/agent_patterns/run_deterministic_workflow.py +``` -For example, you might have a frontline agent that receives a request, and then hands off to a specialized agent based on the language of the request. +### Parallelization +Run multiple agents in parallel and select the best result - useful for improving quality or reducing latency: +```bash +uv run openai_agents/agent_patterns/run_parallelization_workflow.py +``` -## Agents as Tools +### LLM-as-a-Judge +Iterative improvement using feedback loops - generate content, evaluate it, and improve until satisfied: +```bash +uv run openai_agents/agent_patterns/run_llm_as_a_judge_workflow.py +``` -The mental model for handoffs is that the new agent "takes over". It sees the previous conversation history, and owns the conversation from that point onwards. However, this is not the only way to use agents. You can also use agents as a tool - the tool agent goes off and runs on its own, and then returns the result to the original agent. +### Agents as Tools +Use agents as callable tools within other agents - enables composition and specialized task delegation: +```bash +uv run openai_agents/agent_patterns/run_agents_as_tools_workflow.py +``` -For example, you could model a translation task as tool calls instead: rather than handing over to the language-specific agent, you could call the agent as a tool, and then use the result in the next step. This enables things like translating multiple languages at once. 
+### Agent Routing and Handoffs +Route requests to specialized agents based on content analysis (adapted for non-streaming): +```bash +uv run openai_agents/agent_patterns/run_routing_workflow.py +``` +### Input Guardrails +Pre-execution validation to prevent unwanted requests - demonstrates safety mechanisms: ```bash -uv run openai_agents/agent_patterns/run_agents_as_tools_workflow.py +uv run openai_agents/agent_patterns/run_input_guardrails_workflow.py ``` -## LLM-as-a-Judge +### Output Guardrails +Post-execution validation to detect sensitive content - ensures safe responses: +```bash +uv run openai_agents/agent_patterns/run_output_guardrails_workflow.py +``` -**TODO** +### Forcing Tool Use +Control tool execution strategies - choose between different approaches to tool usage: +```bash +uv run openai_agents/agent_patterns/run_forcing_tool_use_workflow.py +``` -LLMs can often improve the quality of their output if given feedback. A common pattern is to generate a response using a model, and then use a second model to provide feedback. You can even use a small model for the initial generation and a larger model for the feedback, to optimize cost. +## Pattern Details -For example, you could use an LLM to generate an outline for a story, and then use a second LLM to evaluate the outline and provide feedback. You can then use the feedback to improve the outline, and repeat until the LLM is satisfied with the outline. +### Deterministic Flows +A common tactic is to break down a task into a series of smaller steps. Each task can be performed by an agent, and the output of one agent is used as input to the next. For example, if your task was to generate a story, you could break it down into the following steps: -## Parallelization +1. Generate an outline +2. Check outline quality and genre +3. Write the story (only if outline passes validation) -**TODO** +Each of these steps can be performed by an agent. The output of one agent is used as input to the next. +### Parallelization Running multiple agents in parallel is a common pattern. This can be useful for both latency (e.g. if you have multiple steps that don't depend on each other) and also for other reasons e.g. generating multiple responses and picking the best one. -## Guardrails +### LLM-as-a-Judge +LLMs can often improve the quality of their output if given feedback. A common pattern is to generate a response using a model, and then use a second model to provide feedback. You can even use a small model for the initial generation and a larger model for the feedback, to optimize cost. -**TODO** +### Agents as Tools +The mental model for handoffs is that the new agent "takes over". It sees the previous conversation history, and owns the conversation from that point onwards. However, this is not the only way to use agents. You can also use agents as a tool - the tool agent goes off and runs on its own, and then returns the result to the original agent. +### Guardrails Related to parallelization, you often want to run input guardrails to make sure the inputs to your agents are valid. For example, if you have a customer support agent, you might want to make sure that the user isn't trying to ask for help with a math problem. You can definitely do this without any special Agents SDK features by using parallelization, but we support a special guardrail primitive. Guardrails can have a "tripwire" - if the tripwire is triggered, the agent execution will immediately stop and a `GuardrailTripwireTriggered` exception will be raised. 
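+
+A condensed sketch of the tripwire shape, adapted (with minor renames) from `workflows/input_guardrails_workflow.py` in this directory; see that file for the full, runnable version:
+
+```python
+@input_guardrail
+async def math_guardrail(context, agent, input) -> GuardrailFunctionOutput:
+    # A small, fast agent classifies the input
+    result = await Runner.run(guardrail_agent, input, context=context.context)
+    output = result.final_output_as(MathHomeworkOutput)
+    return GuardrailFunctionOutput(
+        output_info=output,
+        tripwire_triggered=output.is_math_homework,
+    )
+
+support_agent = Agent(
+    name="Customer support agent",
+    instructions="You are a customer support agent.",
+    input_guardrails=[math_guardrail],
+)
+
+try:
+    result = await Runner.run(support_agent, user_input)
+except InputGuardrailTripwireTriggered:
+    ...  # tripwire fired - refuse instead of answering
+```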
-This is really useful for latency: for example, you might have a very fast model that runs the guardrail and a slow model that runs the actual agent. You wouldn't want to wait for the slow model to finish, so guardrails let you quickly reject invalid inputs. \ No newline at end of file +This is really useful for latency: for example, you might have a very fast model that runs the guardrail and a slow model that runs the actual agent. You wouldn't want to wait for the slow model to finish, so guardrails let you quickly reject invalid inputs. + +## Omitted Examples + +The following patterns from the [reference repository](https://github.com/openai/openai-agents-python/tree/main/examples/agent_patterns) are not included in this Temporal adaptation: + +- **Streaming Guardrails**: Requires streaming capabilities which are not yet available in the Temporal integration \ No newline at end of file diff --git a/openai_agents/agent_patterns/run_agents_as_tools_workflow.py b/openai_agents/agent_patterns/run_agents_as_tools_workflow.py index a42d3aac..8b0c1345 100644 --- a/openai_agents/agent_patterns/run_agents_as_tools_workflow.py +++ b/openai_agents/agent_patterns/run_agents_as_tools_workflow.py @@ -20,9 +20,9 @@ async def main(): # Execute a workflow result = await client.execute_workflow( AgentsAsToolsWorkflow.run, - "Translate to English: '¿Cómo estás?'", - id="my-workflow-id", - task_queue="openai-agents-task-queue", + "Please translate 'Good morning, how are you?' to Spanish and French", + id="agents-as-tools-workflow-example", + task_queue="openai-agents-patterns-task-queue", ) print(f"Result: {result}") diff --git a/openai_agents/agent_patterns/run_deterministic_workflow.py b/openai_agents/agent_patterns/run_deterministic_workflow.py new file mode 100644 index 00000000..abb2e4de --- /dev/null +++ b/openai_agents/agent_patterns/run_deterministic_workflow.py @@ -0,0 +1,31 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.deterministic_workflow import ( + DeterministicWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow + result = await client.execute_workflow( + DeterministicWorkflow.run, + "Write a science fiction story about time travel", + id="deterministic-workflow-example", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_forcing_tool_use_workflow.py b/openai_agents/agent_patterns/run_forcing_tool_use_workflow.py new file mode 100644 index 00000000..5dfa42c5 --- /dev/null +++ b/openai_agents/agent_patterns/run_forcing_tool_use_workflow.py @@ -0,0 +1,50 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.forcing_tool_use_workflow import ( + ForcingToolUseWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute workflows with different tool use behaviors + print("Testing default behavior:") + result1 = await client.execute_workflow( + ForcingToolUseWorkflow.run, + "default", + 
id="forcing-tool-use-workflow-default", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Default result: {result1}") + + print("\nTesting first_tool behavior:") + result2 = await client.execute_workflow( + ForcingToolUseWorkflow.run, + "first_tool", + id="forcing-tool-use-workflow-first-tool", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"First tool result: {result2}") + + print("\nTesting custom behavior:") + result3 = await client.execute_workflow( + ForcingToolUseWorkflow.run, + "custom", + id="forcing-tool-use-workflow-custom", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Custom result: {result3}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_input_guardrails_workflow.py b/openai_agents/agent_patterns/run_input_guardrails_workflow.py new file mode 100644 index 00000000..82536e26 --- /dev/null +++ b/openai_agents/agent_patterns/run_input_guardrails_workflow.py @@ -0,0 +1,40 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.input_guardrails_workflow import ( + InputGuardrailsWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow with a normal question (should pass) + result1 = await client.execute_workflow( + InputGuardrailsWorkflow.run, + "What's the capital of California?", + id="input-guardrails-workflow-normal", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Normal question result: {result1}") + + # Execute a workflow with a math homework question (should be blocked) + result2 = await client.execute_workflow( + InputGuardrailsWorkflow.run, + "Can you help me solve for x: 2x + 5 = 11?", + id="input-guardrails-workflow-blocked", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Math homework result: {result2}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_llm_as_a_judge_workflow.py b/openai_agents/agent_patterns/run_llm_as_a_judge_workflow.py new file mode 100644 index 00000000..aa6d97a6 --- /dev/null +++ b/openai_agents/agent_patterns/run_llm_as_a_judge_workflow.py @@ -0,0 +1,31 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.llm_as_a_judge_workflow import ( + LLMAsAJudgeWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow + result = await client.execute_workflow( + LLMAsAJudgeWorkflow.run, + "A thrilling adventure story about pirates searching for treasure", + id="llm-as-a-judge-workflow-example", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_output_guardrails_workflow.py b/openai_agents/agent_patterns/run_output_guardrails_workflow.py new file mode 100644 index 00000000..16d64764 --- /dev/null +++ b/openai_agents/agent_patterns/run_output_guardrails_workflow.py @@ -0,0 +1,40 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + 
+from openai_agents.agent_patterns.workflows.output_guardrails_workflow import ( + OutputGuardrailsWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow with a normal question (should pass) + result1 = await client.execute_workflow( + OutputGuardrailsWorkflow.run, + "What's the capital of California?", + id="output-guardrails-workflow-normal", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Normal question result: {result1}") + + # Execute a workflow with input that might trigger sensitive data output + result2 = await client.execute_workflow( + OutputGuardrailsWorkflow.run, + "My phone number is 650-123-4567. Where do you think I live?", + id="output-guardrails-workflow-sensitive", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Sensitive data result: {result2}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_parallelization_workflow.py b/openai_agents/agent_patterns/run_parallelization_workflow.py new file mode 100644 index 00000000..5b8d9f5d --- /dev/null +++ b/openai_agents/agent_patterns/run_parallelization_workflow.py @@ -0,0 +1,31 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.parallelization_workflow import ( + ParallelizationWorkflow, +) + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow + result = await client.execute_workflow( + ParallelizationWorkflow.run, + "Hello, world! How are you today?", + id="parallelization-workflow-example", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_routing_workflow.py b/openai_agents/agent_patterns/run_routing_workflow.py new file mode 100644 index 00000000..51c28233 --- /dev/null +++ b/openai_agents/agent_patterns/run_routing_workflow.py @@ -0,0 +1,29 @@ +import asyncio + +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from openai_agents.agent_patterns.workflows.routing_workflow import RoutingWorkflow + + +async def main(): + # Create client connected to server at the given address + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin(), + ], + ) + + # Execute a workflow + result = await client.execute_workflow( + RoutingWorkflow.run, + "Bonjour! 
Comment allez-vous aujourd'hui?", + id="routing-workflow-example", + task_queue="openai-agents-patterns-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/agent_patterns/run_worker.py b/openai_agents/agent_patterns/run_worker.py index 3ca0eda2..4edb8ae4 100644 --- a/openai_agents/agent_patterns/run_worker.py +++ b/openai_agents/agent_patterns/run_worker.py @@ -10,6 +10,25 @@ from openai_agents.agent_patterns.workflows.agents_as_tools_workflow import ( AgentsAsToolsWorkflow, ) +from openai_agents.agent_patterns.workflows.deterministic_workflow import ( + DeterministicWorkflow, +) +from openai_agents.agent_patterns.workflows.forcing_tool_use_workflow import ( + ForcingToolUseWorkflow, +) +from openai_agents.agent_patterns.workflows.input_guardrails_workflow import ( + InputGuardrailsWorkflow, +) +from openai_agents.agent_patterns.workflows.llm_as_a_judge_workflow import ( + LLMAsAJudgeWorkflow, +) +from openai_agents.agent_patterns.workflows.output_guardrails_workflow import ( + OutputGuardrailsWorkflow, +) +from openai_agents.agent_patterns.workflows.parallelization_workflow import ( + ParallelizationWorkflow, +) +from openai_agents.agent_patterns.workflows.routing_workflow import RoutingWorkflow async def main(): @@ -27,9 +46,16 @@ async def main(): worker = Worker( client, - task_queue="openai-agents-task-queue", + task_queue="openai-agents-patterns-task-queue", workflows=[ AgentsAsToolsWorkflow, + DeterministicWorkflow, + ParallelizationWorkflow, + LLMAsAJudgeWorkflow, + ForcingToolUseWorkflow, + InputGuardrailsWorkflow, + OutputGuardrailsWorkflow, + RoutingWorkflow, ], ) await worker.run() diff --git a/openai_agents/agent_patterns/workflows/deterministic_workflow.py b/openai_agents/agent_patterns/workflows/deterministic_workflow.py new file mode 100644 index 00000000..339e7a0f --- /dev/null +++ b/openai_agents/agent_patterns/workflows/deterministic_workflow.py @@ -0,0 +1,90 @@ +from agents import Agent, RunConfig, Runner, trace +from pydantic import BaseModel +from temporalio import workflow + +""" +This example demonstrates a deterministic flow, where each step is performed by an agent. +1. The first agent generates a story outline +2. We feed the outline into the second agent +3. The second agent checks if the outline is good quality and if it is a scifi story +4. If the outline is not good quality or not a scifi story, we stop here +5. If the outline is good quality and a scifi story, we feed the outline into the third agent +6. The third agent writes the story + +*Adapted from the OpenAI Agents SDK deterministic pattern example* +""" + + +class OutlineCheckerOutput(BaseModel): + good_quality: bool + is_scifi: bool + + +def story_outline_agent() -> Agent: + return Agent( + name="story_outline_agent", + instructions="Generate a very short story outline based on the user's input.", + ) + + +def outline_checker_agent() -> Agent: + return Agent( + name="outline_checker_agent", + instructions="Read the given story outline, and judge the quality. 
Also, determine if it is a scifi story.",
+        output_type=OutlineCheckerOutput,
+    )
+
+
+def story_agent() -> Agent:
+    return Agent(
+        name="story_agent",
+        instructions="Write a short story based on the given outline.",
+        output_type=str,
+    )
+
+
+@workflow.defn
+class DeterministicWorkflow:
+    @workflow.run
+    async def run(self, input_prompt: str) -> str:
+        config = RunConfig()
+
+        # Ensure the entire workflow is a single trace
+        with trace("Deterministic story flow"):
+            # 1. Generate an outline
+            outline_result = await Runner.run(
+                story_outline_agent(),
+                input_prompt,
+                run_config=config,
+            )
+            workflow.logger.info("Outline generated")
+
+            # 2. Check the outline
+            outline_checker_result = await Runner.run(
+                outline_checker_agent(),
+                outline_result.final_output,
+                run_config=config,
+            )
+
+            # 3. Add a gate to stop if the outline is not good quality or not a scifi story
+            assert isinstance(outline_checker_result.final_output, OutlineCheckerOutput)
+            if not outline_checker_result.final_output.good_quality:
+                workflow.logger.info("Outline is not good quality, so we stop here.")
+                return "Story generation stopped: Outline quality insufficient."
+
+            if not outline_checker_result.final_output.is_scifi:
+                workflow.logger.info("Outline is not a scifi story, so we stop here.")
+                return "Story generation stopped: Outline is not science fiction."
+
+            workflow.logger.info(
+                "Outline is good quality and a scifi story, so we continue to write the story."
+            )
+
+            # 4. Write the story
+            story_result = await Runner.run(
+                story_agent(),
+                outline_result.final_output,
+                run_config=config,
+            )
+
+            return f"Final story: {story_result.final_output}"
diff --git a/openai_agents/agent_patterns/workflows/forcing_tool_use_workflow.py b/openai_agents/agent_patterns/workflows/forcing_tool_use_workflow.py
new file mode 100644
index 00000000..a8eaedd6
--- /dev/null
+++ b/openai_agents/agent_patterns/workflows/forcing_tool_use_workflow.py
@@ -0,0 +1,83 @@
+from typing import Any, Literal
+
+from agents import (
+    Agent,
+    FunctionToolResult,
+    ModelSettings,
+    RunConfig,
+    RunContextWrapper,
+    Runner,
+    ToolsToFinalOutputFunction,
+    ToolsToFinalOutputResult,
+    function_tool,
+)
+from pydantic import BaseModel
+from temporalio import workflow
+
+"""
+This example shows how to force the agent to use a tool. It uses `ModelSettings(tool_choice="required")`
+to force the agent to use any tool.
+
+You can run it with 3 options:
+1. `default`: The default behavior, which is to send the tool output to the LLM. In this case,
+    `tool_choice` is not set, because otherwise it would result in an infinite loop - the LLM would
+    call the tool, the tool would run and send the results to the LLM, and that would repeat
+    (because the model is forced to use a tool every time).
+2. `first_tool`: The first tool result is used as the final output.
+3. `custom`: A custom tool use behavior function is used. The custom function receives all the tool
+    results, and chooses to use the first tool result to generate the final output.
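+
+In this Temporal adaptation there is no CLI flag: the option string ("default",
+"first_tool", or "custom") is passed as the workflow input by
+run_forcing_tool_use_workflow.py.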
+
+*Adapted from the OpenAI Agents SDK forcing_tool_use pattern example*
+"""
+
+
+class Weather(BaseModel):
+    city: str
+    temperature_range: str
+    conditions: str
+
+
+@function_tool
+def get_weather(city: str) -> Weather:
+    workflow.logger.info("[debug] get_weather called")
+    return Weather(city=city, temperature_range="14-20C", conditions="Sunny with wind")
+
+
+async def custom_tool_use_behavior(
+    context: RunContextWrapper[Any], results: list[FunctionToolResult]
+) -> ToolsToFinalOutputResult:
+    weather: Weather = results[0].output
+    return ToolsToFinalOutputResult(
+        is_final_output=True, final_output=f"{weather.city} is {weather.conditions}."
+    )
+
+
+@workflow.defn
+class ForcingToolUseWorkflow:
+    @workflow.run
+    async def run(self, tool_use_behavior: str = "default") -> str:
+        config = RunConfig()
+
+        if tool_use_behavior == "default":
+            behavior: Literal[
+                "run_llm_again", "stop_on_first_tool"
+            ] | ToolsToFinalOutputFunction = "run_llm_again"
+        elif tool_use_behavior == "first_tool":
+            behavior = "stop_on_first_tool"
+        elif tool_use_behavior == "custom":
+            behavior = custom_tool_use_behavior
+        else:
+            raise ValueError(f"Unknown tool_use_behavior: {tool_use_behavior}")
+
+        agent = Agent(
+            name="Weather agent",
+            instructions="You are a helpful agent.",
+            tools=[get_weather],
+            tool_use_behavior=behavior,
+            model_settings=ModelSettings(
+                tool_choice="required" if tool_use_behavior != "default" else None
+            ),
+        )
+
+        result = await Runner.run(
+            agent, input="What's the weather in Tokyo?", run_config=config
+        )
+        return str(result.final_output)
diff --git a/openai_agents/agent_patterns/workflows/input_guardrails_workflow.py b/openai_agents/agent_patterns/workflows/input_guardrails_workflow.py
new file mode 100644
index 00000000..83677840
--- /dev/null
+++ b/openai_agents/agent_patterns/workflows/input_guardrails_workflow.py
@@ -0,0 +1,87 @@
+from agents import (
+    Agent,
+    GuardrailFunctionOutput,
+    InputGuardrailTripwireTriggered,
+    RunConfig,
+    RunContextWrapper,
+    Runner,
+    TResponseInputItem,
+    input_guardrail,
+)
+from pydantic import BaseModel
+from temporalio import workflow
+
+"""
+This example shows how to use input guardrails.
+
+Guardrails are checks that run in parallel to the agent's execution.
+They can be used to do things like:
+- Check if input messages are off-topic
+- Check that input messages don't violate any policies
+- Take over control of the agent's execution if an unexpected input is detected
+
+In this example, we'll set up an input guardrail that trips if the user is asking to do math homework.
+If the guardrail trips, we'll respond with a refusal message.
+
+*Adapted from the OpenAI Agents SDK input_guardrails pattern example*
+"""
+
+
+class MathHomeworkOutput(BaseModel):
+    reasoning: str
+    is_math_homework: bool
+
+
+guardrail_agent = Agent(
+    name="Guardrail check",
+    instructions="Check if the user is asking you to do their math homework.",
+    output_type=MathHomeworkOutput,
+)
+
+
+@input_guardrail
+async def math_guardrail(
+    context: RunContextWrapper[None],
+    agent: Agent,
+    input: str | list[TResponseInputItem],
+) -> GuardrailFunctionOutput:
+    """This is an input guardrail function, which happens to call an agent to check if the input
+    is a math homework question.
+ """ + result = await Runner.run(guardrail_agent, input, context=context.context) + final_output = result.final_output_as(MathHomeworkOutput) + + return GuardrailFunctionOutput( + output_info=final_output, + tripwire_triggered=final_output.is_math_homework, + ) + + +@workflow.defn +class InputGuardrailsWorkflow: + @workflow.run + async def run(self, user_input: str) -> str: + config = RunConfig() + agent = Agent( + name="Customer support agent", + instructions="You are a customer support agent. You help customers with their questions.", + input_guardrails=[math_guardrail], + ) + + input_data: list[TResponseInputItem] = [ + { + "role": "user", + "content": user_input, + } + ] + + try: + result = await Runner.run(agent, input_data, run_config=config) + return str(result.final_output) + except InputGuardrailTripwireTriggered: + # If the guardrail triggered, we instead return a refusal message + message = "Sorry, I can't help you with your math homework." + workflow.logger.info( + "Input guardrail triggered - refusing to help with math homework" + ) + return message diff --git a/openai_agents/agent_patterns/workflows/llm_as_a_judge_workflow.py b/openai_agents/agent_patterns/workflows/llm_as_a_judge_workflow.py new file mode 100644 index 00000000..7ad536f2 --- /dev/null +++ b/openai_agents/agent_patterns/workflows/llm_as_a_judge_workflow.py @@ -0,0 +1,86 @@ +from dataclasses import dataclass +from typing import Literal + +from agents import Agent, ItemHelpers, RunConfig, Runner, TResponseInputItem, trace +from temporalio import workflow + +""" +This example shows the LLM as a judge pattern. The first agent generates an outline for a story. +The second agent judges the outline and provides feedback. We loop until the judge is satisfied +with the outline. + +*Adapted from the OpenAI Agents SDK llm_as_a_judge pattern example* +""" + + +@dataclass +class EvaluationFeedback: + feedback: str + score: Literal["pass", "needs_improvement", "fail"] + + +def story_outline_generator() -> Agent: + return Agent[None]( + name="story_outline_generator", + instructions=( + "You generate a very short story outline based on the user's input." + "If there is any feedback provided, use it to improve the outline." + ), + ) + + +def evaluator() -> Agent: + return Agent[None]( + name="evaluator", + instructions=( + "You evaluate a story outline and decide if it's good enough." + "If it's not good enough, you provide feedback on what needs to be improved." + "Never give it a pass on the first try. 
After 5 attempts, you can give it a pass if story outline is good enough - do not go for perfection" + ), + output_type=EvaluationFeedback, + ) + + +@workflow.defn +class LLMAsAJudgeWorkflow: + @workflow.run + async def run(self, msg: str) -> str: + config = RunConfig() + input_items: list[TResponseInputItem] = [{"content": msg, "role": "user"}] + latest_outline: str | None = None + + # We'll run the entire workflow in a single trace + with trace("LLM as a judge"): + while True: + story_outline_result = await Runner.run( + story_outline_generator(), + input_items, + run_config=config, + ) + + input_items = story_outline_result.to_input_list() + latest_outline = ItemHelpers.text_message_outputs( + story_outline_result.new_items + ) + workflow.logger.info("Story outline generated") + + evaluator_result = await Runner.run( + evaluator(), + input_items, + run_config=config, + ) + result: EvaluationFeedback = evaluator_result.final_output + + workflow.logger.info(f"Evaluator score: {result.score}") + + if result.score == "pass": + workflow.logger.info("Story outline is good enough, exiting.") + break + + workflow.logger.info("Re-running with feedback") + + input_items.append( + {"content": f"Feedback: {result.feedback}", "role": "user"} + ) + + return f"Final story outline: {latest_outline}" diff --git a/openai_agents/agent_patterns/workflows/output_guardrails_workflow.py b/openai_agents/agent_patterns/workflows/output_guardrails_workflow.py new file mode 100644 index 00000000..58a306c9 --- /dev/null +++ b/openai_agents/agent_patterns/workflows/output_guardrails_workflow.py @@ -0,0 +1,78 @@ +from agents import ( + Agent, + GuardrailFunctionOutput, + OutputGuardrailTripwireTriggered, + RunConfig, + RunContextWrapper, + Runner, + output_guardrail, +) +from pydantic import BaseModel, Field +from temporalio import workflow + +""" +This example shows how to use output guardrails. + +Output guardrails are checks that run on the final output of an agent. +They can be used to do things like: +- Check if the output contains sensitive data +- Check if the output is a valid response to the user's message + +In this example, we'll use a (contrived) example where we check if the agent's response contains +a phone number. 
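+The check itself is a simple substring match on the "650" area code that appears in
+the demo input - a stand-in for a real PII detector.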
+ +*Adapted from the OpenAI Agents SDK output_guardrails pattern example* +""" + + +class MessageOutput(BaseModel): + reasoning: str = Field( + description="Thoughts on how to respond to the user's message" + ) + response: str = Field(description="The response to the user's message") + user_name: str | None = Field( + description="The name of the user who sent the message, if known" + ) + + +@output_guardrail +async def sensitive_data_check( + context: RunContextWrapper, agent: Agent, output: MessageOutput +) -> GuardrailFunctionOutput: + phone_number_in_response = "650" in output.response + phone_number_in_reasoning = "650" in output.reasoning + + return GuardrailFunctionOutput( + output_info={ + "phone_number_in_response": phone_number_in_response, + "phone_number_in_reasoning": phone_number_in_reasoning, + }, + tripwire_triggered=phone_number_in_response or phone_number_in_reasoning, + ) + + +def assistant_agent() -> Agent: + return Agent( + name="Assistant", + instructions="You are a helpful assistant.", + output_type=MessageOutput, + output_guardrails=[sensitive_data_check], + ) + + +@workflow.defn +class OutputGuardrailsWorkflow: + @workflow.run + async def run(self, user_input: str) -> str: + config = RunConfig() + agent = assistant_agent() + + try: + result = await Runner.run(agent, user_input, run_config=config) + output = result.final_output_as(MessageOutput) + return f"Response: {output.response}" + except OutputGuardrailTripwireTriggered as e: + workflow.logger.info( + f"Output guardrail triggered. Info: {e.guardrail_result.output.output_info}" + ) + return f"Output guardrail triggered due to sensitive data detection. Info: {e.guardrail_result.output.output_info}" diff --git a/openai_agents/agent_patterns/workflows/parallelization_workflow.py b/openai_agents/agent_patterns/workflows/parallelization_workflow.py new file mode 100644 index 00000000..5a07a030 --- /dev/null +++ b/openai_agents/agent_patterns/workflows/parallelization_workflow.py @@ -0,0 +1,70 @@ +import asyncio + +from agents import Agent, ItemHelpers, RunConfig, Runner, trace +from temporalio import workflow + +""" +This example shows the parallelization pattern. We run the agent three times in parallel, and pick +the best result. 
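+The three runs are started with asyncio.gather, which Temporal's deterministic
+workflow event loop schedules safely inside the workflow.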
+ +*Adapted from the OpenAI Agents SDK parallelization pattern example* +""" + + +def spanish_agent() -> Agent: + return Agent( + name="spanish_agent", + instructions="You translate the user's message to Spanish", + ) + + +def translation_picker() -> Agent: + return Agent( + name="translation_picker", + instructions="You pick the best Spanish translation from the given options.", + ) + + +@workflow.defn +class ParallelizationWorkflow: + @workflow.run + async def run(self, msg: str) -> str: + config = RunConfig() + + # Ensure the entire workflow is a single trace + with trace("Parallel translation"): + # Run three translation agents in parallel + res_1, res_2, res_3 = await asyncio.gather( + Runner.run( + spanish_agent(), + msg, + run_config=config, + ), + Runner.run( + spanish_agent(), + msg, + run_config=config, + ), + Runner.run( + spanish_agent(), + msg, + run_config=config, + ), + ) + + outputs = [ + ItemHelpers.text_message_outputs(res_1.new_items), + ItemHelpers.text_message_outputs(res_2.new_items), + ItemHelpers.text_message_outputs(res_3.new_items), + ] + + translations = "\n\n".join(outputs) + workflow.logger.info(f"Generated translations:\n{translations}") + + best_translation = await Runner.run( + translation_picker(), + f"Input: {msg}\n\nTranslations:\n{translations}", + run_config=config, + ) + + return f"Best translation: {best_translation.final_output}" diff --git a/openai_agents/agent_patterns/workflows/routing_workflow.py b/openai_agents/agent_patterns/workflows/routing_workflow.py new file mode 100644 index 00000000..4d821349 --- /dev/null +++ b/openai_agents/agent_patterns/workflows/routing_workflow.py @@ -0,0 +1,67 @@ +from agents import Agent, RunConfig, Runner, TResponseInputItem, trace +from temporalio import workflow + +""" +This example shows the handoffs/routing pattern. The triage agent receives the first message, and +then hands off to the appropriate agent based on the language of the request. + +Note: This is adapted from the original streaming version to work with Temporal's non-streaming approach. 
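+Instead of streaming tokens as they arrive, the workflow waits for Runner.run to
+complete and returns the final text in a single response.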
+
+*Adapted from the OpenAI Agents SDK routing pattern example*
+"""
+
+
+def french_agent() -> Agent:
+    return Agent(
+        name="french_agent",
+        instructions="You only speak French",
+    )
+
+
+def spanish_agent() -> Agent:
+    return Agent(
+        name="spanish_agent",
+        instructions="You only speak Spanish",
+    )
+
+
+def english_agent() -> Agent:
+    return Agent(
+        name="english_agent",
+        instructions="You only speak English",
+    )
+
+
+def triage_agent() -> Agent:
+    return Agent(
+        name="triage_agent",
+        instructions="Hand off to the appropriate agent based on the language of the request.",
+        handoffs=[french_agent(), spanish_agent(), english_agent()],
+    )
+
+
+@workflow.defn
+class RoutingWorkflow:
+    @workflow.run
+    async def run(self, msg: str) -> str:
+        config = RunConfig()
+
+        with trace("Routing example"):
+            inputs: list[TResponseInputItem] = [{"content": msg, "role": "user"}]
+
+            # Run the triage agent to determine which language agent to hand off to
+            result = await Runner.run(
+                triage_agent(),
+                input=inputs,
+                run_config=config,
+            )
+
+            # The handoff happens inside Runner.run; result.final_output holds the
+            # response from whichever language agent the triage agent chose
+            workflow.logger.info("Handoff completed")
+
+            # to_input_list() carries the full conversation, should a follow-up turn need it
+            inputs = result.to_input_list()
+
+            # Return the response produced after the handoff
+            return f"Response: {result.final_output}"
diff --git a/openai_agents/basic/run_worker.py b/openai_agents/basic/run_worker.py
index 94d6a882..f9bc0bf5 100644
--- a/openai_agents/basic/run_worker.py
+++ b/openai_agents/basic/run_worker.py
@@ -47,7 +47,7 @@ async def main():
 
     worker = Worker(
         client,
-        task_queue="openai-agents-basic-task-queue",
+        task_queue="openai-agents-task-queue",
         workflows=[
            HelloWorldAgent,
            ToolsWorkflow,