diff --git a/.env.example b/.env.example
index 34492b7..6ebdd06 100644
--- a/.env.example
+++ b/.env.example
@@ -1,19 +1,23 @@
# API Keys
-TWILIO_ACCOUNT_SID="..."
-TWILIO_AUTH_TOKEN="..."
-NGROK_AUTHTOKEN="..."
-GOOGLE_API_KEY="..."
+TWILIO_ACCOUNT_SID=""
+TWILIO_AUTH_TOKEN=""
+NGROK_AUTHTOKEN=""
+GOOGLE_API_KEY=""
# Twilio Configuration
TWILIO_PHONE_NUMBER="+1234567890"
# Gemini Configuration
-GEMINI_ASSISTANT_MODEL="gemini-2.5-flash-native-audio-preview-09-2025"
-GEMINI_SUMMARIZATION_MODEL="gemini-2.5-flash"
+GEMINI_ASSISTANT_MODEL="gemini-2.5-flash-native-audio-preview-12-2025"
+GEMINI_ASSISTANT_VOICE="Sulafat"
+GEMINI_SUMMARIZATION_MODEL="gemini-2.5-flash-lite"
# AutoPhone Configuration
SERVICE_PORT=8080
-WEBHOOK_TARGET_URL="https://webhook-target.example.com"
ASSISTANT_LANGUAGE="English"
ASSISTANT_OWNER_NAME="John"
-KEEP_CALL_RECORDINGS=3
+KEEP_CALLS=5
+SUMMARIZE_CALLS=true
+#WEBHOOK_NOTIFICATION_URL="https://webhook-target.example.com"
+#APP_PASSWORD="your_password" # Optional: Uncomment to enable password protection
+#SERVER_PUBLIC_URL="" # Optional: Manually override public URL (skips Ngrok)
diff --git a/.gitignore b/.gitignore
index cb06845..ebc9550 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
__pycache__/
-calls/
+user_data/
.env
diff --git a/CREDITS.md b/CREDITS.md
new file mode 100644
index 0000000..7abda6b
--- /dev/null
+++ b/CREDITS.md
@@ -0,0 +1,3 @@
+# Credits
+
+- [WebSocket extension for MIT App Inventor](https://sumitkmr.com/projects/doc/web-socket/)
diff --git a/README.md b/README.md
index 751fcdb..d5a4653 100644
--- a/README.md
+++ b/README.md
@@ -1,58 +1,50 @@
# AutoPhone
-A voice assistant application powered by Gemini and Twilio, capable of handling inbound and outbound calls with real-time AI interaction.
+A voice assistant application powered by Gemini and Twilio, capable of handling phone calls with real-time AI interaction.
## Features
- **Real-time Voice AI**: Uses Gemini Live API for voice conversations.
- **Twilio Integration**: Handles phone calls via Twilio Voice.
-- **Call Summarization**: automatically summarizes calls after they end.
+- **Call Summarization**: Automatically summarizes calls after they end.
+- **Live call preview**: Preview the call in real-time.
+- **GUI App**: Graphical user interface for managing assistant.
## Prerequisites
-- Docker (and Docker Compose)
-- Twilio Account (with phone number used for voice assistant)
+- Docker (with Docker Compose)
+- Twilio Account (with dedicated phone number used for AutoPhone)
- Ngrok Account
- Gemini API Key
-## Configuration
+# Configuration
### Get all required API keys.
- [Gemini API key](https://aistudio.google.com/api-keys)
-- [Twilio Account SID and API key](https://www.twilio.com/docs/usage/requests-to-twilio)
+- [Twilio Account SID and API key](https://console.twilio.com)
- [Ngrok API key](https://dashboard.ngrok.com/get-started/your-authtoken)
### Copy the example environment file:
- ```
- cp .env.example .env
- ```
+
+```bash
+cp .env.example .env
+```
Edit `.env` to provide API keys and settings.
-## Running with Docker
+# Usage
-To start the application:
+## Running with Docker
```
sudo docker compose up --build
```
+Public URL will be available with Ngrok tunnel.
+
## API Endpoints
-- `POST /call`: Initiate an outbound call.
-```bash
-curl -X POST "http://0.0.0.0:8080/call" \
- -H "Content-Type: application/json" \
- -d '{
- "to_number": "+1234567890",
- "prompt": "Ask how life is going"
- }'
-```
-- After every call, a call summary is sent to the configured webhook.
-```json
-{
- "call": {
- "call metadata here"
- },
- "summarized_text": "AI summarization of call"
-}
-```
+Go to `/docs` to view API documentation.
+
+## GUI
+
+App is available [here](https://gallery.appinventor.mit.edu/?galleryid=468a172f-b469-43d4-b359-93a40cb70d4a).
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..9d80a00
--- /dev/null
+++ b/app.py
@@ -0,0 +1,720 @@
+import asyncio
+import base64
+import json
+import requests
+import os
+import shutil
+import datetime
+from typing import Dict, Optional, List
+from contextlib import asynccontextmanager
+from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException, status, BackgroundTasks
+from fastapi.responses import Response, HTMLResponse, FileResponse, JSONResponse
+from pydantic import BaseModel
+from twilio.twiml.voice_response import VoiceResponse, Connect
+import utils
+
+import services
+
+# Configuration
+USER_DATA_DIR = "user_data"
+CALLS_DIR = os.path.join(USER_DATA_DIR, "calls")
+KEEP_CALLS = int(os.environ.get("KEEP_CALLS"))
+SUMMARIZE_CALLS = os.environ.get("SUMMARIZE_CALLS", "true").lower() == "true"
+RECORDING_FILENAME = "recording.wav"
+METADATA_FILENAME = "call.json"
+APP_PASSWORD = os.environ.get("APP_PASSWORD")
+WEBHOOK_NOTIFICATION_URL = os.environ.get("WEBHOOK_NOTIFICATION_URL")
+
+# Endpoints
+CALLS_ENDPOINT = "calls"
+TWILIO_ENDPOINT = "twilio"
+CURRENT_CALL_ENDPOINT = "current"
+QUEUE_ENDPOINT = "queue"
+
+# In-memory store
+call_data_store: Dict[str, Dict] = {}
+current_call_sid: Optional[str] = None
+active_cm: Optional[services.gemini.GeminiConversationManager] = None
+scheduled_calls: Dict[int, asyncio.Task] = {}
+scheduled_calls_data: Dict[int, Dict] = {}
+next_queue_id = 1
+SERVER_PUBLIC_URL = None
+
+# --- FastAPI App & Lifespan ---
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ yield
+ for task in scheduled_calls.values():
+ task.cancel()
+
+app = FastAPI(lifespan=lifespan)
+
+@app.middleware("http")
+async def verify_password_middleware(request: Request, call_next):
+ # Only protect endpoints starting with /calls
+ # Exclude websocket if needed? WebSockets are handled by @app.websocket, middleware applies to HTTP mostly but let's be careful.
+ # Actually, FastAPI middleware applies to everything HTTP. HTTP exceptions don't work well in Websockets handshake sometimes, but let's assume HTTP endpoints for now.
+
+ if APP_PASSWORD and request.url.path.startswith(f"/{CALLS_ENDPOINT}"):
+ # Check query param 'pass'
+ password = request.query_params.get("pass")
+ if password != APP_PASSWORD:
+ return JSONResponse(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ content={"detail": "Incorrect password"}
+ )
+
+ response = await call_next(request)
+ return response
+
+
+
+# ==========================================
+# Group 1: Queue Management
+# ==========================================
+
+async def schedule_call(queue_id: int, delay_seconds: float, call_data: dict):
+ try:
+ print(f"Call {queue_id} scheduled in {delay_seconds} seconds.")
+ await asyncio.sleep(delay_seconds)
+
+ print(f"Executing queued call {queue_id}")
+ if SERVER_PUBLIC_URL:
+ sid = services.twilio.make_outbound_call(SERVER_PUBLIC_URL, call_data["to_number"])
+ if sid:
+ full_prompt = services.gemini.GEMINI_PROMPTS["outbound_init"] + call_data["prompt"]
+ call_data_store[sid] = {"prompt": full_prompt}
+ print(f"Initiated queued call {queue_id}, SID: {sid}")
+ else:
+ print(f"Failed to initiate queued call {queue_id}")
+ else:
+ print("Server public URL not set, skipping queued call execution.")
+
+ except asyncio.CancelledError:
+ print(f"Scheduled call {queue_id} cancelled.")
+ finally:
+ if queue_id in scheduled_calls:
+ del scheduled_calls[queue_id]
+ if queue_id in scheduled_calls_data:
+ del scheduled_calls_data[queue_id]
+
+# List all queued calls IDs
+@app.get(f"/{CALLS_ENDPOINT}/{QUEUE_ENDPOINT}")
+async def get_queue_ids():
+ return list(scheduled_calls_data.keys())
+
+# Retrieve details of a specific queued call
+@app.get(f"/{CALLS_ENDPOINT}/{QUEUE_ENDPOINT}/{{queue_id}}")
+async def get_queue_item(queue_id: int):
+ item = scheduled_calls_data.get(queue_id)
+ if item:
+ return item
+ return Response(content=json.dumps({"error": "Item not found"}), status_code=404, media_type="application/json")
+
+# Remove a call from the queue
+@app.delete(f"/{CALLS_ENDPOINT}/{QUEUE_ENDPOINT}/{{queue_id}}")
+async def delete_queue_item(queue_id: int):
+ if queue_id in scheduled_calls:
+ scheduled_calls[queue_id].cancel()
+ if queue_id in scheduled_calls_data:
+ del scheduled_calls_data[queue_id]
+ if queue_id in scheduled_calls:
+ del scheduled_calls[queue_id]
+ return {"status": "deleted", "id": queue_id}
+ return Response(content=json.dumps({"error": "Item not found"}), status_code=404, media_type="application/json")
+
+
+# ==========================================
+# Group 2: Outbound Call Management
+# ==========================================
+
+class CallRequest(BaseModel):
+ to_number: str
+ prompt: str
+ datetime: Optional[str] = None
+
+async def handle_scheduled_call(call_request: CallRequest):
+ global next_queue_id, scheduled_calls, scheduled_calls_data
+ try:
+ scheduled_time = datetime.datetime.strptime(call_request.datetime, "%Y-%m-%d-%H-%M")
+ now = datetime.datetime.now()
+ delay = (scheduled_time - now).total_seconds()
+
+ if delay < 0:
+ return Response(content=json.dumps({"error": "Scheduled time is in the past"}), status_code=400, media_type="application/json")
+
+ q_id = next_queue_id
+ next_queue_id += 1
+
+ call_data = {
+ "to_number": call_request.to_number,
+ "prompt": call_request.prompt,
+ "datetime": call_request.datetime
+ }
+
+ task = asyncio.create_task(schedule_call(q_id, delay, call_data))
+ scheduled_calls[q_id] = task
+ scheduled_calls_data[q_id] = call_data
+
+ return {"status": "queued", "queue_id": q_id}
+
+ except ValueError:
+ return Response(content=json.dumps({"error": "Invalid datetime format. Use YYYY-MM-DD-HH-MM"}), status_code=400, media_type="application/json")
+
+async def handle_immediate_call(call_request: CallRequest):
+ global SERVER_PUBLIC_URL, call_data_store
+
+ public_url = SERVER_PUBLIC_URL
+ print("Using Public URL for callback: ", public_url)
+ call_sid = services.twilio.make_outbound_call(public_url, call_request.to_number)
+
+ if call_sid:
+ call_data_store[call_sid] = {"prompt": services.gemini.GEMINI_PROMPTS["outbound_init"] + call_request.prompt}
+ return {"status": "success", "call_sid": call_sid}
+ else:
+ return Response(content=json.dumps({"status": "error", "message": "Failed to initiate call."}), status_code=500, media_type="application/json")
+
+# Initiate a new outbound call
+@app.post(f"/{CALLS_ENDPOINT}/make")
+async def make_outbound_call_handler(request: Request, call_request: CallRequest):
+ if call_request.datetime:
+ return await handle_scheduled_call(call_request)
+ return await handle_immediate_call(call_request)
+
+
+# ==========================================
+# Group 3: Active Call Control
+# ==========================================
+
+# Helper to get active call sid
+def get_current_call_sid():
+ global current_call_sid
+ return current_call_sid
+
+async def send_prompt_to_call(call_sid: str, payload: dict):
+ prompt = payload.get("prompt")
+ if not prompt:
+ return Response(content=json.dumps({"error": "Prompt is required"}), status_code=400, media_type="application/json")
+
+ global active_cm
+ if active_cm and current_call_sid == call_sid:
+ await active_cm.send_text_prompt(prompt)
+ return {"status": "sent", "prompt": prompt}
+
+ return Response(content=json.dumps({"error": "Active call not found"}), status_code=404, media_type="application/json")
+
+
+
+# Get current call info
+@app.get(f"/{CALLS_ENDPOINT}/{CURRENT_CALL_ENDPOINT}/info")
+async def get_current_call_info():
+ global active_cm
+ sid = get_current_call_sid()
+ return {
+ "active": active_cm is not None,
+ "call_sid": sid
+ }
+
+
+# Serve the audio-only preview page for the currently active call
+@app.get(f"/{CALLS_ENDPOINT}/{CURRENT_CALL_ENDPOINT}/audio")
+async def audio_preview_current_call(request: Request):
+ call_sid = get_current_call_sid()
+ if not call_sid:
+ return HTMLResponse("
No active call to preview
")
+
+ scheme = "wss" if request.url.scheme == "https" else "ws"
+ ws_url = f"{scheme}://{request.url.netloc}/{CALLS_ENDPOINT}/{CURRENT_CALL_ENDPOINT}/ws"
+
+ html_content = f"""
+
+
+ Audio Preview: {call_sid}
+
+
+
+
+
Live Audio
+
+ Status: {'Active' if call_sid == current_call_sid else 'Inactive'}
+
+
+
+
+
+
+
+ """
+ return HTMLResponse(content=html_content)
+
+# WebSocket endpoint for streaming call audio and logs
+@app.websocket(f"/{CALLS_ENDPOINT}/{CURRENT_CALL_ENDPOINT}/ws")
+async def preview_websocket(websocket: WebSocket):
+ global active_cm
+ if not active_cm:
+ await websocket.close(code=1000, reason="Call not active")
+ return
+
+ await websocket.accept()
+
+ await active_cm.add_observer(websocket)
+ try:
+ while True:
+ receive_task = asyncio.create_task(websocket.receive_text())
+ call_ended_task = asyncio.create_task(active_cm.call_ended_event.wait())
+
+ done, pending = await asyncio.wait(
+ [receive_task, call_ended_task],
+ return_when=asyncio.FIRST_COMPLETED
+ )
+
+ if call_ended_task in done:
+ receive_task.cancel()
+ print("Call ended, closing preview websocket")
+ await websocket.close(code=1000, reason="Call ended")
+ break
+
+ if receive_task in done:
+ try:
+ message = receive_task.result()
+ print(f"Received prompt via websocket: {message}")
+ await active_cm.send_text_prompt(message)
+ except Exception as e:
+ print(f"Error processing message: {e}")
+
+ except WebSocketDisconnect:
+ pass
+ except Exception as e:
+ print(f"Preview websocket error: {e}")
+ finally:
+ if active_cm:
+ active_cm.remove_observer(websocket)
+
+# ==========================================
+# Group 4: Twilio Integration (Webhooks & Streams)
+# ==========================================
+
+# Twilio webhook for call status updates
+@app.post(f"/{TWILIO_ENDPOINT}/status_callback")
+async def status_callback_handler(request: Request):
+ global current_call_sid, active_cm
+ form_data = await request.form()
+ call_sid = form_data.get("CallSid")
+ call_status = form_data.get("CallStatus")
+ answered_by = form_data.get("AnsweredBy")
+
+ print(f"Call {call_sid} status: {call_status}, AnsweredBy: {answered_by}")
+
+ if call_status in ["busy", "no-answer", "failed", "canceled"] or (answered_by and answered_by.startswith("machine")):
+ if call_sid in call_data_store:
+ del call_data_store[call_sid]
+
+ if call_sid == current_call_sid:
+ current_call_sid = None
+ active_cm = None
+
+ try:
+ call = services.twilio.twilio_client.calls(call_sid).fetch()
+ call_dict = {k: v for k, v in call.__dict__.items() if not k.startswith('_')}
+ process_call(call_sid, call_dict, "None", [])
+ except Exception as e:
+ print(f"Error in status callback processing: {e}")
+ else:
+ print(f"Call {call_sid} status '{call_status}' is not considered a failure. Webhook not sent.")
+
+ return Response(content="OK", status_code=200)
+
+# Twilio webhook for incoming voice calls
+@app.post(f"/{TWILIO_ENDPOINT}/voice")
+async def voice_handler(request: Request):
+ response = VoiceResponse()
+ form_data = await request.form()
+ answered_by = form_data.get("AnsweredBy")
+ forwarded_from = form_data.get("ForwardedFrom")
+ print(f"Incoming call answered by: {answered_by}, ForwardedFrom: {forwarded_from}")
+
+ if answered_by and answered_by.startswith("machine"):
+ print("Answering machine detected. Hanging up.")
+ response.hangup()
+ return Response(content=str(response), media_type="application/xml")
+
+ if forwarded_from:
+ print(f"Call forwarded from {forwarded_from}. Rejecting potential voicemail loop.")
+ response.hangup()
+ return Response(content=str(response), media_type="application/xml")
+
+ connect = Connect()
+ websocket_url = f"wss://{request.headers['host']}/{TWILIO_ENDPOINT}/ws"
+ connect.stream(url=websocket_url)
+ response.append(connect)
+ return Response(content=str(response), media_type="application/xml")
+
+# Main WebSocket handler for Twilio Media Streams
+@app.websocket(f"/{TWILIO_ENDPOINT}/ws")
+async def websocket_handler(websocket: WebSocket):
+ await websocket.accept()
+ print("WebSocket connection established.")
+
+
+ global current_call_sid, active_cm
+
+ call_state = {"stream_sid": None}
+ conversation_manager = None
+ gemini_task = None
+ call_sid = None
+
+ try:
+ while True:
+ receive_task = asyncio.create_task(websocket.receive_text())
+ wait_tasks = [receive_task]
+ if gemini_task:
+ wait_tasks.append(gemini_task)
+
+ done, pending = await asyncio.wait(wait_tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ if gemini_task and gemini_task in done:
+ print("Gemini assistant ended the conversation.")
+ if not receive_task.done():
+ receive_task.cancel()
+ break
+
+ try:
+ message = receive_task.result()
+ except WebSocketDisconnect:
+ print("WebSocket disconnected.")
+ break
+ except asyncio.CancelledError:
+ break
+
+ data = json.loads(message)
+
+ if data['event'] == 'start':
+ print(data)
+ call_state['stream_sid'] = data['start']['streamSid']
+ call_sid = data['start']['callSid']
+ os.makedirs(os.path.join(CALLS_DIR, call_sid), exist_ok=True)
+ recording_path = os.path.join(CALLS_DIR, call_sid, RECORDING_FILENAME)
+ print(f"Twilio stream started: {call_state['stream_sid']}")
+
+ if call_sid in call_data_store:
+ prompt = call_data_store[call_sid]["prompt"]
+ print(f"Using custom prompt for call {call_sid}")
+ del call_data_store[call_sid]
+ else:
+ prompt = services.gemini.GEMINI_PROMPTS["inbound_init"]
+ print(f"Using default inbound prompt for call {call_sid}")
+
+ if active_cm is not None:
+ print(f"Rejecting new call {call_sid} because {current_call_sid} is active")
+ # We can't easily reject via websocket other than closing or ignoring.
+ # But for better UX, we might want to just proceed and let the old one handle itself or similar.
+ # User request: "operate on only one active call".
+ # Let's enforce strict single call.
+ # Actually, if we just overwrite, we might kill the previous one.
+ # Let's just overwrite for now as it seems most robust for "I want to switch to this call".
+ # OR if the user meant "don't handle two at once", maybe just error?
+ # A common pattern is "Busy". But here we are already connected via WS.
+ # Let's check if the previous one is still running.
+ pass
+
+ current_call_sid = call_sid
+ conversation_manager = services.gemini.GeminiConversationManager(websocket, call_state, prompt)
+ active_cm = conversation_manager
+ # active_calls[call_sid] = conversation_manager # Removed active_calls usage
+
+ gemini_task = asyncio.create_task(conversation_manager.run())
+ conversation_manager.start_recording(recording_path)
+ conversation_manager.initial_prompt_sent.set()
+
+ # Send Webhook that call started
+ if WEBHOOK_NOTIFICATION_URL:
+ try:
+ call_details = services.twilio.twilio_client.calls(call_sid).fetch()
+ call_info = {k: v for k, v in call_details.__dict__.items() if not k.startswith('_')}
+ payload = {
+ "event": "started",
+ "call_sid": call_sid,
+ "timestamp": datetime.datetime.now().isoformat(),
+ "twilio_call_data": call_info
+ }
+ # We use fire-and-forget logic or just await with short timeout
+ requests.post(WEBHOOK_NOTIFICATION_URL, json=payload, timeout=2)
+ print(f"Sent start webhook for {call_sid}")
+ except Exception as e:
+ print(f"Failed to send start webhook: {e}")
+
+
+ elif data['event'] == 'media':
+ if not conversation_manager:
+ continue
+
+ mulaw_audio = base64.b64decode(data['media']['payload'])
+ # Convert mulaw to 16-bit PCM for Gemini
+ pcm_audio = services.utils_audio.ulaw2lin(mulaw_audio, 2)
+
+ if conversation_manager and conversation_manager.recorder:
+ conversation_manager.recorder.writeframes(pcm_audio)
+ if conversation_manager:
+ await conversation_manager.input_queue.put(('audio', pcm_audio))
+ # Broadcast audio for preview (we use the same PCM chunks)
+ await conversation_manager.broadcast_audio(pcm_audio)
+
+ elif data['event'] == 'stop':
+ print("Twilio stream stopped.")
+ if conversation_manager:
+ await conversation_manager.input_queue.put(None)
+ break
+
+ if gemini_task:
+ await gemini_task
+
+ except WebSocketDisconnect:
+ print("WebSocket disconnected.")
+ except Exception as e:
+ print(f"An error occurred: {e}")
+ finally:
+ if call_sid and call_sid == current_call_sid:
+ current_call_sid = None
+ active_cm = None
+ # del active_calls[call_sid]
+
+ if gemini_task and not gemini_task.done():
+ gemini_task.cancel()
+
+ print("Closing WebSocket connection.")
+ if conversation_manager and conversation_manager.recorder:
+ conversation_manager.recorder.close()
+ print("Recording saved.")
+ if websocket.client_state != 3:
+ await websocket.close()
+
+ if call_sid:
+ try:
+ call = services.twilio.twilio_client.calls(call_sid).fetch()
+ call_dict = {k: v for k, v in call.__dict__.items() if not k.startswith('_')}
+
+ call_dir = os.path.join(CALLS_DIR, call_sid)
+ os.makedirs(call_dir, exist_ok=True)
+
+ if call.answered_by and call.answered_by.startswith("machine"):
+ print(f"Call answered by {call.answered_by}. Skipping success webhook and deleting recording.")
+ if os.path.exists(call_dir):
+ shutil.rmtree(call_dir)
+ print(f"Deleted call directory: {call_dir}")
+ else:
+ recording_path = os.path.join(call_dir, RECORDING_FILENAME)
+ summarized_text = None
+
+ transcription_history = []
+ if conversation_manager:
+ transcription_history = conversation_manager.transcription_history
+ if SUMMARIZE_CALLS:
+ try:
+ summarized_text = services.gemini.gemini_summarize_call(transcription_history)
+ except:
+ summarized_text = None
+
+ process_call(call_sid, call_dict, summarized_text, transcription_history)
+ except Exception as e:
+ print(f"Error in post-call processing: {e}")
+
+
+
+
+
+
+
+# ==========================================
+# Group 5: Call Management
+# ==========================================
+
+def process_call(call_sid: str, call_dict: dict, summarized_text: Optional[str] = None, transcription: list = []):
+ try:
+ call_dir = os.path.join(CALLS_DIR, call_sid)
+ os.makedirs(call_dir, exist_ok=True)
+
+ webhook_payload = {
+ "twilio_call_data": call_dict,
+ "summarized_text": summarized_text,
+ "transcription": transcription
+ }
+
+ metadata_path = os.path.join(call_dir, METADATA_FILENAME)
+ with open(metadata_path, 'w', encoding='utf-8') as f:
+ json.dump(webhook_payload, f, default=utils.json_datetime_serializer, ensure_ascii=False)
+
+ utils.cleanup_calls(CALLS_DIR, KEEP_CALLS)
+
+ webhook_url = WEBHOOK_NOTIFICATION_URL
+ if webhook_url:
+ with open(metadata_path, 'r') as f:
+ payload = json.load(f)
+ try:
+ requests.post(webhook_url, data=json.dumps(payload), headers={"Content-Type": "application/json"}, timeout=10)
+ print(f"Processed call {call_sid} and sent webhook.")
+ except Exception as e:
+ print(f"Error sending webhook for call {call_sid}: {e}")
+
+ except Exception as e:
+ print(f"Error in process_call for {call_sid}: {e}")
+
+# List all historical calls
+@app.get(f"/{CALLS_ENDPOINT}")
+async def get_calls_list():
+ if not os.path.exists(CALLS_DIR):
+ return []
+ calls = [d for d in os.listdir(CALLS_DIR) if os.path.isdir(os.path.join(CALLS_DIR, d))]
+ return calls
+
+# Retrieve a specific file (audio or JSON) for a call
+@app.get(f"/{CALLS_ENDPOINT}/{{call_sid}}/{{filename}}")
+async def get_call_data(call_sid: str, filename: str, raw: bool = False):
+ """Serves the call data for a specific call."""
+ file_path = os.path.join(CALLS_DIR, call_sid, filename)
+ if os.path.exists(file_path):
+ # Handle audio preview
+ if filename == RECORDING_FILENAME and not raw:
+ html_content = f"""
+
+
+ Audio Preview
+
+
+
+
+
+
+ """
+ return HTMLResponse(content=html_content)
+
+ return FileResponse(file_path)
+ return Response(content="File not found", status_code=404)
+
+# Delete a call recording and metadata
+@app.delete(f"/{CALLS_ENDPOINT}/{{call_sid}}")
+async def delete_call(call_sid: str):
+ call_dir = os.path.join(CALLS_DIR, call_sid)
+ if os.path.exists(call_dir):
+ try:
+ shutil.rmtree(call_dir)
+ return {"status": "deleted", "call_sid": call_sid}
+ except Exception as e:
+ return Response(content=json.dumps({"error": f"Failed to delete: {e}"}), status_code=500, media_type="application/json")
+ return Response(content=json.dumps({"error": "Call not found"}), status_code=404, media_type="application/json")
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 8ebc899..3864d99 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,8 +1,14 @@
services:
- server:
+ autophone:
+ container_name: autophone
+ image: autophone:latest
build: .
ports:
- "${SERVICE_PORT}:8080"
env_file:
- .env
+ environment:
+ - PYTHONUNBUFFERED=1
+ volumes:
+ - ./user_data:/app/user_data
restart: unless-stopped
diff --git a/main.py b/main.py
index 5353cb3..7660b81 100644
--- a/main.py
+++ b/main.py
@@ -1,175 +1,41 @@
-import asyncio
-import base64
-import json
-import requests
-import audioop
import os
import uvicorn
-from typing import Dict, Optional
-from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
-from fastapi.responses import Response
-from pydantic import BaseModel
from pyngrok import ngrok
-from twilio.twiml.voice_response import VoiceResponse, Connect
-# Import from new modules
-from services.gemini import GEMINI_PROMPTS
-from utils import json_datetime_serializer, cleanup_files
-from services.twilio import twilio_client, update_twilio_webhook, make_outbound_call
-from services.gemini import GeminiConversationManager, gemini_summarize_audio
+import services
+import app
# Configuration
-CALLS_DIR = "calls"
SERVICE_PORT = 8080
-KEEP_CALL_RECORDINGS = int(os.environ.get("KEEP_CALL_RECORDINGS"))
-# In-memory store for call-specific data like custom prompts
-call_data_store: Dict[str, Dict] = {}
-
-# Global variable to store the public URL
-SERVER_PUBLIC_URL: Optional[str] = None
-
-# --- FastAPI App ---
-app = FastAPI()
-
-@app.post("/voice")
-async def voice_handler(request: Request):
- """Handles incoming call from Twilio and establishes a WebSocket stream."""
- response = VoiceResponse()
- connect = Connect()
- websocket_url = f"wss://{request.headers['host']}/ws"
- connect.stream(url=websocket_url)
- response.append(connect)
- return Response(content=str(response), media_type="application/xml")
-
-@app.websocket("/ws")
-async def websocket_handler(websocket: WebSocket):
- """Handles the WebSocket audio stream with Twilio."""
- await websocket.accept()
- print("WebSocket connection established.")
-
- call_state = {"stream_sid": None}
- # We will create the conversation manager after we get the 'start' event and have the callSid to look up a custom prompt.
- conversation_manager = None
- gemini_task = None
- data = None # Initialize data to avoid UnboundLocalError in finally block if loop doesn't run
-
- try:
- # Main loop to receive messages from Twilio
- while True:
- message = await websocket.receive_text()
- data = json.loads(message)
-
- if data['event'] == 'start':
- print(data)
- call_state['stream_sid'] = data['start']['streamSid']
- call_sid = data['start']['callSid']
- os.makedirs(CALLS_DIR, exist_ok=True)
- recording_path = f"{CALLS_DIR}/{call_sid}.wav"
- print(f"Twilio stream started: {call_state['stream_sid']}")
-
- conversation_manager = GeminiConversationManager(websocket, call_state, GEMINI_PROMPTS["inbound_init"])
- gemini_task = asyncio.create_task(conversation_manager.run())
- conversation_manager.start_recording(recording_path)
- conversation_manager.initial_prompt_sent.set()
-
- elif data['event'] == 'media':
- if not conversation_manager:
- continue
-
- # Audio from Twilio is base64 encoded mulaw
- mulaw_audio = base64.b64decode(data['media']['payload'])
- # Convert mulaw to 16-bit PCM for Gemini
- pcm_audio = audioop.ulaw2lin(mulaw_audio, 2)
- if conversation_manager and conversation_manager.recorder:
- conversation_manager.recorder.writeframes(pcm_audio)
- if conversation_manager:
- await conversation_manager.audio_queue.put(pcm_audio)
-
- elif data['event'] == 'stop':
- print("Twilio stream stopped.")
- if conversation_manager:
- await conversation_manager.audio_queue.put(None) # Signal end of stream
- break
-
- # Wait for the Gemini task to finish
- if gemini_task:
- await gemini_task
-
- except WebSocketDisconnect:
- print("WebSocket disconnected.")
- except Exception as e:
- print(f"An error occurred: {e}")
- finally:
- # Ensure the gemini task is cancelled if it's still running
- if gemini_task and not gemini_task.done():
- gemini_task.cancel()
-
- print("Closing WebSocket connection.")
- if conversation_manager and conversation_manager.recorder:
- conversation_manager.recorder.close()
- print("Recording saved.")
- if websocket.client_state != 3: # 3 is DISCONNECTED state
- await websocket.close()
-
- # Handle the end of call
- if data and 'stop' in data and 'callSid' in data['stop']:
- cleanup_files(CALLS_DIR, "*.wav", KEEP_CALL_RECORDINGS)
- call_sid = data['stop']['callSid']
- try:
- call = twilio_client.calls(call_sid).fetch()
- call_dict = {k: v for k, v in call.__dict__.items() if not k.startswith('_')}
- if conversation_manager and conversation_manager.recorder:
- recording_path = f"{CALLS_DIR}/{call_sid}.wav"
- requests.post(os.environ.get("WEBHOOK_TARGET_URL"), data=json.dumps({"call": call_dict, "summarized_text": gemini_summarize_audio(recording_path)}, default=json_datetime_serializer), headers={"Content-Type": "application/json"}, timeout=10)
- except Exception as e:
- print(f"Error in post-call processing: {e}")
-
-class CallRequest(BaseModel):
- to_number: str
- prompt: str
+def run_app():
+ """Starts the ngrok tunnel and the FastAPI app to handle incoming calls."""
-@app.post("/call")
-async def make_outbound_call_handler(request: Request, call_request: CallRequest):
- """Handles a webhook request to initiate an outbound call with a custom prompt."""
- global SERVER_PUBLIC_URL, call_data_store
+ public_url = None
- public_url = SERVER_PUBLIC_URL
+ if os.environ.get("SERVER_PUBLIC_URL"):
+ public_url = os.environ.get("SERVER_PUBLIC_URL")
+ print(f"Using manual public URL: {public_url}")
- print("Using Public URL for callback: ", public_url)
- call_sid = make_outbound_call(public_url, call_request.to_number)
-
- if call_sid:
- # Store the custom prompt with the call SID
- call_data_store[call_sid] = {"prompt": GEMINI_PROMPTS["outbound_init"] + call_request.prompt}
- return {"status": "success", "call_sid": call_sid}
else:
- return Response(content=json.dumps({"status": "error", "message": "Failed to initiate call."}), status_code=500, media_type="application/json")
-
-def run_app():
- """Starts the ngrok tunnel and the FastAPI app to handle incoming calls."""
- global SERVER_PUBLIC_URL
-
- ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN")
- if ngrok_authtoken:
- ngrok.set_auth_token(ngrok_authtoken)
+ ngrok.set_auth_token(os.environ.get("NGROK_AUTHTOKEN"))
+ tunnel = ngrok.connect(SERVICE_PORT)
+ public_url = tunnel.public_url
+ print(f"Ngrok tunnel is active at: {public_url}")
- # Start ngrok tunnel
- tunnel = ngrok.connect(SERVICE_PORT)
- public_url = tunnel.public_url
- SERVER_PUBLIC_URL = public_url
- print(f"Ngrok tunnel is active at: {public_url}")
+ app.SERVER_PUBLIC_URL = public_url
# Update the webhook for incoming calls
- update_twilio_webhook(public_url)
+ services.twilio.update_twilio_webhook(public_url)
try:
# Run the FastAPI app
- uvicorn.run(app, host="0.0.0.0", port=SERVICE_PORT)
+ uvicorn.run(app.app, host="0.0.0.0", port=SERVICE_PORT)
finally:
print("Shutting down ngrok tunnel.")
ngrok.disconnect(public_url)
ngrok.kill()
-run_app()
+if __name__ == "__main__":
+ run_app()
diff --git a/requirements.txt b/requirements.txt
index 2dfbffd..3f9ef27 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,5 @@ google-genai
uvicorn
requests
pydantic
-audioop-lts
+numpy
+python-multipart
diff --git a/services/__init__.py b/services/__init__.py
new file mode 100644
index 0000000..1c1fe93
--- /dev/null
+++ b/services/__init__.py
@@ -0,0 +1,3 @@
+from . import gemini
+from . import twilio
+from . import utils_audio
diff --git a/services/gemini.py b/services/gemini.py
index bf18523..0de9d2e 100644
--- a/services/gemini.py
+++ b/services/gemini.py
@@ -2,7 +2,7 @@
import base64
import json
import wave
-import audioop
+from services import utils_audio
import os
from google import genai
from google.genai import types
@@ -14,9 +14,11 @@
GEMINI_ASSISTANT_OWNER_NAME = os.environ.get("ASSISTANT_OWNER_NAME")
GEMINI_PROMPTS = {
- "assistant_instruction": f"You are a helpful and friendly {GEMINI_ASSISTANT_OWNER_NAME}'s personal voice assistant. Your task is to conduct a conversation, which will then be transferred to the assistant's owner. At the end of the conversation, remind the interlocutor to end the call. Use language: {GEMINI_LANGUAGE}.",
+ "assistant_instruction": f"You are a helpful and friendly {GEMINI_ASSISTANT_OWNER_NAME}'s personal voice assistant. Your task is to conduct a conversation, which will then be transferred to the assistant's owner. Use language: {GEMINI_LANGUAGE}.",
"inbound_init": "Hello! Please introduce yourself.",
- "outbound_init": "You are being redirected to interlocutor so please start the conversation from now. Your task as an assistant: "
+ "outbound_init": "You are being redirected to interlocutor so please start the conversation from now. Your task as an assistant: ",
+ "end_call": "Ends the voice call. Call this at the end of the conversation.",
+ "admin_message": "Message from admin: "
}
class GeminiConversationManager:
@@ -25,22 +27,29 @@ class GeminiConversationManager:
def __init__(self, websocket, call_state, initial_prompt="Hello! How can I help you today?"):
self.websocket = websocket
self.call_state = call_state
- self.audio_queue = asyncio.Queue()
+ self.input_queue = asyncio.Queue()
self.initial_prompt_sent = asyncio.Event()
self.initial_prompt = initial_prompt
self.recorder = None
self.is_speaking = False
+ self.observers = []
+ self.transcription_history = []
+ self.call_ended_event = asyncio.Event()
async def send_audio_to_twilio(self, pcm_24k_audio):
"""Transcodes and sends audio data to Twilio."""
# 1. Resample from 24kHz to 8kHz
- pcm_8k_audio, _ = audioop.ratecv(pcm_24k_audio, 2, 1, 24000, 8000, None)
+ pcm_8k_audio, _ = utils_audio.resample_24k_to_8k(pcm_24k_audio, 2)
+
+ # Broadcast for preview
+ await self.broadcast_audio(pcm_8k_audio)
+
# 2. Convert 16-bit linear PCM to 8-bit mulaw
if self.recorder:
self.recorder.writeframes(pcm_8k_audio)
- mulaw_audio = audioop.lin2ulaw(pcm_8k_audio, 2)
+ mulaw_audio = utils_audio.lin2ulaw(pcm_8k_audio, 2)
# 3. Base64 encode and send
payload = base64.b64encode(mulaw_audio).decode("utf-8")
@@ -58,39 +67,136 @@ def start_recording(self, filename):
self.recorder.setframerate(8000) # 8kHz
print(f"Recording call to {filename}")
+ def _append_to_transcript(self, role, text):
+ if self.transcription_history and self.transcription_history[-1]["role"] == role:
+ self.transcription_history[-1]["text"] += text
+ else:
+ self.transcription_history.append({"role": role, "text": text})
+
+ async def send_text_prompt(self, text: str):
+ """Sends a text prompt to the assistant."""
+ self._append_to_transcript("admin", text)
+ prompt_text = GEMINI_PROMPTS.get("admin_message") + text
+ await self.input_queue.put(('text', prompt_text))
+ await self.broadcast_event({"type": "log", "role": "admin", "text": text})
+
+ async def add_observer(self, websocket):
+ self.observers.append(websocket)
+ try:
+ # Send initial log or status
+ await websocket.send_text(json.dumps({"type": "status", "status": "connected"}))
+ except:
+ pass
+
+ def remove_observer(self, websocket):
+ if websocket in self.observers:
+ self.observers.remove(websocket)
+
+ async def broadcast_event(self, event: dict):
+ current_observers = list(self.observers)
+ for ws in current_observers:
+ try:
+ await ws.send_text(json.dumps(event))
+ except Exception as e:
+ print(f"Error broadcasting to observer: {e}")
+ self.remove_observer(ws)
+
+ async def broadcast_audio(self, pcm_audio: bytes):
+ """Broadcasts audio chunk to observers."""
+ encoded_audio = base64.b64encode(pcm_audio).decode("utf-8")
+ await self.broadcast_event({"type": "audio", "data": encoded_audio})
+
async def run(self):
"""Main loop to run the conversation, sending and receiving audio."""
gemini_client = genai.Client()
+
+ # Tool definition for ending the call
+ end_call_tool = types.Tool(
+ function_declarations=[
+ types.FunctionDeclaration(
+ name="end_call",
+ description=GEMINI_PROMPTS["end_call"],
+ )
+ ]
+ )
+
gemini_config = {
"response_modalities": ["AUDIO"],
- "system_instruction": GEMINI_PROMPTS["assistant_instruction"]
+ "system_instruction": GEMINI_PROMPTS["assistant_instruction"],
+ "tools": [end_call_tool],
+ "speech_config": {
+ "voice_config": {
+ "prebuilt_voice_config": {
+ "voice_name": os.environ.get("GEMINI_ASSISTANT_VOICE")
+ }
+ }
+ }
}
+ # Enable input audio transcription
+ gemini_config['input_audio_transcription'] = {}
+ # Enable output audio transcription
+ gemini_config['output_audio_transcription'] = {}
+
async with gemini_client.aio.live.connect(model=os.environ.get("GEMINI_ASSISTANT_MODEL"), config=gemini_config) as gemini_session:
print("Gemini Live session started for continuous conversation.")
async def sender():
- # Wait for the initial prompt to be sent before processing audio queue
+ # Wait for the initial prompt to be sent before processing input queue
await self.initial_prompt_sent.wait()
await gemini_session.send_realtime_input(
text=self.initial_prompt
)
- """Sends audio from the internal queue to Gemini."""
+ """Sends inputs (audio or text) from the internal queue to Gemini."""
while True:
- audio_chunk = await self.audio_queue.get()
- if audio_chunk is None:
+ item = await self.input_queue.get()
+ if item is None:
break
- await gemini_session.send_realtime_input(
- audio=types.Blob(data=audio_chunk, mime_type="audio/pcm;rate=8000")
- )
+
+ type_, data = item
+ if type_ == 'audio':
+ await gemini_session.send_realtime_input(
+ audio=types.Blob(data=data, mime_type="audio/pcm;rate=8000")
+ )
+ elif type_ == 'text':
+ await gemini_session.send_realtime_input(text=data)
async def receiver():
"""Receives audio from Gemini and sends it to Twilio."""
# This loop runs for the entire duration of the call
while True:
async for response in gemini_session.receive():
+ # Check for tool calls
+ if response.tool_call and response.tool_call.function_calls:
+ for tool_call in response.tool_call.function_calls:
+ if tool_call.name == "end_call":
+ print("Gemini requested to end the call.")
+ return
+
+ # Check for text content (Assistant's response text) - deprecated for AUDIO mode
+ '''
+ if response.text:
+ print(f"Assistant said text: {response.text}")
+ await self.broadcast_event({"type": "log", "role": "assistant", "text": response.text})
+ '''
+ # Check for output transcription (Assistant's speech transcribed)
+ if response.server_content and response.server_content.output_transcription:
+ transcription = response.server_content.output_transcription
+ if transcription.text:
+ print(f"Assistant said: {transcription.text}")
+ self._append_to_transcript("assistant", transcription.text)
+ await self.broadcast_event({"type": "log", "role": "assistant", "text": transcription.text})
+
+ # Check for user input transcription
+ if response.server_content and response.server_content.input_transcription:
+ transcription = response.server_content.input_transcription
+ if transcription.text:
+ print(f"User said: {transcription.text}")
+ self._append_to_transcript("user", transcription.text)
+ await self.broadcast_event({"type": "log", "role": "user", "text": transcription.text})
+
if response.data and self.call_state['stream_sid']:
await self.send_audio_to_twilio(response.data)
@@ -98,29 +204,32 @@ async def receiver():
sender_task = asyncio.create_task(sender())
receiver_task = asyncio.create_task(receiver())
- # Use asyncio.gather to run both tasks concurrently.
- # The sender_task will complete when the call ends (receives None).
- # We then cancel the receiver_task to clean up.
- try:
- await sender_task
- finally:
- # Once the sender is done (call ended), we cancel the receiver
- # to exit the gemini_session context manager cleanly.
- receiver_task.cancel()
+ # We wait for either task to complete.
+ done, pending = await asyncio.wait(
+ [sender_task, receiver_task],
+ return_when=asyncio.FIRST_COMPLETED
+ )
+
+ self.call_ended_event.set()
+
+ # Cancel pending tasks
+ for task in pending:
+ task.cancel()
-def gemini_summarize_audio(audio_file_path):
- with open(audio_file_path, 'rb') as f:
- audio_bytes = f.read()
+ # If receiver task finished first (Gemini ended call), we might want to ensure everything is clean.
+
+def gemini_summarize_call(transcription_history):
+ if not transcription_history:
+ return "No transcription available."
+
+ transcript_text = "\n".join([f"{entry['role'].upper()}: {entry['text']}" for entry in transcription_history])
+ print(f"Transcript: {transcript_text}")
client = genai.Client()
response = client.models.generate_content(
model=os.environ.get("GEMINI_SUMMARIZATION_MODEL"),
contents=[
- f'Summarize this voice call between AI and user. Use language: {GEMINI_LANGUAGE}.',
- types.Part.from_bytes(
- data=audio_bytes,
- mime_type='audio/wav',
- )
+ f'Summarize this conversation between AI and user. Use language: {GEMINI_LANGUAGE}.\n\nTranscript:\n{transcript_text}'
]
)
diff --git a/services/twilio.py b/services/twilio.py
index 8e23acf..ff3a1ac 100644
--- a/services/twilio.py
+++ b/services/twilio.py
@@ -1,6 +1,8 @@
import os
from twilio.rest import Client
+TWILIO_ENDPOINT = "twilio"
+
twilio_client = Client(os.environ.get("TWILIO_ACCOUNT_SID"), os.environ.get("TWILIO_AUTH_TOKEN"))
def update_twilio_webhook(public_url):
@@ -18,7 +20,7 @@ def update_twilio_webhook(public_url):
return
number_sid = incoming_phone_numbers[0].sid
- twilio_client.incoming_phone_numbers(number_sid).update(voice_url=f"{public_url}/voice")
+ twilio_client.incoming_phone_numbers(number_sid).update(voice_url=f"{public_url}/{TWILIO_ENDPOINT}/voice")
print(f"Successfully updated webhook for {twilio_phone_number}.")
except Exception as e:
print(f"Error updating Twilio webhook: {e}")
@@ -32,7 +34,14 @@ def make_outbound_call(public_url, to_number) -> str | None:
try:
print(f"Initiating call to {to_number}...")
- call = twilio_client.calls.create(to=to_number, from_=twilio_phone_number, url=f"{public_url}/voice")
+ call = twilio_client.calls.create(
+ to=to_number,
+ from_=twilio_phone_number,
+ url=f"{public_url}/{TWILIO_ENDPOINT}/voice",
+ status_callback=f"{public_url}/{TWILIO_ENDPOINT}/status_callback",
+ status_callback_method='POST',
+ machine_detection='Enable'
+ )
print(f"Call initiated successfully. SID: {call.sid}")
return call.sid
except Exception as e:
diff --git a/services/utils_audio.py b/services/utils_audio.py
new file mode 100644
index 0000000..c319c54
--- /dev/null
+++ b/services/utils_audio.py
@@ -0,0 +1,229 @@
+import numpy as np
+
+def lin2ulaw(frame, width):
+ """
+ Convert a linear PCM video frame to mu-law.
+
+ Args:
+ frame (bytes): Linear PCM audio data.
+ width (int): Sample width in bytes (must be 2 for 16-bit PCM).
+
+ Returns:
+ bytes: Mu-law encoded audio data.
+ """
+ if width != 2:
+ raise ValueError("Only 16-bit PCM (width=2) is supported for now.")
+
+ # Convert bytes to numpy array of int16
+ audio = np.frombuffer(frame, dtype=np.int16)
+
+ # mu-law is effectively non-existent in numpy's standard lib functions, implementing manual conversion
+ # or using a lookup table approach would be most accurate to G.711 standard.
+ # However, a vectorized formulaic approach is faster and sufficient for VOIP usually.
+
+ # G.711 mu-law parameters
+ mu = 255
+
+ # Normalize to [-1, 1]
+ audio_norm = audio / 32768.0
+
+ # Sign and Magnitude
+ sign = np.sign(audio_norm)
+ abs_audio = np.abs(audio_norm)
+
+ # Apply mu-law companding
+ # y = sign(x) * ln(1 + mu * |x|) / ln(1 + mu)
+ encoded = sign * np.log(1 + mu * abs_audio) / np.log(1 + mu)
+
+ # Quantize to 8-bit
+ # Map [-1, 1] to [-128, 127] (usually 0 to 255 for payload)
+ # G.711 usually produces 8-bit unsigned values where 0xFF is +0? No, it's complicated.
+ # Standard G.711 mu-law bytes are sign-magnitude with bit inversion.
+
+ # Actually, writing a correct G.711 converter from scratch might be error prone.
+ # Let's use a simpler known algorithm or a small helper lib if possible.
+ # But since we committed to numpy, let's look for a correct implementation logic.
+
+ # Common optimized approach:
+ # 1. Get abs value
+ # 2. Add bias (33)
+ # 3. Get exponent (position of highest bit)
+ # ...
+
+ # Re-evaluating: Is there a simpler way?
+ # Yes, we can just use the formula and map it carefully.
+
+ # However, for 16-bit input, it's easier to implement the standard step-wise compression.
+ # But let's try the logarithmic formula first as it's cleaner in numpy.
+
+ # Scale to [0, 255]?
+ # Actually, G.711 mu-law is often represented as:
+ # 1. Convert to 14-bit signed
+ # 2. Add 33
+ # ...
+
+ # Let's try the continuous approximation for now, it's often close enough for preview.
+ # y = sgn(x) * ln(1 + 255|x|) / ln(256)
+
+ y = np.sign(audio_norm) * (np.log(1 + 255 * np.abs(audio_norm)) / np.log(256))
+
+ # Map [-1, 1] to [0, 255] for transmission?
+ # Usually ulaw is 8-bit. in files it's often unsigned char.
+ # But for Twilio 'media' payload, it expects standard G.711 mu-law bytes.
+ # Standard wav ulaw: 0 is 0xFF, mid is ...
+ # It's actually inverted.
+
+ # Let's fallback to a bit-manipulation approach which is exact.
+ # Iterating in python is slow.
+ # We can use numpy select/piecewise.
+
+ # Implementation based on standard G.711 C code logic adapted to numpy:
+ # CLIP 32635
+ # BIAS 0x84
+
+ # Simplify:
+ # Just use the companding formula and simple quantization for now.
+ # Twilio tolerates some deviation.
+ # Normalize to 0-255 uint8.
+
+ # (y + 1) / 2 * 255
+ y_scaled = ((y + 1) / 2 * 255)
+ y_uint8 = y_scaled.astype(np.uint8)
+
+ # Wait, simple mapping doesn't match the bit pattern of G.711.
+ # G.711 is not just a linear quantization of the log curve.
+ # It has specific bit encoding (exponent + mantissa).
+
+ # If we want to be safe, we might want to check if there is a tiny library or use a lookup table.
+ pass
+
+# We will implement the LOOKUP TABLE approach for speed and correctness since it's 16-bit input space (65536 values).
+# Pre-computing the table is fast at startup.
+
+_ULAW_TABLE = None
+_LIN_TABLE = None
+
+def _generate_ulaw_table():
+ # Helper to generate the mapping table once
+ # Using the standard algorithm to populate a 65536 entry table
+ table = np.zeros(65536, dtype=np.uint8)
+
+ # Constants
+ BIAS = 0x84
+ CLIP = 32635
+
+ # Logic for a single sample
+ def manual_lin2ulaw(sample):
+ sign = 0
+ if sample < 0:
+ sample = -sample
+ sign = 0x80
+
+ if sample > CLIP:
+ sample = CLIP
+
+ sample += BIAS
+ exponent = 7
+ # Find exponent
+ for i in range(7, -1, -1):
+ if (sample & (1 << (i + 7))):
+ exponent = i + 7 # This logic seems weird compared to standard C
+ break
+
+ # Standard Exponent determination usually:
+ # if sampleVal >= 0x4000: exp = 7
+ # elif sampleVal >= 0x2000: exp = 6
+ # ...
+
+ exponent = 0
+ if sample >= 0x4000: exponent = 7
+ elif sample >= 0x2000: exponent = 6
+ elif sample >= 0x1000: exponent = 5
+ elif sample >= 0x0800: exponent = 4
+ elif sample >= 0x0400: exponent = 3
+ elif sample >= 0x0200: exponent = 2
+ elif sample >= 0x0100: exponent = 1
+ elif sample >= 0x0080: exponent = 0
+
+ mantissa = (sample >> (exponent + 3)) & 0x0F
+ ulaw_byte = ~(sign | (exponent << 4) | mantissa)
+ return ulaw_byte & 0xFF
+
+ # Vectorizing this is hard with pure numpy math, providing a look up is better
+ # But iterating 65536 times in python is slow startup (~50ms-100ms? Acceptable).
+
+ for i in range(65536):
+ # i is 0 to 65535, corresponding to int16 -32768 to 32767
+ # struct.unpack('>h', bytes) ...
+ # i represents the uint16 view of the int16
+ val = i - 65536 if i > 32767 else i
+ table[i] = manual_lin2ulaw(val)
+
+ global _ULAW_TABLE
+ _ULAW_TABLE = table
+
+def _generate_lin_table():
+ table = np.zeros(256, dtype=np.int16)
+
+ def manual_ulaw2lin(ulaw_byte):
+ ulaw_byte = ~ulaw_byte & 0xFF
+ sign = ulaw_byte & 0x80
+ exponent = (ulaw_byte >> 4) & 0x07
+ mantissa = ulaw_byte & 0x0F
+ sample = ((mantissa << 3) + 0x84) << exponent
+ sample -= 0x84
+ if sign != 0:
+ sample = -sample
+ return sample
+
+ for i in range(256):
+ table[i] = manual_ulaw2lin(i)
+
+ global _LIN_TABLE
+ _LIN_TABLE = table
+
+# Initialize tables on import
+_generate_ulaw_table()
+_generate_lin_table()
+
+def lin2ulaw(frame, width):
+ if width != 2:
+ raise ValueError("Only width=2 supported")
+
+ # data is bytes of int16
+ # View as uint16 to use as indices
+ indices = np.frombuffer(frame, dtype=np.uint16)
+ encoded = _ULAW_TABLE[indices]
+ return encoded.tobytes()
+
+def ulaw2lin(frame, width):
+ if width != 2:
+ raise ValueError("Only width=2 supported")
+
+ # frame is bytes of uint8
+ indices = np.frombuffer(frame, dtype=np.uint8)
+ decoded = _LIN_TABLE[indices]
+ return decoded.tobytes()
+
+def resample_24k_to_8k(frame, width):
+ if width != 2:
+ raise ValueError("Only width=2 supported")
+
+ audio = np.frombuffer(frame, dtype=np.int16)
+
+ # 24k to 8k is exactly 3:1
+ # Check divisibility
+ remainder = len(audio) % 3
+ if remainder != 0:
+ # Pad with last value or zero?
+ # Or just trim. Trimming is safer for stream continuity unless we keep state.
+ # But this is simple packet based implementation.
+ # Let's trim.
+ audio = audio[: -remainder]
+
+ # Reshape and mean (simple averaging filter)
+ # This reduces aliasing compared to simple decimation (taking every 3rd sample)
+ reshaped = audio.reshape(-1, 3)
+ downsampled = np.mean(reshaped, axis=1).astype(np.int16)
+
+ return downsampled.tobytes(), None # None is state, keeping signature similar to audioop.ratecv which returns (data, state)
diff --git a/utils.py b/utils.py
index cab401c..fcba530 100644
--- a/utils.py
+++ b/utils.py
@@ -9,9 +9,9 @@ def json_datetime_serializer(obj):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
-def cleanup_files(directory: str, pattern: str, keep_count: int):
+def cleanup_calls(directory: str, keep_count: int):
"""
- Identifies and deletes files older than the 'keep_count' based on
+ Identifies and deletes call directories older than the 'keep_count' based on
their modification time.
"""
print(f"Starting cleanup in directory: '{directory}'...")
@@ -21,48 +21,49 @@ def cleanup_files(directory: str, pattern: str, keep_count: int):
if not dir_path.is_dir():
return
- # Find all files matching the pattern and sort them by modification time
+ # Find all subdirectories and sort them by modification time
try:
- file_times: List[Tuple[float, Path]] = []
+ dir_times: List[Tuple[float, Path]] = []
- for file_path in dir_path.glob(pattern):
- if file_path.is_file():
- mod_time = os.path.getmtime(file_path)
- file_times.append((mod_time, file_path))
+ for item_path in dir_path.iterdir():
+ if item_path.is_dir():
+ mod_time = os.path.getmtime(item_path)
+ dir_times.append((mod_time, item_path))
- # Check if we have enough files to warrant deletion
- if len(file_times) <= keep_count:
- print(f"Only {len(file_times)} files found. No cleanup needed (keeping {keep_count}).")
+ # Check if we have enough directories to warrant deletion
+ if len(dir_times) <= keep_count:
+ print(f"Only {len(dir_times)} calls found. No cleanup needed (keeping {keep_count}).")
return
- # Sort the files by modification time in descending order (newest first)
- file_times.sort(key=lambda x: x[0], reverse=True)
+ # Sort the directories by modification time in descending order (newest first)
+ dir_times.sort(key=lambda x: x[0], reverse=True)
- # Identify files to keep and files to delete
- files_to_keep = [p for t, p in file_times[:keep_count]]
- files_to_delete = [p for t, p in file_times[keep_count:]]
+ # Identify directories to keep and directories to delete
+ dirs_to_keep = [p for t, p in dir_times[:keep_count]]
+ dirs_to_delete = [p for t, p in dir_times[keep_count:]]
# Execute deletion
print("-" * 40)
- print(f"Found {len(file_times)} files in total. Keeping {len(files_to_keep)}.")
- print("Files to be kept (Newest):")
- for file in files_to_keep:
- print(f" [KEEP] {file.name}")
+ print(f"Found {len(dir_times)} calls in total. Keeping {len(dirs_to_keep)}.")
+ print("Calls to be kept (Newest):")
+ for d in dirs_to_keep:
+ print(f" [KEEP] {d.name}")
- print("\nFiles to be deleted (Oldest):")
+ print("\nCalls to be deleted (Oldest):")
deleted_count = 0
- for file_to_delete in files_to_delete:
+ import shutil
+ for dir_to_delete in dirs_to_delete:
try:
- # Delete the file
- file_to_delete.unlink()
- print(f" [DELETED] {file_to_delete.name}")
+ # Delete the directory and its contents
+ shutil.rmtree(dir_to_delete)
+ print(f" [DELETED] {dir_to_delete.name}")
deleted_count += 1
except OSError as e:
- print(f" [ERROR] Failed to delete {file_to_delete.name}: {e}")
+ print(f" [ERROR] Failed to delete {dir_to_delete.name}: {e}")
print("-" * 40)
- print(f"Cleanup finished. Total files deleted: {deleted_count}")
+ print(f"Cleanup finished. Total calls deleted: {deleted_count}")
except Exception as e:
print(f"An unexpected error occurred during file operation: {e}")