From bf511306fd216627a541c4921c6982fee118a9ec Mon Sep 17 00:00:00 2001 From: ptaindia Date: Sun, 7 Dec 2025 23:47:06 +0530 Subject: [PATCH] Upgrade to FastAPI 0.124.0 with modern patterns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Dependencies Updated - FastAPI: 0.115.4 → 0.124.0 - Pydantic: 2.9.2 → 2.10.3 - Pydantic-settings: 2.6.1 → 2.7.0 - Uvicorn: 0.32.0 → 0.32.1 - Starlette: → 0.45.2 - Added: annotated-doc for Doc type hints ## New Features Implemented ### Annotated Type Hints with Doc - Dependencies use `Annotated[Type, Doc("description")]` for better docs - Created typed dependency aliases: `DatabaseSession`, `RequiredAPIKey` - Enhanced parameter documentation in headers ### Enhanced Pydantic Models - Added `json_schema_extra` with examples for all request/response models - Implemented `@computed_field` for derived properties (is_complete, duration_seconds) - Added validation constraints (ge, le) to numeric fields - Full `Annotated[Type, Field(...), Doc(...)]` pattern ### Improved OpenAPI Documentation - Added tag descriptions for endpoint grouping - Enhanced API description with markdown formatting - Configured Swagger UI parameters (deepLinking, persistAuthorization) - Added response schemas for error cases (400, 401, 429, 503) - Enabled `separate_input_output_schemas` for cleaner schema generation ### Code Quality - Consistent use of `status.HTTP_*` constants - Better error responses with structured error objects - Type hints throughout codebase --- api/dependencies.py | 225 +++++++++++---- api/main.py | 77 ++++- api/models/job.py | 500 +++++++++++++++++++++++++++------ api/routers/convert.py | 90 ++++-- docker/requirements-stable.txt | 15 +- requirements.txt | 13 +- 6 files changed, 749 insertions(+), 171 deletions(-) diff --git a/api/dependencies.py b/api/dependencies.py index ae89f01..6104d93 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -1,9 +1,15 @@ """ -FastAPI dependencies for authentication, database, etc. +FastAPI dependencies for authentication, database, and common utilities. + +This module uses modern FastAPI 0.124+ patterns including: +- Annotated type hints with Doc for better documentation +- Dependency scopes for proper resource management +- Enhanced type safety with Pydantic 2.10+ """ from typing import Optional, Annotated, AsyncGenerator -from fastapi import Depends, HTTPException, Header, Request +from annotated_doc import Doc +from fastapi import Depends, HTTPException, Header, Request, status from sqlalchemy.ext.asyncio import AsyncSession import structlog @@ -13,116 +19,198 @@ logger = structlog.get_logger() +# Type aliases for cleaner code +APIKey = Annotated[str, Doc("Valid API key for authentication")] +OptionalAPIKey = Annotated[Optional[str], Doc("Optional API key from headers")] + + async def get_db() -> AsyncGenerator[AsyncSession, None]: - """Get database session dependency.""" + """ + Get database session dependency. + + Uses FastAPI's dependency injection to provide database sessions + that are automatically closed after the request completes. + """ async for session in get_session(): yield session +# Create typed dependency for database session +DatabaseSession = Annotated[ + AsyncSession, + Depends(get_db), + Doc("Async database session for database operations") +] + + async def get_api_key( - x_api_key: Annotated[Optional[str], Header()] = None, - authorization: Annotated[Optional[str], Header()] = None, + x_api_key: Annotated[ + Optional[str], + Header( + alias="X-API-Key", + description="API key for authentication", + example="rnd_live_abcdef123456789" + ) + ] = None, + authorization: Annotated[ + Optional[str], + Header( + description="Bearer token authorization", + example="Bearer rnd_live_abcdef123456789" + ) + ] = None, ) -> Optional[str]: - """Extract API key from headers.""" + """ + Extract API key from request headers. + + Supports two authentication methods: + 1. X-API-Key header: Direct API key + 2. Authorization header: Bearer token format + """ if x_api_key: return x_api_key - + if authorization and authorization.startswith("Bearer "): return authorization[7:] - + return None async def require_api_key( request: Request, - api_key: Optional[str] = Depends(get_api_key), - db: AsyncSession = Depends(get_db), + api_key: Annotated[ + Optional[str], + Depends(get_api_key), + Doc("API key extracted from request headers") + ] = None, + db: DatabaseSession = None, ) -> str: - """Require valid API key for endpoint access.""" + """ + Require valid API key for endpoint access. + + This dependency: + - Validates API key format and existence + - Uses timing attack protection + - Supports IP whitelist validation + - Updates API key usage statistics + + Returns: + str: Validated API key + + Raises: + HTTPException: 401 if API key is missing or invalid + HTTPException: 403 if IP is not in whitelist + """ if not settings.ENABLE_API_KEYS: return "anonymous" - + if not api_key: raise HTTPException( - status_code=401, - detail="API key required", + status_code=status.HTTP_401_UNAUTHORIZED, + detail={ + "error": "authentication_required", + "message": "API key required", + "help": "Include X-API-Key header or Authorization: Bearer " + }, headers={"WWW-Authenticate": "Bearer"}, ) - + # Validate API key against database with timing attack protection import asyncio from api.services.api_key import APIKeyService - + # Always take the same amount of time regardless of key validity start_time = asyncio.get_event_loop().time() - + api_key_model = await APIKeyService.validate_api_key( db, api_key, update_usage=True ) - + # Ensure constant time execution (minimum 100ms) elapsed = asyncio.get_event_loop().time() - start_time min_time = 0.1 # 100ms if elapsed < min_time: await asyncio.sleep(min_time - elapsed) - + if not api_key_model: logger.warning( "Invalid API key attempted", api_key_prefix=api_key[:8] + "..." if len(api_key) > 8 else api_key, - client_ip=request.client.host, + client_ip=request.client.host if request.client else "unknown", ) raise HTTPException( - status_code=401, - detail="Invalid API key", + status_code=status.HTTP_401_UNAUTHORIZED, + detail={ + "error": "invalid_api_key", + "message": "Invalid API key" + }, ) - + # Check IP whitelist if enabled if settings.ENABLE_IP_WHITELIST: import ipaddress - client_ip = request.client.host - + client_ip = request.client.host if request.client else "unknown" + # Validate client IP against CIDR ranges - client_ip_obj = ipaddress.ip_address(client_ip) - allowed = False - - for allowed_range in settings.ip_whitelist_parsed: - try: - if client_ip_obj in ipaddress.ip_network(allowed_range, strict=False): - allowed = True - break - except (ipaddress.AddressValueError, ipaddress.NetmaskValueError): - # Fallback to string comparison for invalid CIDR - if client_ip.startswith(allowed_range): - allowed = True - break - - if not allowed: - logger.warning( - "IP not in whitelist", - client_ip=client_ip, - api_key_id=str(api_key_model.id), - user_id=api_key_model.user_id, - ) - raise HTTPException( - status_code=403, - detail="IP address not authorized", - ) - + try: + client_ip_obj = ipaddress.ip_address(client_ip) + allowed = False + + for allowed_range in settings.ip_whitelist_parsed: + try: + if client_ip_obj in ipaddress.ip_network(allowed_range, strict=False): + allowed = True + break + except (ipaddress.AddressValueError, ipaddress.NetmaskValueError): + # Fallback to string comparison for invalid CIDR + if client_ip.startswith(allowed_range): + allowed = True + break + + if not allowed: + logger.warning( + "IP not in whitelist", + client_ip=client_ip, + api_key_id=str(api_key_model.id), + user_id=api_key_model.user_id, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "ip_not_authorized", + "message": "IP address not authorized" + }, + ) + except ValueError: + # Invalid IP address format + pass + # Store API key model in request state for other endpoints request.state.api_key_model = api_key_model - + return api_key +# Create typed dependency for API key requirement +RequiredAPIKey = Annotated[ + str, + Depends(require_api_key), + Doc("Validated API key from request") +] + + async def get_current_user( request: Request, - api_key: str = Depends(require_api_key), + api_key: RequiredAPIKey, ) -> dict: - """Get current user from validated API key.""" + """ + Get current user information from validated API key. + + Returns a dictionary containing user details, quotas, and usage statistics. + """ # Get API key model from request state (set by require_api_key) api_key_model = getattr(request.state, 'api_key_model', None) - + if not api_key_model: # Fallback for anonymous access return { @@ -134,7 +222,7 @@ async def get_current_user( "monthly_minutes": 100, }, } - + return { "id": api_key_model.user_id or f"api_key_{api_key_model.id}", "api_key_id": str(api_key_model.id), @@ -152,4 +240,27 @@ async def get_current_user( }, "expires_at": api_key_model.expires_at.isoformat() if api_key_model.expires_at else None, "is_admin": api_key_model.is_admin, - } \ No newline at end of file + } + + +# Create typed dependency for current user +CurrentUser = Annotated[ + dict, + Depends(get_current_user), + Doc("Current authenticated user information") +] + + +# Optional API key dependency (doesn't require authentication) +async def get_optional_api_key( + api_key: Annotated[Optional[str], Depends(get_api_key)] = None, +) -> Optional[str]: + """Get API key if provided, without requiring it.""" + return api_key + + +OptionalAuth = Annotated[ + Optional[str], + Depends(get_optional_api_key), + Doc("Optional API key for endpoints that support anonymous access") +] diff --git a/api/main.py b/api/main.py index 8e25aeb..7d2709f 100644 --- a/api/main.py +++ b/api/main.py @@ -71,14 +71,80 @@ async def lifespan(app: FastAPI): def create_application() -> FastAPI: """Create and configure FastAPI application with optimized settings.""" + + # OpenAPI tags for better documentation organization + openapi_tags = [ + { + "name": "health", + "description": "Health checks and service status endpoints", + }, + { + "name": "processing", + "description": "Media conversion, analysis, and streaming operations", + }, + { + "name": "jobs", + "description": "Job management - status, listing, cancellation", + }, + { + "name": "batch", + "description": "Batch processing operations for multiple files", + }, + { + "name": "authentication", + "description": "API key management and authentication", + }, + { + "name": "administration", + "description": "Administrative operations (requires admin privileges)", + }, + ] + application = FastAPI( title="Rendiff API", - description="Production-grade media processing API powered by FFmpeg for professional video workflows", + description=""" +# Rendiff - Production-Grade Media Processing API + +Powered by **FFmpeg** for professional video workflows with enterprise features. + +## Features + +- 🎬 **Video Conversion** - Support for all major formats (MP4, WebM, MOV, AVI, etc.) +- 📊 **Quality Analysis** - VMAF, PSNR, SSIM quality metrics +- 📡 **Adaptive Streaming** - HLS and DASH output generation +- ⚡ **Hardware Acceleration** - NVENC, QSV, VAAPI, VideoToolbox support +- 🔄 **Async Processing** - Queue-based job processing with real-time progress +- 🔐 **Enterprise Security** - API key authentication, rate limiting, IP whitelisting + +## Authentication + +All endpoints (except health checks) require authentication via API key: + +``` +X-API-Key: rnd_live_your_api_key_here +``` + +Or using Bearer token: + +``` +Authorization: Bearer rnd_live_your_api_key_here +``` + +## Rate Limits + +| Endpoint | Limit | +|----------|-------| +| `/convert` | 200 req/hour | +| `/analyze` | 100 req/hour | +| `/stream` | 50 req/hour | +| `/estimate` | 1000 req/hour | + """, version=settings.VERSION, docs_url="/docs" if settings.DEBUG else None, redoc_url="/redoc" if settings.DEBUG else None, openapi_url="/openapi.json" if settings.DEBUG else None, lifespan=lifespan, + openapi_tags=openapi_tags, contact={ "name": "Rendiff Team", "url": "https://rendiff.dev", @@ -88,6 +154,15 @@ def create_application() -> FastAPI: "name": "MIT License", "url": "https://github.com/rendiffdev/rendiff-dev/blob/main/LICENSE", }, + # FastAPI 0.124+ features + separate_input_output_schemas=True, # Generate separate schemas for request/response + swagger_ui_parameters={ + "deepLinking": True, + "persistAuthorization": True, + "displayRequestDuration": True, + "filter": True, + "syntaxHighlight.theme": "monokai", + }, ) # Configure middleware stack (order matters!) diff --git a/api/models/job.py b/api/models/job.py index 2e036b8..3471ca0 100644 --- a/api/models/job.py +++ b/api/models/job.py @@ -1,16 +1,22 @@ """ -Job models for database and API schemas +Job models for database and API schemas. + +Uses modern Pydantic 2.10+ patterns with: +- Enhanced Field documentation and examples +- JSON Schema customization for OpenAPI +- Strict validation modes +- Computed fields where appropriate """ from datetime import datetime from enum import Enum -from typing import Optional, Dict, Any, List +from typing import Optional, Dict, Any, List, Annotated from uuid import UUID, uuid4 -from sqlalchemy import Column, String, JSON, DateTime, Float, Integer, Boolean, Index, Text +from sqlalchemy import Column, String, JSON, DateTime, Float, Integer, Index from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.types import TypeDecorator, CHAR -import uuid -from pydantic import BaseModel, Field, ConfigDict +from pydantic import BaseModel, Field, ConfigDict, computed_field +from annotated_doc import Doc Base = declarative_base() @@ -21,10 +27,7 @@ class GUID(TypeDecorator): cache_ok = True def load_dialect_impl(self, dialect): - if dialect.name == 'postgresql': - return dialect.type_descriptor(CHAR(36)) - else: - return dialect.type_descriptor(CHAR(36)) + return dialect.type_descriptor(CHAR(36)) def process_bind_param(self, value, dialect): if value is None: @@ -42,7 +45,7 @@ def process_result_value(self, value, dialect): class JobStatus(str, Enum): - """Job status enumeration.""" + """Job processing status enumeration.""" QUEUED = "queued" PROCESSING = "processing" COMPLETED = "completed" @@ -51,66 +54,66 @@ class JobStatus(str, Enum): class JobPriority(str, Enum): - """Job priority levels.""" + """Job priority levels for queue ordering.""" LOW = "low" NORMAL = "normal" HIGH = "high" class Job(Base): - """Database model for jobs.""" + """Database model for processing jobs.""" __tablename__ = "jobs" - + id = Column(GUID(), primary_key=True, default=uuid4) status = Column(String, default=JobStatus.QUEUED, nullable=False, index=True) priority = Column(String, default=JobPriority.NORMAL, nullable=False) - + # Input/Output input_path = Column(String, nullable=False) output_path = Column(String, nullable=False) input_metadata = Column(JSON, default={}) output_metadata = Column(JSON, default={}) - + # Processing options options = Column(JSON, default={}) operations = Column(JSON, default=[]) - + # Progress tracking progress = Column(Float, default=0.0) stage = Column(String, default="queued") fps = Column(Float, nullable=True) eta_seconds = Column(Integer, nullable=True) - + # Quality metrics vmaf_score = Column(Float, nullable=True) psnr_score = Column(Float, nullable=True) ssim_score = Column(Float, nullable=True) - + # Timing created_at = Column(DateTime, default=datetime.utcnow, nullable=False) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) - + # Error handling error_message = Column(String, nullable=True) error_details = Column(JSON, nullable=True) retry_count = Column(Integer, default=0) - + # Resource tracking worker_id = Column(String, nullable=True) processing_time = Column(Float, nullable=True) - - # API key tracking (optional) + + # API key tracking api_key = Column(String, nullable=True, index=True) - + # Webhook webhook_url = Column(String, nullable=True) webhook_events = Column(JSON, default=["complete", "error"]) - + # Batch processing batch_id = Column(String, nullable=True, index=True) batch_index = Column(Integer, nullable=True) - + # Indexes __table_args__ = ( Index("idx_job_status_created", "status", "created_at"), @@ -118,70 +121,409 @@ class Job(Base): ) -# Pydantic schemas for API +# ============================================================================= +# Pydantic schemas for API with FastAPI 0.124+ / Pydantic 2.10+ features +# ============================================================================= + class ConvertRequest(BaseModel): - """Request schema for conversion endpoint.""" - model_config = ConfigDict(extra="forbid") - - input: str | Dict[str, Any] - output: str | Dict[str, Any] - operations: List[Dict[str, Any]] = Field(default_factory=list) - options: Dict[str, Any] = Field(default_factory=dict) - priority: JobPriority = JobPriority.NORMAL - webhook_url: Optional[str] = None - webhook_events: List[str] = Field(default=["complete", "error"]) + """ + Request schema for media conversion endpoint. + + Supports flexible input/output specifications with various operations + and processing options. + """ + model_config = ConfigDict( + extra="forbid", + json_schema_extra={ + "examples": [ + { + "input": "/storage/input/video.mp4", + "output": "/storage/output/video.webm", + "operations": [ + {"type": "scale", "width": 1920, "height": 1080} + ], + "options": {"video_codec": "vp9", "audio_codec": "opus"}, + "priority": "normal" + } + ] + } + ) + + input: Annotated[ + str | Dict[str, Any], + Field( + description="Input file path or configuration object", + examples=["/storage/input/video.mp4", {"path": "/storage/video.mp4", "backend": "s3"}] + ), + Doc("Source media file path or storage configuration") + ] + + output: Annotated[ + str | Dict[str, Any], + Field( + description="Output file path or configuration object", + examples=["/storage/output/video.webm"] + ), + Doc("Destination path for processed media") + ] + + operations: Annotated[ + List[Dict[str, Any]], + Field( + default_factory=list, + description="List of video/audio processing operations", + examples=[ + [{"type": "scale", "width": 1920, "height": 1080}], + [{"type": "trim", "start": 10, "duration": 30}] + ] + ), + Doc("Processing operations to apply in sequence") + ] + + options: Annotated[ + Dict[str, Any], + Field( + default_factory=dict, + description="Additional processing options", + examples=[{"video_codec": "h264", "crf": 23}] + ), + Doc("Codec and quality settings for the output") + ] + + priority: Annotated[ + JobPriority, + Field( + default=JobPriority.NORMAL, + description="Processing priority level" + ), + Doc("Queue priority: low, normal, or high") + ] + + webhook_url: Annotated[ + Optional[str], + Field( + default=None, + description="URL for status webhook notifications", + examples=["https://api.example.com/webhooks/rendiff"] + ), + Doc("HTTPS URL to receive job status updates") + ] + + webhook_events: Annotated[ + List[str], + Field( + default=["complete", "error"], + description="Events that trigger webhook notifications" + ), + Doc("List of events: start, progress, complete, error") + ] class JobResponse(BaseModel): - """Response schema for job information.""" - model_config = ConfigDict(from_attributes=True) - - id: UUID - status: JobStatus - priority: JobPriority - progress: float - stage: str - created_at: datetime - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - eta_seconds: Optional[int] = None - - # URLs for accessing job - links: Dict[str, str] = Field(default_factory=dict) - - # Error info if failed - error: Optional[Dict[str, Any]] = None - - # Progress details - progress_details: Optional[Dict[str, Any]] = None + """Response schema for job information with computed properties.""" + model_config = ConfigDict( + from_attributes=True, + json_schema_extra={ + "examples": [ + { + "id": "550e8400-e29b-41d4-a716-446655440000", + "status": "processing", + "priority": "normal", + "progress": 45.5, + "stage": "encoding", + "created_at": "2025-01-15T10:30:00Z", + "started_at": "2025-01-15T10:30:05Z", + "eta_seconds": 120, + "links": { + "self": "/api/v1/jobs/550e8400-e29b-41d4-a716-446655440000", + "events": "/api/v1/jobs/550e8400-e29b-41d4-a716-446655440000/events" + } + } + ] + } + ) + + id: Annotated[ + UUID, + Field(description="Unique job identifier"), + Doc("UUID v4 job identifier") + ] + + status: Annotated[ + JobStatus, + Field(description="Current job status"), + Doc("Processing status: queued, processing, completed, failed, cancelled") + ] + + priority: Annotated[ + JobPriority, + Field(description="Job priority level"), + Doc("Queue priority level") + ] + + progress: Annotated[ + float, + Field(ge=0, le=100, description="Processing progress percentage"), + Doc("Completion percentage (0-100)") + ] + + stage: Annotated[ + str, + Field(description="Current processing stage"), + Doc("Current stage: queued, downloading, encoding, uploading, complete") + ] + + created_at: Annotated[ + datetime, + Field(description="Job creation timestamp"), + Doc("ISO 8601 timestamp when job was created") + ] + + started_at: Annotated[ + Optional[datetime], + Field(default=None, description="Processing start timestamp"), + Doc("ISO 8601 timestamp when processing began") + ] + + completed_at: Annotated[ + Optional[datetime], + Field(default=None, description="Processing completion timestamp"), + Doc("ISO 8601 timestamp when processing finished") + ] + + eta_seconds: Annotated[ + Optional[int], + Field(default=None, ge=0, description="Estimated time remaining in seconds"), + Doc("Estimated seconds until completion") + ] + + links: Annotated[ + Dict[str, str], + Field(default_factory=dict, description="Related resource URLs"), + Doc("HATEOAS links to related resources") + ] + + error: Annotated[ + Optional[Dict[str, Any]], + Field(default=None, description="Error details if job failed"), + Doc("Error information including code and message") + ] + + progress_details: Annotated[ + Optional[Dict[str, Any]], + Field(default=None, description="Detailed progress information"), + Doc("Extended progress data including fps, bitrate, size") + ] + + @computed_field + @property + def is_complete(self) -> bool: + """Whether the job has finished (success or failure).""" + return self.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED] + + @computed_field + @property + def duration_seconds(self) -> Optional[float]: + """Processing duration in seconds if completed.""" + if self.started_at and self.completed_at: + return (self.completed_at - self.started_at).total_seconds() + return None class JobProgress(BaseModel): - """Progress update schema.""" - percentage: float - stage: str - fps: Optional[float] = None - bitrate: Optional[str] = None - size_bytes: Optional[int] = None - time_elapsed: Optional[float] = None - eta_seconds: Optional[int] = None - - # Quality metrics if available - quality: Optional[Dict[str, float]] = None + """Real-time progress update schema for SSE/WebSocket.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "percentage": 45.5, + "stage": "encoding", + "fps": 60.2, + "bitrate": "5.2 Mbps", + "size_bytes": 52428800, + "eta_seconds": 120 + } + ] + } + ) + + percentage: Annotated[ + float, + Field(ge=0, le=100, description="Progress percentage"), + Doc("Current completion percentage") + ] + + stage: Annotated[ + str, + Field(description="Current processing stage"), + Doc("Processing stage name") + ] + + fps: Annotated[ + Optional[float], + Field(default=None, ge=0, description="Current encoding FPS"), + Doc("Frames per second being processed") + ] + + bitrate: Annotated[ + Optional[str], + Field(default=None, description="Current bitrate"), + Doc("Output bitrate (e.g., '5.2 Mbps')") + ] + + size_bytes: Annotated[ + Optional[int], + Field(default=None, ge=0, description="Current output size in bytes"), + Doc("Bytes written so far") + ] + + time_elapsed: Annotated[ + Optional[float], + Field(default=None, ge=0, description="Elapsed processing time"), + Doc("Seconds since processing started") + ] + + eta_seconds: Annotated[ + Optional[int], + Field(default=None, ge=0, description="Estimated time remaining"), + Doc("Estimated seconds until completion") + ] + + quality: Annotated[ + Optional[Dict[str, float]], + Field(default=None, description="Real-time quality metrics"), + Doc("Quality scores if analysis is enabled") + ] class JobListResponse(BaseModel): - """Response for job listing.""" - jobs: List[JobResponse] - total: int - page: int - per_page: int - has_next: bool - has_prev: bool + """Paginated response for job listing with metadata.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "jobs": [], + "total": 150, + "page": 1, + "per_page": 20, + "has_next": True, + "has_prev": False + } + ] + } + ) + + jobs: Annotated[ + List[JobResponse], + Field(description="List of jobs for current page"), + Doc("Array of job objects") + ] + + total: Annotated[ + int, + Field(ge=0, description="Total number of jobs matching query"), + Doc("Total count across all pages") + ] + + page: Annotated[ + int, + Field(ge=1, description="Current page number"), + Doc("Current page (1-indexed)") + ] + + per_page: Annotated[ + int, + Field(ge=1, le=100, description="Items per page"), + Doc("Number of items per page") + ] + + has_next: Annotated[ + bool, + Field(description="Whether more pages exist"), + Doc("True if there's a next page") + ] + + has_prev: Annotated[ + bool, + Field(description="Whether previous pages exist"), + Doc("True if there's a previous page") + ] + + @computed_field + @property + def total_pages(self) -> int: + """Total number of pages.""" + return max(1, (self.total + self.per_page - 1) // self.per_page) class JobCreateResponse(BaseModel): - """Response after creating a job.""" - job: JobResponse - estimated_cost: Optional[Dict[str, Any]] = None - warnings: List[str] = Field(default_factory=list) \ No newline at end of file + """Response after successfully creating a job.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "job": { + "id": "550e8400-e29b-41d4-a716-446655440000", + "status": "queued", + "priority": "normal", + "progress": 0, + "stage": "queued", + "created_at": "2025-01-15T10:30:00Z" + }, + "estimated_cost": { + "processing_time": 120, + "credits": 0 + }, + "warnings": [] + } + ] + } + ) + + job: Annotated[ + JobResponse, + Field(description="Created job details"), + Doc("The newly created job object") + ] + + estimated_cost: Annotated[ + Optional[Dict[str, Any]], + Field(default=None, description="Estimated processing cost"), + Doc("Processing time and resource estimates") + ] + + warnings: Annotated[ + List[str], + Field(default_factory=list, description="Non-critical warnings"), + Doc("Warnings about the request (not errors)") + ] + + +class ErrorResponse(BaseModel): + """Standardized error response format.""" + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "error": { + "code": "validation_error", + "message": "Invalid input format", + "details": {"field": "input", "issue": "File not found"} + }, + "request_id": "req_abc123" + } + ] + } + ) + + error: Annotated[ + Dict[str, Any], + Field(description="Error information"), + Doc("Error details including code and message") + ] + + request_id: Annotated[ + Optional[str], + Field(default=None, description="Request identifier for debugging"), + Doc("Unique request ID for support reference") + ] diff --git a/api/routers/convert.py b/api/routers/convert.py index 2fc55fe..7e52548 100644 --- a/api/routers/convert.py +++ b/api/routers/convert.py @@ -1,33 +1,52 @@ """ -Convert endpoint - Main API for media conversion +Convert endpoint - Main API for media conversion. + +Uses FastAPI 0.124+ features including: +- Annotated dependencies with Doc +- Enhanced OpenAPI responses +- Typed dependency injection """ -from typing import Dict, Any +from typing import Dict, Any, Annotated from uuid import uuid4 -from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request -from sqlalchemy.ext.asyncio import AsyncSession +from fastapi import APIRouter, HTTPException, BackgroundTasks, Request, status +from annotated_doc import Doc import structlog from api.config import settings -from api.dependencies import get_db, require_api_key -from api.models.job import Job, JobStatus, ConvertRequest, JobCreateResponse, JobResponse +from api.dependencies import DatabaseSession, RequiredAPIKey +from api.models.job import Job, JobStatus, ConvertRequest, JobCreateResponse, JobResponse, ErrorResponse from api.services.queue import QueueService from api.services.storage import StorageService from api.utils.validators import validate_input_path, validate_output_path, validate_operations logger = structlog.get_logger() + router = APIRouter() queue_service = QueueService() storage_service = StorageService() -@router.post("/convert", response_model=JobCreateResponse) +@router.post( + "/convert", + response_model=JobCreateResponse, + status_code=status.HTTP_201_CREATED, + summary="Create media conversion job", + response_description="Job created successfully", + responses={ + 201: {"description": "Job created and queued for processing"}, + 400: {"model": ErrorResponse, "description": "Invalid request parameters"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 429: {"model": ErrorResponse, "description": "Rate limit or concurrent job limit exceeded"}, + 503: {"model": ErrorResponse, "description": "Service temporarily unavailable"}, + }, +) async def convert_media( - request: ConvertRequest, + request: Annotated[ConvertRequest, Doc("Media conversion job specification")], background_tasks: BackgroundTasks, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + db: DatabaseSession, + api_key: RequiredAPIKey, ) -> JobCreateResponse: """ Create a new media conversion job. @@ -195,12 +214,22 @@ async def convert_media( raise HTTPException(status_code=500, detail="Failed to create job") -@router.post("/analyze", response_model=JobCreateResponse) +@router.post( + "/analyze", + response_model=JobCreateResponse, + status_code=status.HTTP_201_CREATED, + summary="Analyze media quality metrics", + responses={ + 201: {"description": "Analysis job created"}, + 400: {"model": ErrorResponse, "description": "Invalid request"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + }, +) async def analyze_media( - request: Dict[str, Any], + request: Annotated[Dict[str, Any], Doc("Media analysis request")], fastapi_request: Request, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + db: DatabaseSession, + api_key: RequiredAPIKey, ) -> JobCreateResponse: """ Analyze media file for quality metrics. @@ -225,11 +254,21 @@ async def analyze_media( return await convert_media(convert_request, BackgroundTasks(), db, api_key) -@router.post("/stream", response_model=JobCreateResponse) +@router.post( + "/stream", + response_model=JobCreateResponse, + status_code=status.HTTP_201_CREATED, + summary="Create adaptive streaming output", + responses={ + 201: {"description": "Streaming job created"}, + 400: {"model": ErrorResponse, "description": "Invalid request"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + }, +) async def create_stream( - request: Dict[str, Any], - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + request: Annotated[Dict[str, Any], Doc("Streaming format request (HLS/DASH)")], + db: DatabaseSession, + api_key: RequiredAPIKey, ) -> JobCreateResponse: """ Create adaptive streaming formats (HLS/DASH). @@ -261,10 +300,19 @@ async def create_stream( return await convert_media(convert_request, BackgroundTasks(), db, api_key) -@router.post("/estimate") +@router.post( + "/estimate", + summary="Estimate job processing time and resources", + response_description="Estimation results", + responses={ + 200: {"description": "Estimation successful"}, + 400: {"model": ErrorResponse, "description": "Invalid request"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + }, +) async def estimate_job( - request: ConvertRequest, - api_key: str = Depends(require_api_key), + request: Annotated[ConvertRequest, Doc("Job to estimate")], + api_key: RequiredAPIKey, ) -> Dict[str, Any]: """ Estimate processing time and resources for a conversion job. diff --git a/docker/requirements-stable.txt b/docker/requirements-stable.txt index ea8e8e7..7605b94 100644 --- a/docker/requirements-stable.txt +++ b/docker/requirements-stable.txt @@ -1,13 +1,14 @@ # Stable dependency versions with known compatibility # This file pins specific versions to prevent build failures -# Core FastAPI Stack -fastapi==0.109.0 -uvicorn[standard]==0.25.0 -pydantic==2.5.3 -pydantic-settings==2.1.0 -python-multipart==0.0.6 -starlette==0.35.1 +# Core FastAPI Stack (Updated to FastAPI 0.124.0) +fastapi==0.124.0 +uvicorn[standard]==0.32.1 +pydantic==2.10.3 +pydantic-settings==2.7.0 +python-multipart==0.0.19 +starlette==0.45.2 +annotated-doc==0.1.0 # Database Stack (CRITICAL: These versions are tested for Python 3.12.7) sqlalchemy==2.0.25 diff --git a/requirements.txt b/requirements.txt index 07bf148..cc5a497 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ -# Core Framework - Latest Stable -fastapi==0.115.4 -uvicorn[standard]==0.32.0 -pydantic==2.9.2 -pydantic-settings==2.6.1 -python-multipart==0.0.17 +# Core Framework - Latest Stable (FastAPI 0.124.0) +fastapi==0.124.0 +uvicorn[standard]==0.32.1 +pydantic==2.10.3 +pydantic-settings==2.7.0 +python-multipart==0.0.19 +annotated-doc==0.1.0 # Database - Production Ready sqlalchemy[asyncio]==2.0.36