diff --git a/intro_acp_beeai/src/helpers.py b/intro_acp_beeai/src/helpers.py index e2507cb..c30fe04 100644 --- a/intro_acp_beeai/src/helpers.py +++ b/intro_acp_beeai/src/helpers.py @@ -1,8 +1,9 @@ -from typing import List, Dict +from typing import List from datetime import datetime, timezone from acp_sdk.models import Message, MessagePart import json + def flatten_messages(messages: List[Message]) -> str: """ Collapse the *text/plain* content of an entire Message list into one string. @@ -21,12 +22,13 @@ def flatten_messages(messages: List[Message]) -> str: if part.content_type == "text/plain" and part.content is not None ).strip() -def package_response(data: str | dict) -> Dict[str, List[Message]]: - if isinstance(data, dict): # auto-convert + +def package_response(data: str | dict) -> Message: + if isinstance(data, dict): # auto-convert data = json.dumps(data, separators=(",", ":")) assistant_message = Message( parts=[MessagePart(content=data)], created_at=datetime.now(timezone.utc), completed_at=datetime.now(timezone.utc), ) - return {"messages": [assistant_message]} + return assistant_message diff --git a/intro_acp_beeai/src/ticket_response_agent.py b/intro_acp_beeai/src/ticket_response_agent.py index a369c3a..6177371 100644 --- a/intro_acp_beeai/src/ticket_response_agent.py +++ b/intro_acp_beeai/src/ticket_response_agent.py @@ -1,29 +1,32 @@ -import sys import textwrap import os -#Pydantic Framework + +# Pydantic Framework from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel -from pydantic_ai.providers.openai import OpenAIProvider -#ACP SDK + +# ACP SDK from acp_sdk import Metadata -from acp_sdk.models import Message, MessagePart +from acp_sdk.models import Message from acp_sdk.server import RunYield, RunYieldResume, Server from collections.abc import AsyncGenerator -#Helpers + +# Helpers from helpers import package_response, flatten_messages -#load environment variables +# load environment variables from dotenv import load_dotenv + load_dotenv() # Set up the ACP server server = Server() -@server.agent( - metadata=Metadata(ui={"type": "hands-off"}) -) -async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]: + +@server.agent(metadata=Metadata(ui={"type": "hands-off"})) +async def ticket_response_agent( + input: list[Message], +) -> AsyncGenerator[RunYield, RunYieldResume]: """ An agent that responds to customer support tickets . """ @@ -31,8 +34,10 @@ async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14") model = OpenAIModel(model_name) - TicketResponseAgent = Agent( model=model, - system_prompt=(textwrap.dedent(""" + TicketResponseAgent = Agent( + model=model, + system_prompt=( + textwrap.dedent(""" You are a helpful customer support agent that creates clear, helpful, human-sounding replies to a customer. Tone & Style Matrix: Category | Primary Tone | Secondary Goals @@ -42,16 +47,18 @@ async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield Account | Professional, supportive | Clarify account status or changes, confirm security measures Feedback | Appreciative, receptive | Thank the customer, highlight how feedback is used Other | Warm, helpful | Clarify intent, offer assistance - """))) + """) + ), + ) response = await TicketResponseAgent.run(user_prompt) - + yield package_response(response.output) -#Run these agents on different ports + +# Run these agents on different ports def run(): server.run(port=int(os.getenv("PORT", 8001)), self_registration=False) if __name__ == "__main__": run() - diff --git a/intro_acp_beeai/src/ticket_triage_agent.py b/intro_acp_beeai/src/ticket_triage_agent.py index 01cc0b9..7604256 100644 --- a/intro_acp_beeai/src/ticket_triage_agent.py +++ b/intro_acp_beeai/src/ticket_triage_agent.py @@ -1,76 +1,73 @@ import textwrap -#Framework imports +# Framework imports from beeai_framework.adapters.openai import OpenAIChatModel from beeai_framework.backend import UserMessage, SystemMessage -#ACP SDK imports +# ACP SDK imports from acp_sdk.models import Message from acp_sdk.server import RunYield, RunYieldResume, Server from collections.abc import AsyncGenerator -#Helper imports + +# Helper imports from helpers import package_response, flatten_messages from typing import List, Optional import os from pydantic import BaseModel, Field from dotenv import load_dotenv -#load environment variables +# load environment variables load_dotenv() # Set up the ACP server server = Server() + class TicketClassifierOutput(BaseModel): """Structured payload returned by the LLM for a single ticket.""" + category: List[str] = Field( description="Options: Billing, Technical, Complaint, Account, Feedback, Other" ) customer_name: Optional[str] = Field( - default=None, - description="Full customer name; null if not mentioned." + default=None, description="Full customer name; null if not mentioned." ) account_id: Optional[str] = Field( default=None, - description="Exact account identifier as it appears in the ticket." + description="Exact account identifier as it appears in the ticket.", ) product: Optional[str] = Field( - default=None, - description="Product/SKU referenced in the ticket." + default=None, description="Product/SKU referenced in the ticket." ) issue_summary: str = Field( description="concise plain-language summary of the problem, extracting key insights." ) - severity: str = Field( - description='One of: "critical", "high", "medium", "low".' - ) - sentiment: str = Field( - description='One of: "negative", "neutral", "positive".' - ) + severity: str = Field(description='One of: "critical", "high", "medium", "low".') + sentiment: str = Field(description='One of: "negative", "neutral", "positive".') incident_date: Optional[str] = Field( - default=None, - description="ISO-8601 date (YYYY-MM-DD) if provided." + default=None, description="ISO-8601 date (YYYY-MM-DD) if provided." ) @server.agent() -async def ticket_triage_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]: - """ An agent that classifies customer support tickets. - """ - user_prompt = flatten_messages(input[-1:]) - model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14") - llm = OpenAIChatModel(model_name) - system_msg = SystemMessage(textwrap.dedent(""" +async def ticket_triage_agent( + input: list[Message], +) -> AsyncGenerator[RunYield, RunYieldResume]: + """An agent that classifies customer support tickets.""" + user_prompt = flatten_messages(input[-1:]) + model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14") + llm = OpenAIChatModel(model_name) + system_msg = SystemMessage( + textwrap.dedent(""" You are “Support-Sensei,” an AI assistant that must: 1. Choose the single best ticket category. 2. Extract the required fields. - """ - )) - response = await llm.create_structure( - schema=TicketClassifierOutput, - messages=[system_msg, UserMessage(user_prompt)], - ) - yield package_response(response.object) + """) + ) + response = await llm.create( + messages=[system_msg, UserMessage(user_prompt)], + ) + yield package_response(response.get_text_content()) def run(): @@ -78,4 +75,4 @@ def run(): if __name__ == "__main__": - run() \ No newline at end of file + run() diff --git a/intro_acp_beeai/src/ticket_workflow_agent.py b/intro_acp_beeai/src/ticket_workflow_agent.py index a92c079..0a20126 100644 --- a/intro_acp_beeai/src/ticket_workflow_agent.py +++ b/intro_acp_beeai/src/ticket_workflow_agent.py @@ -1,93 +1,101 @@ import textwrap -#Framework imports +# Framework imports from beeai_framework.adapters.openai import OpenAIChatModel from beeai_framework.backend import UserMessage, SystemMessage from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider -#ACP SDK + +# ACP SDK from acp_sdk import Metadata from acp_sdk.models import Message, MessagePart from acp_sdk.server import RunYield, RunYieldResume, Server, Context from collections.abc import AsyncGenerator from acp_sdk.client import Client -#Helpers -from helpers import package_response, flatten_messages + +# Helpers +from helpers import flatten_messages from pydantic import BaseModel, Field from typing import List, Optional import os -#load environment variables + +# load environment variables from dotenv import load_dotenv + load_dotenv() -#set up the ACP server +# set up the ACP server server = Server() + class TicketClassifierOutput(BaseModel): """Structured payload returned by the LLM for a single ticket.""" + category: List[str] = Field( description="Options: Billing, Technical, Complaint, Account, Feedback, Other" ) customer_name: Optional[str] = Field( - default=None, - description="Full customer name; null if not mentioned." + default=None, description="Full customer name; null if not mentioned." ) account_id: Optional[str] = Field( default=None, - description="Exact account identifier as it appears in the ticket." + description="Exact account identifier as it appears in the ticket.", ) product: Optional[str] = Field( - default=None, - description="Product/SKU referenced in the ticket." + default=None, description="Product/SKU referenced in the ticket." ) issue_summary: str = Field( description="concise plain-language summary of the problem, extracting key insights." ) - severity: str = Field( - description='One of: "critical", "high", "medium", "low".' - ) - sentiment: str = Field( - description='One of: "negative", "neutral", "positive".' - ) + severity: str = Field(description='One of: "critical", "high", "medium", "low".') + sentiment: str = Field(description='One of: "negative", "neutral", "positive".') incident_date: Optional[str] = Field( - default=None, - description="ISO-8601 date (YYYY-MM-DD) if provided." + default=None, description="ISO-8601 date (YYYY-MM-DD) if provided." ) -#make Agents ACP Compatible +# make Agents ACP Compatible + @server.agent(name="ticket_triage_agent", metadata=Metadata(ui={"type": "hands-off"})) -async def ticket_triage_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]: +async def ticket_triage_agent( + input: list[Message], +) -> AsyncGenerator[RunYield, RunYieldResume]: """An agent that classifies customer support tickets.""" user_prompt = flatten_messages(input[-1:]) model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14") llm = OpenAIChatModel(model_name) - system_msg = SystemMessage(textwrap.dedent(""" + system_msg = SystemMessage( + textwrap.dedent(""" You are “Support-Sensei,” an AI assistant that must: 1. Choose the single best ticket category. 2. Extract the required fields. """) ) - response = await llm.create_structure( - schema=TicketClassifierOutput, + response = await llm.create( messages=[system_msg, UserMessage(user_prompt)], ) - yield str(response.object) + yield str(response.get_text_content()) @server.agent(name="ticket_response_agent", metadata=Metadata(ui={"type": "hands-off"})) -async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]: +async def ticket_response_agent( + input: list[Message], +) -> AsyncGenerator[RunYield, RunYieldResume]: """ An agent that responds to customer support tickets . """ user_prompt = flatten_messages(input) model = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14") - model = OpenAIModel(model, provider=OpenAIProvider(api_key=os.getenv('OPENAI_API_KEY'))) - TicketResponseAgent = Agent( model=model, - system_prompt=(textwrap.dedent(""" + model = OpenAIModel( + model, provider=OpenAIProvider(api_key=os.getenv("OPENAI_API_KEY")) + ) + TicketResponseAgent = Agent( + model=model, + system_prompt=( + textwrap.dedent(""" You are a helpful customer support agent that creates clear, helpful, human-sounding replies to a customer. Tone & Style Matrix: Category | Primary Tone | Secondary Goals @@ -97,32 +105,40 @@ async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield Account | Professional, supportive | Clarify account status or changes, confirm security measures Feedback | Appreciative, receptive | Thank the customer, highlight how feedback is used Other | Warm, helpful | Clarify intent, offer assistance - """))) + """) + ), + ) response = await TicketResponseAgent.run(user_prompt) - + yield str(response.output) -#Main ACP Agent that orchestrates the workflow +# Main ACP Agent that orchestrates the workflow async def run_agent(agent: str, input: str) -> list[Message]: async with Client(base_url="http://localhost:8000") as client: run = await client.run_sync( - agent=agent, input=[Message(parts=[MessagePart(content=input, content_type="text/plain")])] + agent=agent, + input=[ + Message(parts=[MessagePart(content=input, content_type="text/plain")]) + ], ) return run.output + @server.agent(name="TicketWorkflow", metadata=Metadata(ui={"type": "hands-off"})) async def main_agent(input: list[Message], context: Context) -> AsyncGenerator: """ Main agent that orchestrates the ticket triage and response workflow. """ ticket_triage_response = await run_agent("ticket_triage_agent", str(input)) - ticket_response_to_user = await run_agent("ticket_response_agent", str(ticket_triage_response)) + ticket_response_to_user = await run_agent( + "ticket_response_agent", str(ticket_triage_response) + ) - yield str(ticket_triage_response[0]) yield str(ticket_response_to_user[0]) -#Run these agents + +# Run these agents def run(): server.run()