diff --git a/speech-to-speech/workshops/README.md b/speech-to-speech/workshops/README.md
index 2303c498..3e2c93c7 100644
--- a/speech-to-speech/workshops/README.md
+++ b/speech-to-speech/workshops/README.md
@@ -1,6 +1,6 @@
 # Nova S2S workshop sample code
 
-> August 26, 2025 🆕🚀 A new Nova Sonic Multi-Agent Architecture lab using Amazon Bedrock AgentCore has been added. For more information and sample code refer to [./agent-core](./agent-core/) folder.
+> August 26, 2025 🆕🚀 A new Nova Sonic Multi-Agent Architecture lab using Amazon Bedrock AgentCore has been added. For more information and sample code refer to the [./agent-core](./agent-core/README.md) folder.
 
 This project is for the [Amazon Nova Sonic speech-to-speech (S2S) workshop](https://catalog.workshops.aws/amazon-nova-sonic-s2s/en-US) and is intended for training purposes. It showcases a sample architecture for building applications that integrate with Nova Sonic, with features specifically designed to expose technical details for educational use.
@@ -106,7 +106,7 @@
 cd nova-s2s-workshop
 npm start
 ```
 
-When using Chrome, if there’s no sound, please ensure the sound setting is set to Allow, as shown below.
+When using Chrome, if there's no sound, please ensure the sound setting is set to Allow, as shown below.
 ![chrome-sound](./static/chrome-sound-setting.png)
 ⚠️ **Warning:** Known issue: This UI is intended for demonstration purposes and may encounter state management issues after frequent conversation start/stop actions. Refreshing the page can help resolve the issue.
@@ -229,7 +229,79 @@
 python server.py --agent strands
 ```
 - You can then try asking questions using the sample UI such as:
 ```
-What’s the weather like in Seattle today?
+What's the weather like in Seattle today?
 ```
 Refer to [the Strands Agent lab](https://catalog.workshops.aws/amazon-nova-sonic-s2s/en-US/200-labs/02-repeatable-pattern/03-strands) for more detailed instructions.
+
+## Enable Observability
+
+### Prerequisites
+
+1. Enable transaction search on Amazon CloudWatch. First-time users must enable CloudWatch Transaction Search to view Bedrock AgentCore spans and traces. To enable transaction search, please refer to our documentation.
+
+2. Create a log group and log stream in Amazon CloudWatch:
+```python
+import boto3
+
+region = "us-east-1"  # region used throughout this workshop
+cloudwatch_client = boto3.client("logs", region_name=region)
+response = cloudwatch_client.create_log_group(
+    logGroupName='bedrock-agentcore-observability'
+)
+response = cloudwatch_client.create_log_stream(
+    logGroupName='bedrock-agentcore-observability',
+    logStreamName='default'
+)
+```
+
+### Start backend with observability instrumentation
+
+Start the Python server with the shell command below.
+ +```bash +run_server_with_telemetry.sh +``` + +The shells script reads the environment variables from the local .env file with the following variables + +```bash +# AWS OpenTelemetry Configuration +OTEL_PYTHON_DISTRO=aws_distro +OTEL_PYTHON_CONFIGURATOR=aws_configurator + +# Service Identification +OTEL_RESOURCE_ATTRIBUTES=service.name="s2s_agent" +AGENT_OBSERVABILITY_ENABLED=true +OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="https://xray.us-east-1.amazonaws.com/v1/traces" + +# CloudWatch Integration +OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore + +# Instrumentation Exclusions - Extended to include all patterns +OTEL_PYTHON_FASTAPI_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_URLLIB3_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_HTTPX_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_BOTO3SQS_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_BOTOCORE_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" + +# Disable unwanted instrumentations +OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets" + +# Propagation and Sampling +OTEL_PROPAGATORS=tracecontext,baggage,xray +OTEL_TRACES_SAMPLER=always_on +OTEL_BSP_SCHEDULE_DELAY=1000 +OTEL_BSP_MAX_EXPORT_BATCH_SIZE=512 +OTEL_BSP_EXPORT_TIMEOUT=30000 + +# AWS X-Ray specific +AWS_XRAY_TRACING_NAME=s2s_agent +AWS_XRAY_CONTEXT_MISSING=LOG_ERROR +``` + +This will create the following trace spans + + diff --git a/speech-to-speech/workshops/agent-core/banking_agent/.dockerignore b/speech-to-speech/workshops/agent-core/banking_agent/.dockerignore new file mode 100644 index 00000000..7a9716d5 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/banking_agent/.dockerignore @@ -0,0 +1,68 @@ +# Build artifacts +build/ +dist/ +*.egg-info/ +*.egg + +# Python cache +__pycache__/ +__pycache__* +*.py[cod] +*$py.class +*.so +.Python + +# Virtual environments +.venv/ +.env +venv/ +env/ +ENV/ + +# Testing +.pytest_cache/ +.coverage +.coverage* +htmlcov/ +.tox/ +*.cover +.hypothesis/ +.mypy_cache/ +.ruff_cache/ + +# Development +*.log +*.bak +*.swp +*.swo +*~ +.DS_Store + +# IDEs +.vscode/ +.idea/ + +# Version control +.git/ +.gitignore +.gitattributes + +# Documentation +docs/ +*.md +!README.md + +# CI/CD +.github/ +.gitlab-ci.yml +.travis.yml + +# Project specific +tests/ + +# Bedrock AgentCore specific - keep config but exclude runtime 
files +.bedrock_agentcore.yaml +.dockerignore + +# Keep wheelhouse for offline installations +# wheelhouse/ diff --git a/speech-to-speech/workshops/agent-core/banking_agent/.env b/speech-to-speech/workshops/agent-core/banking_agent/.env new file mode 100644 index 00000000..1a0d9508 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/banking_agent/.env @@ -0,0 +1,23 @@ +# AWS OpenTelemetry Configuration +OTEL_PYTHON_DISTRO=aws_distro +OTEL_PYTHON_CONFIGURATOR=aws_configurator +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="https://xray.us-east-1.amazonaws.com/v1/traces" +OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + +# Service Identification +OTEL_RESOURCE_ATTRIBUTES=service.name=banking_local_agent +AGENT_OBSERVABILITY_ENABLED=true + +# CloudWatch Integration +OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore + +# Propagation and Sampling - Enabled for custom exporter +OTEL_PROPAGATORS=tracecontext,baggage,xray +OTEL_TRACES_SAMPLER=always_on +OTEL_BSP_SCHEDULE_DELAY=1000 +OTEL_BSP_MAX_EXPORT_BATCH_SIZE=512 +OTEL_BSP_EXPORT_TIMEOUT=30000 + +# Disable unwanted instrumentations +OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets" \ No newline at end of file diff --git a/speech-to-speech/workshops/agent-core/banking_agent/Dockerfile b/speech-to-speech/workshops/agent-core/banking_agent/Dockerfile new file mode 100644 index 00000000..434888d6 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/banking_agent/Dockerfile @@ -0,0 +1,49 @@ +FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim +WORKDIR /app + +# Configure UV for container environment +ENV UV_SYSTEM_PYTHON=1 UV_COMPILE_BYTECODE=1 + + + +COPY requirements.txt requirements.txt +# Install from requirements file +RUN uv pip install -r requirements.txt + + + + +RUN uv pip install aws-opentelemetry-distro>=0.10.1 + + +# Set AWS region environment variable + +ENV AWS_REGION=us-east-1 +ENV AWS_DEFAULT_REGION=us-east-1 + + +# Signal that this is running in Docker for host binding logic +ENV DOCKER_CONTAINER=1 + +ENV OTEL_PYTHON_DISTRO=aws_distro +ENV OTEL_PYTHON_CONFIGURATOR=aws_configurator +ENV OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=https://xray.us-east-1.amazonaws.com/v1/traces +ENV OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +ENV OTEL_RESOURCE_ATTRIBUTES=service.name={agent_name} +ENV AGENT_OBSERVABILITY_ENABLED=true +ENV OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore +ENV OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets + +# Create non-root user +RUN useradd -m -u 1000 bedrock_agentcore +USER bedrock_agentcore + +EXPOSE 8080 +EXPOSE 8000 + +# Copy entire project (respecting .dockerignore) +COPY . . 
+ +# Use the full module path + +CMD ["opentelemetry-instrument", "python", "-m", "banking_agent"] diff --git a/speech-to-speech/workshops/agent-core/banking_agent/banking_agent.py b/speech-to-speech/workshops/agent-core/banking_agent/banking_agent.py index 0756f4db..9af0fb6f 100644 --- a/speech-to-speech/workshops/agent-core/banking_agent/banking_agent.py +++ b/speech-to-speech/workshops/agent-core/banking_agent/banking_agent.py @@ -3,6 +3,21 @@ from bedrock_agentcore.runtime import BedrockAgentCoreApp from strands.models import BedrockModel import re, argparse +import datetime +import os + +# Import the tracer to create spans +from strands.telemetry.tracer import get_tracer +tracer = get_tracer() + +# # OpenTelemetry setup +from opentelemetry import baggage, context + +def set_session_context(session_id): + """Set the session ID in OpenTelemetry baggage for trace correlation""" + ctx = baggage.set_baggage("session.id", session_id) + token = context.attach(ctx) + return token app = BedrockAgentCoreApp() @@ -16,6 +31,7 @@ def get_account_balance(account_id) -> str: # In this sample, we use a mock response. # The actual implementation will retrieve information from a database API or another backend service. + result = { "account_id": "1234567890", "account_type": "Checking", @@ -38,6 +54,9 @@ def get_statement(account_id: str, year_and_month: str) -> str: """ # In this sample, we use a mock response. # The actual implementation will retrieve information from a database API or another backend service. + + + # A sample bank statement for August 2025 result = { "account_id": "1234567890", "account_type": "Checking", @@ -125,13 +144,27 @@ def get_statement(account_id: str, year_and_month: str) -> str: @app.entrypoint def banking_agent(payload): - response = agent(json.dumps(payload)) - output = response.message['content'][0]['text'] - if "" in output and "" in output: - match = re.search(r"(.*?)", output, re.DOTALL) - if match: - output = match.group(1) - return output + try: + #generate a random session id to associate with this agent run + import uuid + session_id = str(uuid.uuid4()) + + context_token = set_session_context(session_id) + + # Execute the agent + response = agent(json.dumps(payload)) + + # Process the response + output = response.message['content'][0]['text'] + if "" in output and "" in output: + match = re.search(r"(.*?)", output, re.DOTALL) + if match: + output = match.group(1) + return output + + finally: + context.detach(context_token) + pass if __name__ == "__main__": - app.run() \ No newline at end of file + app.run() diff --git a/speech-to-speech/workshops/agent-core/banking_agent/deploy.py b/speech-to-speech/workshops/agent-core/banking_agent/deploy.py index afa8d71b..20893b3a 100644 --- a/speech-to-speech/workshops/agent-core/banking_agent/deploy.py +++ b/speech-to-speech/workshops/agent-core/banking_agent/deploy.py @@ -20,6 +20,97 @@ ) print(f"Initialized docker file for {agent_name}") + +# Update the docker with the below environment variables for instrumentation with filtering + +def modify_dockerfile(dockerfile_path, env_vars): + """ + Read a Dockerfile, add environment variable definitions, and write it back. 
+ + Args: + dockerfile_path (str): Path to the Dockerfile + env_vars (dict): Dictionary of environment variables to add (key-value pairs) + + Returns: + bool: True if successful, False otherwise + """ + try: + # Read the Dockerfile + with open(dockerfile_path, 'r') as file: + content = file.readlines() + + # Find the best position to add environment variables + # Strategy: Look for existing ENV statements and add after the last one + # If no ENV statements, add after the WORKDIR statement + # If no WORKDIR, add after the FROM statement + # If none of the above, add at the beginning + + env_positions = [] + workdir_positions = [] + from_positions = [] + + for i, line in enumerate(content): + if line.strip().startswith('ENV '): + env_positions.append(i) + elif line.strip().startswith('WORKDIR '): + workdir_positions.append(i) + elif line.strip().startswith('FROM '): + from_positions.append(i) + + if env_positions: + insert_position = env_positions[-1] + 1 + elif workdir_positions: + insert_position = workdir_positions[-1] + 1 + elif from_positions: + insert_position = from_positions[-1] + 1 + else: + insert_position = 0 + + # Create ENV statements for the new variables + env_statements = [] + for key, value in env_vars.items(): + env_statements.append(f"ENV {key}={value}\n") + + # If we're adding after an existing ENV statement, add a blank line for readability + if env_positions and insert_position > 0: + if not content[insert_position-1].strip() == '': + env_statements.insert(0, '\n') + + # Insert the new ENV statements + for i, statement in enumerate(env_statements): + content.insert(insert_position + i, statement) + + # Write the updated content back to the Dockerfile + with open(dockerfile_path, 'w') as file: + file.writelines(content) + + return True + + except Exception as e: + print(f"Error modifying Dockerfile: {e}") + return False + + +# Example usage +dockerfile_path = "Dockerfile" +env_vars = { + "OTEL_PYTHON_DISTRO": "aws_distro", + "OTEL_PYTHON_CONFIGURATOR": "aws_configurator", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "https://xray.us-east-1.amazonaws.com/v1/traces", + "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf", + "OTEL_RESOURCE_ATTRIBUTES": "service.name={agent_name}", + "AGENT_OBSERVABILITY_ENABLED": "true", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS":"x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore", + "OTEL_PYTHON_DISABLED_INSTRUMENTATIONS":"bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets" +} + +success = modify_dockerfile(dockerfile_path, env_vars) +if success: + print(f"Successfully updated {dockerfile_path} with new environment variables") +else: + print(f"Failed to update {dockerfile_path}") + + # launch agentCore runtime launch_result = agentcore_runtime.launch() print(f"Launching AgentCore runtime {agent_name}") diff --git a/speech-to-speech/workshops/agent-core/banking_agent/observability_deploy.ipynb b/speech-to-speech/workshops/agent-core/banking_agent/observability_deploy.ipynb new file mode 100644 index 00000000..e4bde827 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/banking_agent/observability_deploy.ipynb @@ -0,0 +1,395 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b6d95f9a", + "metadata": {}, + "source": [ + "# Setup Agents on AgentCore Runtime with filtered 
Instrumentation" + ] + }, + { + "cell_type": "markdown", + "id": "b1f49c0d", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "- Enable transaction search on Amazon CloudWatch. First-time users must enable CloudWatch Transaction Search to view Bedrock AgentCore spans and traces. To enable transaction search, please refer to the our documentation.\n", + "\n", + "\n", + "\n", + "\n", + "- Log group and Log stream configured on Amazon Cloudwatch to be added to the environment variables." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79066ada", + "metadata": {}, + "outputs": [], + "source": [ + "from bedrock_agentcore_starter_toolkit import Runtime\n", + "import boto3\n", + "import time, argparse, os\n", + "\n", + "region = os.environ.get(\"AWS_DEFAULT_REGION\", \"us-east-1\")\n", + "\n", + "agent_name = \"banking_runtime_agent\"\n", + "entrypoint = \"./banking_agent.py\"\n", + "\n", + "# Prepare docker file\n", + "agentcore_runtime = Runtime()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d45ce097", + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "cloudwatch_client = boto3.client(\"logs\", region_name=region)\n", + "response = cloudwatch_client.create_log_group(\n", + " logGroupName='bedrock-agentcore-observability'\n", + ")\n", + "response" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "38f6574b", + "metadata": {}, + "outputs": [], + "source": [ + "response = cloudwatch_client.create_log_stream(\n", + " logGroupName='bedrock-agentcore-observability',\n", + " logStreamName='default'\n", + ")\n", + "response" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b8a003ed", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "response = agentcore_runtime.configure(\n", + " entrypoint=entrypoint,\n", + " auto_create_execution_role=True,\n", + " auto_create_ecr=True,\n", + " requirements_file=\"requirements.txt\",\n", + " region=region,\n", + " agent_name=agent_name\n", + ")\n", + "print(f\"Initialized docker file for {agent_name}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13e7709a", + "metadata": {}, + "outputs": [], + "source": [ + "# Update the docker with the below environment variables for instrumentation with filtering\n", + "\n", + "def modify_dockerfile(dockerfile_path, env_vars):\n", + " \"\"\"\n", + " Read a Dockerfile, add environment variable definitions, and write it back.\n", + " \n", + " Args:\n", + " dockerfile_path (str): Path to the Dockerfile\n", + " env_vars (dict): Dictionary of environment variables to add (key-value pairs)\n", + " \n", + " Returns:\n", + " bool: True if successful, False otherwise\n", + " \"\"\"\n", + " try:\n", + " # Read the Dockerfile\n", + " with open(dockerfile_path, 'r') as file:\n", + " content = file.readlines()\n", + " \n", + " # Find the best position to add environment variables\n", + " # Strategy: Look for existing ENV statements and add after the last one\n", + " # If no ENV statements, add after the WORKDIR statement\n", + " # If no WORKDIR, add after the FROM statement\n", + " # If none of the above, add at the beginning\n", + " \n", + " env_positions = []\n", + " workdir_positions = []\n", + " from_positions = []\n", + " \n", + " for i, line in enumerate(content):\n", + " if line.strip().startswith('ENV '):\n", + " env_positions.append(i)\n", + " elif line.strip().startswith('WORKDIR '):\n", + " workdir_positions.append(i)\n", + " elif line.strip().startswith('FROM 
'):\n", + " from_positions.append(i)\n", + " \n", + " if env_positions:\n", + " insert_position = env_positions[-1] + 1\n", + " elif workdir_positions:\n", + " insert_position = workdir_positions[-1] + 1\n", + " elif from_positions:\n", + " insert_position = from_positions[-1] + 1\n", + " else:\n", + " insert_position = 0\n", + " \n", + " # Create ENV statements for the new variables\n", + " env_statements = []\n", + " for key, value in env_vars.items():\n", + " env_statements.append(f\"ENV {key}={value}\\n\")\n", + " \n", + " # If we're adding after an existing ENV statement, add a blank line for readability\n", + " if env_positions and insert_position > 0:\n", + " if not content[insert_position-1].strip() == '':\n", + " env_statements.insert(0, '\\n')\n", + " \n", + " # Insert the new ENV statements\n", + " for i, statement in enumerate(env_statements):\n", + " content.insert(insert_position + i, statement)\n", + " \n", + " # Write the updated content back to the Dockerfile\n", + " with open(dockerfile_path, 'w') as file:\n", + " file.writelines(content)\n", + " \n", + " return True\n", + " \n", + " except Exception as e:\n", + " print(f\"Error modifying Dockerfile: {e}\")\n", + " return False\n", + "\n", + "\n", + "# Example usage\n", + "dockerfile_path = \"Dockerfile\"\n", + "env_vars = {\n", + " \"OTEL_PYTHON_DISTRO\": \"aws_distro\",\n", + " \"OTEL_PYTHON_CONFIGURATOR\": \"aws_configurator\",\n", + " \"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT\": \"https://xray.us-east-1.amazonaws.com/v1/traces\",\n", + " \"OTEL_EXPORTER_OTLP_PROTOCOL\": \"http/protobuf\",\n", + " \"OTEL_RESOURCE_ATTRIBUTES\": \"service.name=banking_runtime_agent\",\n", + " \"AGENT_OBSERVABILITY_ENABLED\": \"true\",\n", + " \"OTEL_EXPORTER_OTLP_LOGS_HEADERS\":\"x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore\",\n", + " \"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS\":\"bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets\"\n", + "}\n", + "\n", + "success = modify_dockerfile(dockerfile_path, env_vars)\n", + "if success:\n", + " print(f\"Successfully updated {dockerfile_path} with new environment variables\")\n", + "else:\n", + " print(f\"Failed to update {dockerfile_path}\")\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a64b27e", + "metadata": {}, + "outputs": [], + "source": [ + "# launch agentCore runtime\n", + "launch_result = agentcore_runtime.launch()\n", + "print(f\"Launching AgentCore runtime {agent_name}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a378be", + "metadata": {}, + "outputs": [], + "source": [ + "# Check agentcore runtime deployment status\n", + "status_response = agentcore_runtime.status()\n", + "status = status_response.endpoint['status']\n", + "end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']\n", + "while status not in end_status:\n", + " time.sleep(10)\n", + " status_response = agentcore_runtime.status()\n", + " status = status_response.endpoint['status']\n", + " print(\".\")\n", + "print(\"AgentCore Runtime deployed succssfully:\", agent_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ada0de23", + "metadata": {}, + "outputs": [], + "source": [ + "# Invoke the agentCore runtime\n", + "invoke_response = 
agentcore_runtime.invoke({\"prompt\": \"What’s the account balance for 1234567890?\"})\n", + "invoke_response" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d7c8705", + "metadata": {}, + "outputs": [], + "source": [ + "agent_arn = status_response.endpoint['agentRuntimeEndpointArn']\n", + "print(\"Agent ARN:\", agent_arn)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1e5215ac", + "metadata": {}, + "outputs": [], + "source": [ + "# test with boto3 client\n", + "\n", + "import boto3\n", + "import json\n", + "from IPython.display import display, Markdown\n", + "region = 'us-east-1'\n", + "agentcore_client = boto3.client(\n", + " 'bedrock-agentcore',\n", + " region_name=region\n", + ")\n", + "\n", + "boto3_response = agentcore_client.invoke_agent_runtime(\n", + " agentRuntimeArn=agent_arn,\n", + " qualifier=\"default\",\n", + " payload=json.dumps({\"prompt\": \"What’s the account balance for 1234567890?\"})\n", + ")\n", + "if \"text/event-stream\" in boto3_response.get(\"contentType\", \"\"):\n", + " content = []\n", + " for line in boto3_response[\"response\"].iter_lines(chunk_size=1):\n", + " if line:\n", + " line = line.decode(\"utf-8\")\n", + " if line.startswith(\"data: \"):\n", + " line = line[6:]\n", + " print(line)\n", + " content.append(line)\n", + " display(Markdown(\"\\n\".join(content)))\n", + "else:\n", + " try:\n", + " events = []\n", + " for event in boto3_response.get(\"response\", []):\n", + " events.append(event)\n", + " except Exception as e:\n", + " events = [f\"Error reading EventStream: {e}\"]\n", + " display(Markdown(json.loads(events[0].decode(\"utf-8\"))))" + ] + }, + { + "cell_type": "markdown", + "id": "f0722e2a", + "metadata": {}, + "source": [ + "## Traces output when agent is hosted on runtime\n", + "\n", + "\n", + "\n", + "\n", + "## Traces output when agent is hosted outside of runtime\n", + "\n", + "" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dea3bb92", + "metadata": {}, + "outputs": [], + "source": [ + "# get resources to clean up\n", + "launch_result.ecr_uri, launch_result.agent_id, launch_result.ecr_uri.split('/')[1]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3b3016b0", + "metadata": {}, + "outputs": [], + "source": [ + "# Cleanup the resources\n", + "agentcore_control_client = boto3.client(\n", + " 'bedrock-agentcore-control',\n", + " region_name=region\n", + ")\n", + "ecr_client = boto3.client(\n", + " 'ecr',\n", + " region_name=region\n", + " \n", + ")\n", + "\n", + "runtime_delete_response = agentcore_control_client.delete_agent_runtime(\n", + " agentRuntimeId=launch_result.agent_id,\n", + " \n", + ")\n", + "\n", + "response = ecr_client.delete_repository(\n", + " repositoryName=launch_result.ecr_uri.split('/')[1],\n", + " force=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "51073a2c", + "metadata": {}, + "source": [ + "## Test endpoint locally" + ] + }, + { + "cell_type": "markdown", + "id": "41c0741e", + "metadata": {}, + "source": [ + "Run ./run_agent_with_telemetry.sh in cli which uses the environment variables from .env" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c2b3e32", + "metadata": {}, + "outputs": [], + "source": [ + "!curl -X POST http://localhost:8080/invocations \\\n", + "-H \"Content-Type: application/json\" \\\n", + "-d '{ \"input\": {\"prompt\": \"What’s the account balance for 1234567890?\"} }'" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + 
"language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/speech-to-speech/workshops/agent-core/banking_agent/requirements.txt b/speech-to-speech/workshops/agent-core/banking_agent/requirements.txt index e977e03c..4adc0b12 100644 --- a/speech-to-speech/workshops/agent-core/banking_agent/requirements.txt +++ b/speech-to-speech/workshops/agent-core/banking_agent/requirements.txt @@ -1,6 +1,11 @@ -strands-agents -strands-agents-tools +strands-agents==1.8.0 +strands-agents-tools==0.2.7 uv boto3 bedrock-agentcore -bedrock-agentcore-starter-toolkit \ No newline at end of file +bedrock-agentcore-starter-toolkit +# Telemetry and observability +aws-opentelemetry-distro==0.12.0 +strands-agents[otel] +# Environment and configuration +python-dotenv>=0.19.0 \ No newline at end of file diff --git a/speech-to-speech/workshops/agent-core/banking_agent/run_agent_with_telemetry.sh b/speech-to-speech/workshops/agent-core/banking_agent/run_agent_with_telemetry.sh new file mode 100755 index 00000000..4b60d833 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/banking_agent/run_agent_with_telemetry.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +# Change to the backend directory +cd "$(dirname "$0")" + +# Load environment variables from .env file +if [ -f .env ]; then + export $(grep -v '^#' .env | xargs) + echo "Loaded environment variables from .env" +else + echo "Warning: .env file not found" +fi + +# # Run the server with OpenTelemetry instrumentation and collector config +# opentelemetry-instrument --config otel-collector-config.yaml python banking_agent.py +# Run the server with OpenTelemetry instrumentation and collector-less +opentelemetry-instrument python banking_agent.py \ No newline at end of file diff --git a/speech-to-speech/workshops/agent-core/deploy-agentcore-runtime.sh b/speech-to-speech/workshops/agent-core/deploy-agentcore-runtime.sh old mode 100644 new mode 100755 index 38b82014..c7c06782 --- a/speech-to-speech/workshops/agent-core/deploy-agentcore-runtime.sh +++ b/speech-to-speech/workshops/agent-core/deploy-agentcore-runtime.sh @@ -13,12 +13,14 @@ pip install --upgrade pip pip install -r requirements.txt # Deploy Strands agents to AgentCore Runtime -# Banking agent +# # Banking agent cd ./banking_agent python ./deploy.py +cd .. + # Mortgage agent -cd ../mortgage_agent +cd ./mortgage_agent python ./deploy.py cd .. 
diff --git a/speech-to-speech/workshops/agent-core/images/remote.png b/speech-to-speech/workshops/agent-core/images/remote.png new file mode 100644 index 00000000..860addfd Binary files /dev/null and b/speech-to-speech/workshops/agent-core/images/remote.png differ diff --git a/speech-to-speech/workshops/agent-core/images/runtime.png b/speech-to-speech/workshops/agent-core/images/runtime.png new file mode 100644 index 00000000..011960a7 Binary files /dev/null and b/speech-to-speech/workshops/agent-core/images/runtime.png differ diff --git a/speech-to-speech/workshops/agent-core/images/s2s_observability.png b/speech-to-speech/workshops/agent-core/images/s2s_observability.png new file mode 100644 index 00000000..6e254f5e Binary files /dev/null and b/speech-to-speech/workshops/agent-core/images/s2s_observability.png differ diff --git a/speech-to-speech/workshops/agent-core/images/transactional_search.png b/speech-to-speech/workshops/agent-core/images/transactional_search.png new file mode 100644 index 00000000..00b8fedf Binary files /dev/null and b/speech-to-speech/workshops/agent-core/images/transactional_search.png differ diff --git a/speech-to-speech/workshops/agent-core/mortgage_agent/.dockerignore b/speech-to-speech/workshops/agent-core/mortgage_agent/.dockerignore new file mode 100644 index 00000000..7a9716d5 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/mortgage_agent/.dockerignore @@ -0,0 +1,68 @@ +# Build artifacts +build/ +dist/ +*.egg-info/ +*.egg + +# Python cache +__pycache__/ +__pycache__* +*.py[cod] +*$py.class +*.so +.Python + +# Virtual environments +.venv/ +.env +venv/ +env/ +ENV/ + +# Testing +.pytest_cache/ +.coverage +.coverage* +htmlcov/ +.tox/ +*.cover +.hypothesis/ +.mypy_cache/ +.ruff_cache/ + +# Development +*.log +*.bak +*.swp +*.swo +*~ +.DS_Store + +# IDEs +.vscode/ +.idea/ + +# Version control +.git/ +.gitignore +.gitattributes + +# Documentation +docs/ +*.md +!README.md + +# CI/CD +.github/ +.gitlab-ci.yml +.travis.yml + +# Project specific +tests/ + +# Bedrock AgentCore specific - keep config but exclude runtime files +.bedrock_agentcore.yaml +.dockerignore + +# Keep wheelhouse for offline installations +# wheelhouse/ diff --git a/speech-to-speech/workshops/agent-core/mortgage_agent/Dockerfile b/speech-to-speech/workshops/agent-core/mortgage_agent/Dockerfile new file mode 100644 index 00000000..5b347de1 --- /dev/null +++ b/speech-to-speech/workshops/agent-core/mortgage_agent/Dockerfile @@ -0,0 +1,49 @@ +FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim +WORKDIR /app + +# Configure UV for container environment +ENV UV_SYSTEM_PYTHON=1 UV_COMPILE_BYTECODE=1 + + + +COPY requirements.txt requirements.txt +# Install from requirements file +RUN uv pip install -r requirements.txt + + + + +RUN uv pip install aws-opentelemetry-distro>=0.10.1 + + +# Set AWS region environment variable + +ENV AWS_REGION=us-east-1 +ENV AWS_DEFAULT_REGION=us-east-1 + + +# Signal that this is running in Docker for host binding logic +ENV DOCKER_CONTAINER=1 + +ENV OTEL_PYTHON_DISTRO=aws_distro +ENV OTEL_PYTHON_CONFIGURATOR=aws_configurator +ENV OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=https://xray.us-east-1.amazonaws.com/v1/traces +ENV OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +ENV OTEL_RESOURCE_ATTRIBUTES=service.name={agent_name} +ENV AGENT_OBSERVABILITY_ENABLED=true +ENV OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore +ENV 
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets + +# Create non-root user +RUN useradd -m -u 1000 bedrock_agentcore +USER bedrock_agentcore + +EXPOSE 8080 +EXPOSE 8000 + +# Copy entire project (respecting .dockerignore) +COPY . . + +# Use the full module path + +CMD ["opentelemetry-instrument", "python", "-m", "mortgage_agent"] diff --git a/speech-to-speech/workshops/agent-core/mortgage_agent/deploy.py b/speech-to-speech/workshops/agent-core/mortgage_agent/deploy.py index d3462411..6e60f2a4 100644 --- a/speech-to-speech/workshops/agent-core/mortgage_agent/deploy.py +++ b/speech-to-speech/workshops/agent-core/mortgage_agent/deploy.py @@ -19,6 +19,96 @@ ) print(f"Initialized docker file for {agent_name}") +# Update the docker with the below environment variables for instrumentation with filtering + +def modify_dockerfile(dockerfile_path, env_vars): + """ + Read a Dockerfile, add environment variable definitions, and write it back. + + Args: + dockerfile_path (str): Path to the Dockerfile + env_vars (dict): Dictionary of environment variables to add (key-value pairs) + + Returns: + bool: True if successful, False otherwise + """ + try: + # Read the Dockerfile + with open(dockerfile_path, 'r') as file: + content = file.readlines() + + # Find the best position to add environment variables + # Strategy: Look for existing ENV statements and add after the last one + # If no ENV statements, add after the WORKDIR statement + # If no WORKDIR, add after the FROM statement + # If none of the above, add at the beginning + + env_positions = [] + workdir_positions = [] + from_positions = [] + + for i, line in enumerate(content): + if line.strip().startswith('ENV '): + env_positions.append(i) + elif line.strip().startswith('WORKDIR '): + workdir_positions.append(i) + elif line.strip().startswith('FROM '): + from_positions.append(i) + + if env_positions: + insert_position = env_positions[-1] + 1 + elif workdir_positions: + insert_position = workdir_positions[-1] + 1 + elif from_positions: + insert_position = from_positions[-1] + 1 + else: + insert_position = 0 + + # Create ENV statements for the new variables + env_statements = [] + for key, value in env_vars.items(): + env_statements.append(f"ENV {key}={value}\n") + + # If we're adding after an existing ENV statement, add a blank line for readability + if env_positions and insert_position > 0: + if not content[insert_position-1].strip() == '': + env_statements.insert(0, '\n') + + # Insert the new ENV statements + for i, statement in enumerate(env_statements): + content.insert(insert_position + i, statement) + + # Write the updated content back to the Dockerfile + with open(dockerfile_path, 'w') as file: + file.writelines(content) + + return True + + except Exception as e: + print(f"Error modifying Dockerfile: {e}") + return False + + +# Example usage +dockerfile_path = "Dockerfile" +env_vars = { + "OTEL_PYTHON_DISTRO": "aws_distro", + "OTEL_PYTHON_CONFIGURATOR": "aws_configurator", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "https://xray.us-east-1.amazonaws.com/v1/traces", + "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf", + "OTEL_RESOURCE_ATTRIBUTES": "service.name={agent_name}", + "AGENT_OBSERVABILITY_ENABLED": "true", + 
"OTEL_EXPORTER_OTLP_LOGS_HEADERS":"x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore", + "OTEL_PYTHON_DISABLED_INSTRUMENTATIONS":"bedrock-agentcore,strands-agents,strands-agents-tools,boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets" +} + +success = modify_dockerfile(dockerfile_path, env_vars) +if success: + print(f"Successfully updated {dockerfile_path} with new environment variables") +else: + print(f"Failed to update {dockerfile_path}") + + # launch agentCore runtime launch_result = agentcore_runtime.launch() print(f"Launching AgentCore runtime {agent_name}") diff --git a/speech-to-speech/workshops/agent-core/requirements.txt b/speech-to-speech/workshops/agent-core/requirements.txt index e977e03c..dbe3c41e 100644 --- a/speech-to-speech/workshops/agent-core/requirements.txt +++ b/speech-to-speech/workshops/agent-core/requirements.txt @@ -1,6 +1,13 @@ -strands-agents -strands-agents-tools +strands-agents==1.8.0 +strands-agents-tools==0.2.7 uv boto3 bedrock-agentcore -bedrock-agentcore-starter-toolkit \ No newline at end of file +bedrock-agentcore-starter-toolkit +# Telemetry and observability +aws-opentelemetry-distro==0.12.0 +# aws-opentelemetry-distro~=0.10.1 + +strands-agents[otel] +# Environment and configuration +python-dotenv>=0.19.0 \ No newline at end of file diff --git a/speech-to-speech/workshops/python-server/.env b/speech-to-speech/workshops/python-server/.env new file mode 100644 index 00000000..aa4a059b --- /dev/null +++ b/speech-to-speech/workshops/python-server/.env @@ -0,0 +1,42 @@ +# AWS OpenTelemetry Configuration +OTEL_PYTHON_DISTRO=aws_distro +OTEL_PYTHON_CONFIGURATOR=aws_configurator + +# Service Identification +OTEL_RESOURCE_ATTRIBUTES=service.name="s2s_agent" +AGENT_OBSERVABILITY_ENABLED=true +OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="https://xray.us-east-1.amazonaws.com/v1/traces" + +# CloudWatch Integration +OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=bedrock-agentcore-observability,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore + +# Instrumentation Exclusions - Extended to include all patterns +OTEL_PYTHON_FASTAPI_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_URLLIB3_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_HTTPX_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_BOTO3SQS_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" +OTEL_PYTHON_BOTOCORE_EXCLUDED_URLS="/ws/.*|/health|/metrics|.*websocket.*|/api/.*|.*\.amazonaws\.com.*|.*bedrock-runtime\..*|.*dynamodb\..*|.*cognito-identity\..*|.*s3\..*" + 
+# Disable unwanted instrumentations +OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="boto3sqs,botocore,requests,urllib3,httpx,aiohttp-client,asyncio,threading,logging,system_metrics,psutil,sqlite3,redis,pymongo,sqlalchemy,django,flask,tornado,pyramid,falcon,starlette,fastapi,websockets" + +# Propagation and Sampling +OTEL_PROPAGATORS=tracecontext,baggage,xray +OTEL_TRACES_SAMPLER=always_on +OTEL_BSP_SCHEDULE_DELAY=1000 +OTEL_BSP_MAX_EXPORT_BATCH_SIZE=512 +OTEL_BSP_EXPORT_TIMEOUT=30000 + +# AWS X-Ray specific +AWS_XRAY_TRACING_NAME=s2s_agent +AWS_XRAY_CONTEXT_MISSING=LOG_ERROR + +# Debug Configuration (optional - enable when needed) +OTEL_LOG_LEVEL=DEBUG +OTEL_PYTHON_LOG_LEVEL=DEBUG +AWS_XRAY_DEBUG_MODE=true +OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true +OTEL_PYTHON_LOGGING_LEVEL=DEBUG \ No newline at end of file diff --git a/speech-to-speech/workshops/python-server/requirements.txt b/speech-to-speech/workshops/python-server/requirements.txt index a71a7c5a..7e1f64bf 100644 --- a/speech-to-speech/workshops/python-server/requirements.txt +++ b/speech-to-speech/workshops/python-server/requirements.txt @@ -6,4 +6,10 @@ websockets==15.0.1 requests==2.32.3 aws_sdk_bedrock_runtime mcp==1.13.1 -strands-agents==0.1.6 \ No newline at end of file +strands-agents==0.1.6 + +# Telemetry and observability +aws-opentelemetry-distro~=0.10.1 +strands-agents[otel] +# Environment and configuration +python-dotenv>=0.19.0 \ No newline at end of file diff --git a/speech-to-speech/workshops/python-server/run_server_with_telemetry.sh b/speech-to-speech/workshops/python-server/run_server_with_telemetry.sh new file mode 100755 index 00000000..8f19d87a --- /dev/null +++ b/speech-to-speech/workshops/python-server/run_server_with_telemetry.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# Change to the backend directory +cd "$(dirname "$0")" + +# Load environment variables from .env file +if [ -f .env ]; then + export $(grep -v '^#' .env | xargs) + echo "Loaded environment variables from .env" +else + echo "Warning: .env file not found" +fi + +# Run the server with OpenTelemetry instrumentation +opentelemetry-instrument python server.py --agent strands \ No newline at end of file diff --git a/speech-to-speech/workshops/python-server/s2s_session_manager.py b/speech-to-speech/workshops/python-server/s2s_session_manager.py index ccfa97e4..6d7d1a9f 100644 --- a/speech-to-speech/workshops/python-server/s2s_session_manager.py +++ b/speech-to-speech/workshops/python-server/s2s_session_manager.py @@ -11,10 +11,18 @@ from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver from integration import inline_agent, bedrock_knowledge_bases as kb, agent_core +# load environment variables from .env file +from dotenv import load_dotenv +import os +load_dotenv() + +# AgentCore Observability integration +from opentelemetry import baggage, context, trace + # Suppress warnings warnings.filterwarnings("ignore") -DEBUG = False +DEBUG = True def debug_print(message): """Print only if debug mode is enabled""" @@ -49,6 +57,261 @@ def __init__(self, region, model_id='amazon.nova-sonic-v1:0', mcp_client=None, s self.mcp_loc_client = mcp_client self.strands_agent = strands_agent + # Track content generation stages by contentId + self.content_stages = {} # Maps contentId to generationStage + + # Usage event tracking + self.usage_events = [] + self.token_usage = { + "totalInputTokens": 0, + "totalOutputTokens": 0, + "totalTokens": 0, + "details": { + "input": { + "speechTokens": 0, + "textTokens": 0 + }, + "output": { + 
"speechTokens": 0, + "textTokens": 0 + } + } + } + + # Telemetry + self.session_id = str(uuid.uuid4()) # Unique session ID + self.session_span = None + self._create_session_span() + + def _calculate_cost(self, token_usage): + """ + Calculate the cost based on token usage and Nova Sonic pricing. + + Args: + token_usage: Dictionary containing token usage information + + Returns: + Total cost in USD + """ + # Nova Sonic pricing information (USD per 1000 tokens) + NOVA_SONIC_PRICING = { + "speech_input": 0.0034, # $0.0034 per 1000 speech input tokens + "speech_output": 0.0136, # $0.0136 per 1000 speech output tokens + "text_input": 0.00006, # $0.00006 per 1000 text input tokens + "text_output": 0.00024 # $0.00024 per 1000 text output tokens + } + if not token_usage: + return 0.0 + + speech_input_tokens = token_usage.get('input', {}).get('speechTokens', 0) + text_input_tokens = token_usage.get('input', {}).get('textTokens', 0) + speech_output_tokens = token_usage.get('output', {}).get('speechTokens', 0) + text_output_tokens = token_usage.get('output', {}).get('textTokens', 0) + + # Calculate cost components (convert from price per 1000 tokens) + speech_input_cost = (speech_input_tokens / 1000) * NOVA_SONIC_PRICING["speech_input"] + text_input_cost = (text_input_tokens / 1000) * NOVA_SONIC_PRICING["text_input"] + speech_output_cost = (speech_output_tokens / 1000) * NOVA_SONIC_PRICING["speech_output"] + text_output_cost = (text_output_tokens / 1000) * NOVA_SONIC_PRICING["text_output"] + + # Calculate total cost + total_cost = speech_input_cost + text_input_cost + speech_output_cost + text_output_cost + debug_print(f"Calculated cost: {total_cost:.6f} USD for session {self.session_id}") + return total_cost + + def _create_session_span(self): + """Create the session span for telemetry when the session manager is initialized""" + if not self.session_id: + # Generate a session ID if not provided + self.session_id = str(uuid.uuid4()) + + # Create a session span (this implicitly creates a trace) + trace_name = f"{self.session_id}" + + # Set session context for telemetry + context_token = self.set_session_context(self.session_id) + debug_print(f"Session context set with token: {context_token}") + + # Get tracer for main application + try: + tracer = trace.get_tracer("s2s_agent", "1.0.0") + # Create the session span + self.session_span = tracer.start_span(trace_name) + if hasattr(self.session_span, 'set_attribute'): + self.session_span.set_attribute("session.id", self.session_id) + self.session_span.set_attribute("model.id", self.model_id) + self.session_span.set_attribute("region", self.region) + + except Exception as telemetry_error: + raise + + + def set_session_context(self, session_id): + """Set the session ID in OpenTelemetry baggage for trace correlation""" + ctx = baggage.set_baggage("session.id", session_id) + token = context.attach(ctx) + + return token + + + def _create_child_span(self, name, input=None, parent_span=None, metadata=None, output=None): + """Create a child span for telemetry using OpenTelemetry""" + + try: + debug_print(f"Creating child span: {name}") + # Get a tracer for the agent + tracer = trace.get_tracer("s2s_agent", "1.0.0") + + # Start a new span as a child of the parent span if provided + # If no parent span is provided, it will be a child of the current active span + span_context = None + if parent_span and isinstance(parent_span, trace.Span): + # If we have a parent span, use its context + debug_print("Using provided parent span for child span") + span_context = 
trace.set_span_in_context(parent_span) + + # Create the span with the provided name + span = tracer.start_span(name, context=span_context) + + # Add standard attributes + if hasattr(span, 'set_attribute'): + span.set_attribute("session.id", self.session_id) + + # Add input data if provided + if input: + self._add_attributes_to_span(span, input, "input") + + # Add metadata if provided + if metadata: + self._add_attributes_to_span(span, metadata, "") + + # Add output data if provided + if output: + self._add_attributes_to_span(span, output, "output") + + # Add start time event + span.add_event("span_started") + return span + except Exception as e: + raise + + def _add_attributes_to_span(self, span, data, prefix=""): + """ + Recursively add attributes to a span from complex data structures. + + Args: + span: The OpenTelemetry span to add attributes to + data: The data to add (can be dict, list, or primitive) + prefix: The attribute name prefix + """ + if not hasattr(span, 'set_attribute'): + return + + def _flatten_and_add(obj, current_prefix=""): + """Recursively flatten nested objects and add as span attributes""" + if isinstance(obj, dict): + for key, value in obj.items(): + new_prefix = f"{current_prefix}.{key}" if current_prefix else key + if isinstance(value, (dict, list)): + # For complex nested objects, serialize to JSON string + try: + json_str = json.dumps(value) + # Truncate very long JSON strings + if len(json_str) > 1000: + json_str = json_str[:997] + "..." + span.set_attribute(new_prefix, json_str) + except (TypeError, ValueError): + # If JSON serialization fails, convert to string + str_value = str(value) + if len(str_value) > 1000: + str_value = str_value[:997] + "..." + span.set_attribute(new_prefix, str_value) + elif isinstance(value, (str, int, float, bool, type(None))): + # Handle primitive types directly + if value is None: + span.set_attribute(new_prefix, "null") + else: + str_value = str(value) + # Truncate very long strings + if len(str_value) > 1000: + str_value = str_value[:997] + "..." + span.set_attribute(new_prefix, str_value) + else: + # For other types, convert to string + str_value = str(value) + if len(str_value) > 1000: + str_value = str_value[:997] + "..." + span.set_attribute(new_prefix, str_value) + elif isinstance(obj, list): + # For lists, serialize to JSON string + try: + json_str = json.dumps(obj) + if len(json_str) > 1000: + json_str = json_str[:997] + "..." + span.set_attribute(current_prefix or "list", json_str) + except (TypeError, ValueError): + str_value = str(obj) + if len(str_value) > 1000: + str_value = str_value[:997] + "..." + span.set_attribute(current_prefix or "list", str_value) + else: + # For primitive types or other objects + if obj is None: + span.set_attribute(current_prefix or "value", "null") + else: + str_value = str(obj) + if len(str_value) > 1000: + str_value = str_value[:997] + "..." + span.set_attribute(current_prefix or "value", str_value) + + try: + _flatten_and_add(data, prefix) + except Exception as e: + + # Fallback: add as simple string + try: + fallback_value = str(data) + if len(fallback_value) > 1000: + fallback_value = fallback_value[:997] + "..." 
+ span.set_attribute(prefix or "data", fallback_value) + except Exception as fallback_error: + raise + + + def _end_span_safely(self, span, output=None, level="INFO", status_message=None, end_time=None, metadata=None): + """End a span safely with additional attributes using OpenTelemetry""" + try: + if not span: + return + + # Add output data if provided + if output and hasattr(span, 'set_attribute'): + self._add_attributes_to_span(span, output, "output") + + # Add additional metadata if provided + if metadata and hasattr(span, 'set_attribute'): + self._add_attributes_to_span(span, metadata, "") + + # Set span status based on level + if hasattr(span, 'set_status'): + if level == "ERROR": + error_message = status_message or "An error occurred" + span.set_status(trace.Status(trace.StatusCode.ERROR, error_message)) + if hasattr(span, 'add_event'): + span.add_event("error", {"message": error_message}) + else: + span.set_status(trace.Status(trace.StatusCode.OK)) + + # Add end time event + if hasattr(span, 'add_event'): + span.add_event("span_ended") + + # End the span + span.end() + + except Exception as e: + raise + def _initialize_client(self): """Initialize the Bedrock client.""" config = Config( @@ -101,13 +364,83 @@ async def send_raw_event(self, event_data): return event_json = json.dumps(event_data) - #if "audioInput" not in event_data["event"]: - # print(event_json) + + event_span = None + + if "event" in event_data: + event_type = list(event_data["event"].keys())[0] + + # Create event-specific spans as children of session span + if event_type == "sessionStart": + debug_print("Creating sessionStart span") + event_span = self._create_child_span( + "sessionStart", + parent_span=self.session_span, + input=event_data["event"]["sessionStart"], + metadata={ + "session_id": self.session_id, + + } + ) + + + elif event_type == "sessionEnd": + debug_print("Creating sessionEnd span") + event_span = self._create_child_span( + "sessionEnd", + parent_span=self.session_span, + input=event_data["event"]["sessionEnd"], + metadata={ + "session_id": self.session_id + } + ) + + + elif event_type == "promptStart": + debug_print + event_span = self._create_child_span( + "promptStart", + parent_span=self.session_span, + input=event_data["event"]["promptStart"], + metadata={ + "session_id": self.session_id, + "prompt_name": event_data["event"]["promptStart"].get("promptName"), + "content_name": event_data["event"]["promptStart"].get("contentName"), + # "audio_output_configuration": event_data["event"]["promptStart"].get("audioOutputConfiguration"), + # "tool_configuration": event_data["event"]["promptStart"].get("toolConfiguration"), + } + ) + + + elif event_type == "textInput": + debug_print("Creating textInput span") + text_input_data = event_data["event"]["textInput"] + if text_input_data.get("content"): + event_span = self._create_child_span( + "systemPrompt", + parent_span= self.session_span, + input=text_input_data.get("content"), + metadata={ + "session_id": self.session_id, + "prompt_name": text_input_data.get("promptName"), + "content_name": text_input_data.get("contentName"), + } + ) + + + event = InvokeModelWithBidirectionalStreamInputChunk( value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8')) ) await self.stream.input_stream.send(event) + # Update event span with success + if event_span: + debug_print(f"Ending event span for {event_type}") + self._end_span_safely(event_span, + output={"status": "sent", "event_type": event_type if "event" in event_data else "unknown"} + ) + # Close 
session if "sessionEnd" in event_data["event"]: self.close() @@ -119,6 +452,7 @@ async def _process_audio_input(self): """Process audio input from the queue and send to Bedrock.""" while self.is_active: try: + debug_print("Waiting for audio data...") # Get audio data from the queue data = await self.audio_input_queue.get() @@ -134,6 +468,8 @@ async def _process_audio_input(self): # Create the audio input event audio_event = S2sEvent.audio_input(prompt_name, content_name, audio_bytes.decode('utf-8') if isinstance(audio_bytes, bytes) else audio_bytes) + debug_print(f"Sending audio chunk for prompt: {prompt_name}, content: {content_name}") + # Send the event await self.send_raw_event(audio_event) @@ -170,8 +506,101 @@ async def _process_responses(self): event_name = None if 'event' in json_data: event_name = list(json_data["event"].keys())[0] - # if event_name == "audioOutput": - # print(json_data) + + # Handle usage events + if event_name == 'usageEvent': + # logger.info("Received usage event") + # Store the usage event + event_data = json_data['event']['usageEvent'] + self.usage_events.append(event_data) + + # Update token usage aggregates + if 'totalInputTokens' in event_data: + self.token_usage['totalInputTokens'] = event_data.get('totalInputTokens', 0) + if 'totalOutputTokens' in event_data: + self.token_usage['totalOutputTokens'] = event_data.get('totalOutputTokens', 0) + if 'totalTokens' in event_data: + self.token_usage['totalTokens'] = event_data.get('totalTokens', 0) + + # Update detailed token usage if available + if 'details' in event_data: + details = event_data.get('details', {}) + if 'delta' in details: + delta = details.get('delta', {}) + # Update input tokens + if 'input' in delta: + input_delta = delta.get('input', {}) + self.token_usage['details']['input']['speechTokens'] += input_delta.get('speechTokens', 0) + self.token_usage['details']['input']['textTokens'] += input_delta.get('textTokens', 0) + # Update output tokens + if 'output' in delta: + output_delta = delta.get('output', {}) + self.token_usage['details']['output']['speechTokens'] += output_delta.get('speechTokens', 0) + self.token_usage['details']['output']['textTokens'] += output_delta.get('textTokens', 0) + + # If total values are provided, use those instead + if 'total' in details: + total = details.get('total', {}) + if 'input' in total: + input_total = total.get('input', {}) + self.token_usage['details']['input']['speechTokens'] = input_total.get('speechTokens', + self.token_usage['details']['input']['speechTokens']) + self.token_usage['details']['input']['textTokens'] = input_total.get('textTokens', + self.token_usage['details']['input']['textTokens']) + if 'output' in total: + output_total = total.get('output', {}) + self.token_usage['details']['output']['speechTokens'] = output_total.get('speechTokens', + self.token_usage['details']['output']['speechTokens']) + self.token_usage['details']['output']['textTokens'] = output_total.get('textTokens', + self.token_usage['details']['output']['textTokens']) + + + if self.session_span: + cost = self._calculate_cost(self.token_usage['details']) + + if hasattr(self.session_span, 'set_attribute'): + self.session_span.set_attribute("input_tokens", self.token_usage['totalInputTokens']) + self.session_span.set_attribute("output_tokens", self.token_usage['totalOutputTokens']) + self.session_span.set_attribute("total_tokens", self.token_usage['totalTokens']) + self.session_span.set_attribute("cost", cost) + self.session_span.set_attribute("currency", "USD") + # Add an event 
for token usage update + self.session_span.add_event("token_usage_updated", { + "input_tokens": self.token_usage['totalInputTokens'], + "output_tokens": self.token_usage['totalOutputTokens'], + "total_tokens": self.token_usage['totalTokens'], + "cost": cost + }) + + if event_name == 'textOutput': + prompt_name = json_data['event'].get('textOutput', {}).get("promptName") + content = json_data['event'].get('textOutput', {}).get("content") + content_id = json_data['event'].get('textOutput', {}).get("contentId") + role = json_data['event'].get('textOutput', {}).get("role", "ASSISTANT") + #lowercase the role and append "user" with "Input" and "assistant" with "Output" + if role == "USER": + messageType = "userInput" + elif role == "ASSISTANT": + messageType = "assistantOutput" + + # Only create a span if this is a FINAL generation (not SPECULATIVE) + generation_stage = self.content_stages.get(content_id, "FINAL") + + if generation_stage == "FINAL": + debug_print(f"Creating {messageType} span for textOutput") + response_span = self._create_child_span( + messageType, + parent_span=self.session_span, + metadata={ + "session_id": self.session_id, + "prompt_name": prompt_name, + "generation_stage": "FINAL"}, + output={"content": content} + ) + + self._end_span_safely(response_span) + + # Handle tool use detection if event_name == 'toolUse': @@ -234,6 +663,24 @@ async def processToolUse(self, toolName, toolUseContent): toolName = toolName.lower() content, result = None, None try: + + # Call the tool function with unpacked parameters + tool_start_time = time.time_ns() + + # Create tool use span as a child of the current prompt or session span + response_span = self._create_child_span( + "toolUse", + parent_span=self.session_span, + input={ + "toolName": toolName, + "params": toolUseContent.get("content") + }, + metadata={ + "session_id": self.session_id, + "tool_start_time": tool_start_time, + } + ) + if toolUseContent.get("content"): # Parse the JSON string in the content field query_json = json.loads(toolUseContent.get("content")) @@ -287,7 +734,16 @@ async def processToolUse(self, toolName, toolUseContent): if not result: result = "no result found" - + + tool_end_time = time.time_ns() + tool_run_time = tool_end_time - tool_start_time + + self._end_span_safely(response_span, + output={"result": result}, + end_time=tool_end_time, + metadata={"tool_run_time": tool_run_time, "tool_start_time": tool_start_time, "tool_end_time": tool_end_time}, + ) + return {"result": result} except Exception as ex: print(ex) @@ -309,4 +765,3 @@ async def close(self): await self.response_task except asyncio.CancelledError: pass - \ No newline at end of file diff --git a/speech-to-speech/workshops/python-server/server.py b/speech-to-speech/workshops/python-server/server.py index fd0487a7..17805ec2 100644 --- a/speech-to-speech/workshops/python-server/server.py +++ b/speech-to-speech/workshops/python-server/server.py @@ -22,6 +22,11 @@ DEBUG = False +# load environment variables from .env file +from dotenv import load_dotenv +import os +load_dotenv() + def debug_print(message): """Print only if debug mode is enabled""" if DEBUG: @@ -188,7 +193,7 @@ async def forward_responses(websocket, stream_manager): stream_manager.close() -async def main(host, port, health_port, enable_mcp=False, enable_strands_agent=False): +async def main(host, port, health_port, enable_mcp=False, enable_strands=False): if health_port: try: @@ -207,7 +212,7 @@ async def main(host, port, health_port, enable_mcp=False, enable_strands_agent=F 
print("Failed to start MCP client",ex) # Init Strands Agent - if enable_strands_agent: + if enable_strands: print("Strands agent enabled") try: global STRANDS_AGENT @@ -262,4 +267,4 @@ async def main(host, port, health_port, enable_mcp=False, enable_strands_agent=F traceback.print_exc() finally: if MCP_CLIENT: - MCP_CLIENT.cleanup() \ No newline at end of file + MCP_CLIENT.cleanup() diff --git a/speech-to-speech/workshops/python-server/setup-for-ec2-lab.sh b/speech-to-speech/workshops/python-server/setup-for-ec2-lab.sh index b84fdd5a..b0816f84 100755 --- a/speech-to-speech/workshops/python-server/setup-for-ec2-lab.sh +++ b/speech-to-speech/workshops/python-server/setup-for-ec2-lab.sh @@ -13,6 +13,9 @@ pip install boto3 --upgrade curl -LsSf https://astral.sh/uv/install.sh | sh +# Set Bedrock Agents Lambda Arn to env varaible +export BOOKING_LAMBDA_ARN=$(aws cloudformation describe-stacks --stack-name bedrock-agents --query "Stacks[0].Outputs[?OutputKey=='BookingLambdaArn'].OutputValue" --output text) + # Set websocket server host and port export HOST="0.0.0.0" export WS_PORT=8081 \ No newline at end of file diff --git a/speech-to-speech/workshops/react-client/package.json b/speech-to-speech/workshops/react-client/package.json index 08eb42c7..87e70974 100644 --- a/speech-to-speech/workshops/react-client/package.json +++ b/speech-to-speech/workshops/react-client/package.json @@ -1,5 +1,5 @@ { - "homepage": "/proxy/3000/", + "homepage": ".", "name": "nova-s2s-voice-chat-client", "version": "0.1.0", "private": true,