Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions intro_acp_beeai/src/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import List, Dict
from typing import List
from datetime import datetime, timezone
from acp_sdk.models import Message, MessagePart
import json


def flatten_messages(messages: List[Message]) -> str:
"""
Collapse the *text/plain* content of an entire Message list into one string.
Expand All @@ -21,12 +22,13 @@ def flatten_messages(messages: List[Message]) -> str:
if part.content_type == "text/plain" and part.content is not None
).strip()

def package_response(data: str | dict) -> Dict[str, List[Message]]:
if isinstance(data, dict): # auto-convert

def package_response(data: str | dict) -> Message:
if isinstance(data, dict): # auto-convert
data = json.dumps(data, separators=(",", ":"))
assistant_message = Message(
parts=[MessagePart(content=data)],
created_at=datetime.now(timezone.utc),
completed_at=datetime.now(timezone.utc),
)
return {"messages": [assistant_message]}
return assistant_message
41 changes: 24 additions & 17 deletions intro_acp_beeai/src/ticket_response_agent.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,43 @@
import sys
import textwrap
import os
#Pydantic Framework

# Pydantic Framework
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
#ACP SDK

# ACP SDK
from acp_sdk import Metadata
from acp_sdk.models import Message, MessagePart
from acp_sdk.models import Message
from acp_sdk.server import RunYield, RunYieldResume, Server
from collections.abc import AsyncGenerator
#Helpers

# Helpers
from helpers import package_response, flatten_messages

#load environment variables
# load environment variables
from dotenv import load_dotenv

load_dotenv()

# Set up the ACP server
server = Server()

@server.agent(
metadata=Metadata(ui={"type": "hands-off"})
)
async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:

@server.agent(metadata=Metadata(ui={"type": "hands-off"}))
async def ticket_response_agent(
input: list[Message],
) -> AsyncGenerator[RunYield, RunYieldResume]:
"""
An agent that responds to customer support tickets .
"""
user_prompt = flatten_messages(input)

model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14")
model = OpenAIModel(model_name)
TicketResponseAgent = Agent( model=model,
system_prompt=(textwrap.dedent("""
TicketResponseAgent = Agent(
model=model,
system_prompt=(
textwrap.dedent("""
You are a helpful customer support agent that creates clear, helpful, human-sounding replies to a customer.
Tone & Style Matrix:
Category | Primary Tone | Secondary Goals
Expand All @@ -42,16 +47,18 @@ async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield
Account | Professional, supportive | Clarify account status or changes, confirm security measures
Feedback | Appreciative, receptive | Thank the customer, highlight how feedback is used
Other | Warm, helpful | Clarify intent, offer assistance
""")))
""")
),
)
response = await TicketResponseAgent.run(user_prompt)

yield package_response(response.output)

#Run these agents on different ports

# Run these agents on different ports
def run():
server.run(port=int(os.getenv("PORT", 8001)), self_registration=False)


if __name__ == "__main__":
run()

61 changes: 29 additions & 32 deletions intro_acp_beeai/src/ticket_triage_agent.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,78 @@
import textwrap

#Framework imports
# Framework imports
from beeai_framework.adapters.openai import OpenAIChatModel
from beeai_framework.backend import UserMessage, SystemMessage

#ACP SDK imports
# ACP SDK imports
from acp_sdk.models import Message
from acp_sdk.server import RunYield, RunYieldResume, Server
from collections.abc import AsyncGenerator
#Helper imports

# Helper imports
from helpers import package_response, flatten_messages
from typing import List, Optional
import os
from pydantic import BaseModel, Field
from dotenv import load_dotenv

#load environment variables
# load environment variables
load_dotenv()

# Set up the ACP server
server = Server()


class TicketClassifierOutput(BaseModel):
"""Structured payload returned by the LLM for a single ticket."""

category: List[str] = Field(
description="Options: Billing, Technical, Complaint, Account, Feedback, Other"
)
customer_name: Optional[str] = Field(
default=None,
description="Full customer name; null if not mentioned."
default=None, description="Full customer name; null if not mentioned."
)
account_id: Optional[str] = Field(
default=None,
description="Exact account identifier as it appears in the ticket."
description="Exact account identifier as it appears in the ticket.",
)
product: Optional[str] = Field(
default=None,
description="Product/SKU referenced in the ticket."
default=None, description="Product/SKU referenced in the ticket."
)
issue_summary: str = Field(
description="concise plain-language summary of the problem, extracting key insights."
)
severity: str = Field(
description='One of: "critical", "high", "medium", "low".'
)
sentiment: str = Field(
description='One of: "negative", "neutral", "positive".'
)
severity: str = Field(description='One of: "critical", "high", "medium", "low".')
sentiment: str = Field(description='One of: "negative", "neutral", "positive".')
incident_date: Optional[str] = Field(
default=None,
description="ISO-8601 date (YYYY-MM-DD) if provided."
default=None, description="ISO-8601 date (YYYY-MM-DD) if provided."
)


@server.agent()
async def ticket_triage_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
""" An agent that classifies customer support tickets.
"""
user_prompt = flatten_messages(input[-1:])
model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14")
llm = OpenAIChatModel(model_name)
system_msg = SystemMessage(textwrap.dedent("""
async def ticket_triage_agent(
input: list[Message],
) -> AsyncGenerator[RunYield, RunYieldResume]:
"""An agent that classifies customer support tickets."""
user_prompt = flatten_messages(input[-1:])
model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14")
llm = OpenAIChatModel(model_name)
system_msg = SystemMessage(
textwrap.dedent("""
You are “Support-Sensei,” an AI assistant that must:
1. Choose the single best ticket category.
2. Extract the required fields.
"""
))
response = await llm.create_structure(
schema=TicketClassifierOutput,
messages=[system_msg, UserMessage(user_prompt)],
)
yield package_response(response.object)
""")
)
response = await llm.create(
messages=[system_msg, UserMessage(user_prompt)],
)
yield package_response(response.get_text_content())


def run():
server.run(port=int(os.getenv("PORT", 8000)), self_registration=False)


if __name__ == "__main__":
run()
run()
88 changes: 52 additions & 36 deletions intro_acp_beeai/src/ticket_workflow_agent.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,101 @@
import textwrap

#Framework imports
# Framework imports
from beeai_framework.adapters.openai import OpenAIChatModel
from beeai_framework.backend import UserMessage, SystemMessage
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
#ACP SDK

# ACP SDK
from acp_sdk import Metadata
from acp_sdk.models import Message, MessagePart
from acp_sdk.server import RunYield, RunYieldResume, Server, Context
from collections.abc import AsyncGenerator
from acp_sdk.client import Client
#Helpers
from helpers import package_response, flatten_messages

# Helpers
from helpers import flatten_messages
from pydantic import BaseModel, Field
from typing import List, Optional
import os
#load environment variables

# load environment variables
from dotenv import load_dotenv

load_dotenv()

#set up the ACP server
# set up the ACP server
server = Server()


class TicketClassifierOutput(BaseModel):
"""Structured payload returned by the LLM for a single ticket."""

category: List[str] = Field(
description="Options: Billing, Technical, Complaint, Account, Feedback, Other"
)
customer_name: Optional[str] = Field(
default=None,
description="Full customer name; null if not mentioned."
default=None, description="Full customer name; null if not mentioned."
)
account_id: Optional[str] = Field(
default=None,
description="Exact account identifier as it appears in the ticket."
description="Exact account identifier as it appears in the ticket.",
)
product: Optional[str] = Field(
default=None,
description="Product/SKU referenced in the ticket."
default=None, description="Product/SKU referenced in the ticket."
)
issue_summary: str = Field(
description="concise plain-language summary of the problem, extracting key insights."
)
severity: str = Field(
description='One of: "critical", "high", "medium", "low".'
)
sentiment: str = Field(
description='One of: "negative", "neutral", "positive".'
)
severity: str = Field(description='One of: "critical", "high", "medium", "low".')
sentiment: str = Field(description='One of: "negative", "neutral", "positive".')
incident_date: Optional[str] = Field(
default=None,
description="ISO-8601 date (YYYY-MM-DD) if provided."
default=None, description="ISO-8601 date (YYYY-MM-DD) if provided."
)


#make Agents ACP Compatible
# make Agents ACP Compatible


@server.agent(name="ticket_triage_agent", metadata=Metadata(ui={"type": "hands-off"}))
async def ticket_triage_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
async def ticket_triage_agent(
input: list[Message],
) -> AsyncGenerator[RunYield, RunYieldResume]:
"""An agent that classifies customer support tickets."""
user_prompt = flatten_messages(input[-1:])
model_name = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14")
llm = OpenAIChatModel(model_name)
system_msg = SystemMessage(textwrap.dedent("""
system_msg = SystemMessage(
textwrap.dedent("""
You are “Support-Sensei,” an AI assistant that must:
1. Choose the single best ticket category.
2. Extract the required fields.
""")
)
response = await llm.create_structure(
schema=TicketClassifierOutput,
response = await llm.create(
messages=[system_msg, UserMessage(user_prompt)],
)
yield str(response.object)
yield str(response.get_text_content())


@server.agent(name="ticket_response_agent", metadata=Metadata(ui={"type": "hands-off"}))
async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield, RunYieldResume]:
async def ticket_response_agent(
input: list[Message],
) -> AsyncGenerator[RunYield, RunYieldResume]:
"""
An agent that responds to customer support tickets .
"""
user_prompt = flatten_messages(input)

