- Status:
{'Active' if call_sid == current_call_sid else 'Inactive'}
+
+
Live Audio
+
+ Status: {'Active' if call_sid == current_call_sid else 'Inactive'}
+
-
-
-
-
-
Inject Prompt
-
-
-
@@ -376,17 +361,38 @@ async def preview_current_call(request: Request):
# WebSocket endpoint for streaming call audio and logs
@app.websocket(f"/{CALLS_ENDPOINT}/{CURRENT_CALL_ENDPOINT}/ws")
async def preview_websocket(websocket: WebSocket):
- await websocket.accept()
-
global active_cm
- if not active_cm or current_call_sid != call_sid:
+ if not active_cm:
await websocket.close(code=1000, reason="Call not active")
return
+ await websocket.accept()
+
await active_cm.add_observer(websocket)
try:
while True:
- await websocket.receive_text()
+ receive_task = asyncio.create_task(websocket.receive_text())
+ call_ended_task = asyncio.create_task(active_cm.call_ended_event.wait())
+
+ done, pending = await asyncio.wait(
+ [receive_task, call_ended_task],
+ return_when=asyncio.FIRST_COMPLETED
+ )
+
+ if call_ended_task in done:
+ receive_task.cancel()
+ print("Call ended, closing preview websocket")
+ await websocket.close(code=1000, reason="Call ended")
+ break
+
+ if receive_task in done:
+ try:
+ message = receive_task.result()
+ print(f"Received prompt via websocket: {message}")
+ await active_cm.send_text_prompt(message)
+ except Exception as e:
+ print(f"Error processing message: {e}")
+
except WebSocketDisconnect:
pass
except Exception as e:
@@ -531,6 +537,24 @@ async def websocket_handler(websocket: WebSocket):
conversation_manager.start_recording(recording_path)
conversation_manager.initial_prompt_sent.set()
+ # Notify the webhook that the call has started
+ if WEBHOOK_NOTIFICATION_URL:
+ try:
+ call_details = services.twilio.twilio_client.calls(call_sid).fetch()
+ call_info = {k: v for k, v in call_details.__dict__.items() if not k.startswith('_')}
+ payload = {
+ "event": "started",
+ "call_sid": call_sid,
+ "timestamp": datetime.datetime.now().isoformat(),
+ "twilio_call_data": call_info
+ }
+ # NOTE: requests.post is a synchronous call bounded by a 2 s timeout; it is neither fire-and-forget nor awaited
+ requests.post(WEBHOOK_NOTIFICATION_URL, json=payload, timeout=2)
+ print(f"Sent start webhook for {call_sid}")
+ except Exception as e:
+ print(f"Failed to send start webhook: {e}")
+
+
elif data['event'] == 'media':
if not conversation_manager:
continue
@@ -632,7 +656,7 @@ def process_call(call_sid: str, call_dict: dict, summarized_text: Optional[str]
utils.cleanup_calls(CALLS_DIR, KEEP_CALLS)
- webhook_url = os.environ.get("WEBHOOK_TARGET_URL")
+ webhook_url = WEBHOOK_NOTIFICATION_URL
if webhook_url:
with open(metadata_path, 'r') as f:
payload = json.load(f)
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 7c17636..3864d99 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -10,5 +10,5 @@ services:
environment:
- PYTHONUNBUFFERED=1
volumes:
- - ./calls:/app/calls
+ - ./user_data:/app/user_data
restart: unless-stopped
diff --git a/main.py b/main.py
index dd7858e..7660b81 100644
--- a/main.py
+++ b/main.py
@@ -1,39 +1,36 @@
import os
import uvicorn
from pyngrok import ngrok
+
import services
-from app import app, SERVER_PUBLIC_URL
+import app
# Configuration
SERVICE_PORT = 8080
def run_app():
"""Starts the ngrok tunnel and the FastAPI app to handle incoming calls."""
- global SERVER_PUBLIC_URL
-
- ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN")
- if ngrok_authtoken:
- ngrok.set_auth_token(ngrok_authtoken)
-
- # Start ngrok tunnel
- tunnel = ngrok.connect(SERVICE_PORT)
- public_url = tunnel.public_url
- SERVER_PUBLIC_URL = public_url
-
- # We also need to update this validity in app.py if it uses it directly.
- # But app.py imports SERVER_PUBLIC_URL? No, variables are not shared like that.
- # We should set it on app module.
- import app as app_module
- app_module.SERVER_PUBLIC_URL = public_url
-
- print(f"Ngrok tunnel is active at: {public_url}")
+
+ public_url = None
+
+ if os.environ.get("SERVER_PUBLIC_URL"):
+ public_url = os.environ.get("SERVER_PUBLIC_URL")
+ print(f"Using manual public URL: {public_url}")
+
+ else:
+ ngrok.set_auth_token(os.environ.get("NGROK_AUTHTOKEN"))
+ tunnel = ngrok.connect(SERVICE_PORT)
+ public_url = tunnel.public_url
+ print(f"Ngrok tunnel is active at: {public_url}")
+
+ app.SERVER_PUBLIC_URL = public_url
# Update the webhook for incoming calls
services.twilio.update_twilio_webhook(public_url)
try:
# Run the FastAPI app
- uvicorn.run(app, host="0.0.0.0", port=SERVICE_PORT)
+ uvicorn.run(app.app, host="0.0.0.0", port=SERVICE_PORT)
finally:
print("Shutting down ngrok tunnel.")
diff --git a/services/gemini.py b/services/gemini.py
index fe27b69..0de9d2e 100644
--- a/services/gemini.py
+++ b/services/gemini.py
@@ -34,6 +34,7 @@ def __init__(self, websocket, call_state, initial_prompt="Hello! How can I help
self.is_speaking = False
self.observers = []
self.transcription_history = []
+ self.call_ended_event = asyncio.Event()
async def send_audio_to_twilio(self, pcm_24k_audio):
"""Transcodes and sends audio data to Twilio."""
@@ -126,7 +127,7 @@ async def run(self):
"speech_config": {
"voice_config": {
"prebuilt_voice_config": {
- "voice_name": "Puck"
+ "voice_name": os.environ.get("GEMINI_ASSISTANT_VOICE")
}
}
}
@@ -208,6 +209,8 @@ async def receiver():
[sender_task, receiver_task],
return_when=asyncio.FIRST_COMPLETED
)
+
+ self.call_ended_event.set()
# Cancel pending tasks
for task in pending:
@@ -220,6 +223,7 @@ def gemini_summarize_call(transcription_history):
return "No transcription available."
transcript_text = "\n".join([f"{entry['role'].upper()}: {entry['text']}" for entry in transcription_history])
+ print(f"Transcript: {transcript_text}")
client = genai.Client()
response = client.models.generate_content(
diff --git a/services/twilio.py b/services/twilio.py
index cd8cd46..ff3a1ac 100644
--- a/services/twilio.py
+++ b/services/twilio.py
@@ -1,6 +1,8 @@
import os
from twilio.rest import Client
+TWILIO_ENDPOINT = "twilio"
+
twilio_client = Client(os.environ.get("TWILIO_ACCOUNT_SID"), os.environ.get("TWILIO_AUTH_TOKEN"))
def update_twilio_webhook(public_url):
@@ -18,7 +20,7 @@ def update_twilio_webhook(public_url):
return
number_sid = incoming_phone_numbers[0].sid
- twilio_client.incoming_phone_numbers(number_sid).update(voice_url=f"{public_url}/twilio/voice")
+ twilio_client.incoming_phone_numbers(number_sid).update(voice_url=f"{public_url}/{TWILIO_ENDPOINT}/voice")
print(f"Successfully updated webhook for {twilio_phone_number}.")
except Exception as e:
print(f"Error updating Twilio webhook: {e}")
@@ -35,8 +37,8 @@ def make_outbound_call(public_url, to_number) -> str | None:
call = twilio_client.calls.create(
to=to_number,
from_=twilio_phone_number,
- url=f"{public_url}/twilio/voice",
- status_callback=f"{public_url}/twilio/status_callback",
+ url=f"{public_url}/{TWILIO_ENDPOINT}/voice",
+ status_callback=f"{public_url}/{TWILIO_ENDPOINT}/status_callback",
status_callback_method='POST',
machine_detection='Enable'
)