model = os.getenv("MODEL_NAME", "gpt-4.1-mini-2025-04-14")
model = OpenAIModel(model, provider=OpenAIProvider(api_key=os.getenv('OPENAI_API_KEY')))
TicketResponseAgent = Agent( model=model,
system_prompt=(textwrap.dedent("""
model = OpenAIModel(
model, provider=OpenAIProvider(api_key=os.getenv("OPENAI_API_KEY"))
)
TicketResponseAgent = Agent(
model=model,
system_prompt=(
textwrap.dedent("""
You are a helpful customer support agent that creates clear, helpful, human-sounding replies to a customer.
Tone & Style Matrix:
Category | Primary Tone | Secondary Goals
Expand All @@ -97,32 +105,40 @@ async def ticket_response_agent(input: list[Message]) -> AsyncGenerator[RunYield
Account | Professional, supportive | Clarify account status or changes, confirm security measures
Feedback | Appreciative, receptive | Thank the customer, highlight how feedback is used
Other | Warm, helpful | Clarify intent, offer assistance
""")))
""")
),
)
response = await TicketResponseAgent.run(user_prompt)

yield str(response.output)


#Main ACP Agent that orchestrates the workflow
# Main ACP Agent that orchestrates the workflow
async def run_agent(agent: str, input: str) -> list[Message]:
async with Client(base_url="http://localhost:8000") as client:
run = await client.run_sync(
agent=agent, input=[Message(parts=[MessagePart(content=input, content_type="text/plain")])]
agent=agent,
input=[
Message(parts=[MessagePart(content=input, content_type="text/plain")])
],
)
return run.output


@server.agent(name="TicketWorkflow", metadata=Metadata(ui={"type": "hands-off"}))
async def main_agent(input: list[Message], context: Context) -> AsyncGenerator:
"""
Main agent that orchestrates the ticket triage and response workflow.
"""
ticket_triage_response = await run_agent("ticket_triage_agent", str(input))
ticket_response_to_user = await run_agent("ticket_response_agent", str(ticket_triage_response))
ticket_response_to_user = await run_agent(
"ticket_response_agent", str(ticket_triage_response)
)

yield str(ticket_triage_response[0])
yield str(ticket_response_to_user[0])

#Run these agents

# Run these agents
def run():
server.run()

Expand Down