Skip to content

Commit 24b6187

Browse files
committed
Deep audit: Fix critical bugs and service initialization issues
Key fixes:
- Fix service initialization: Use lazy imports in routers to avoid duplicate StorageService/QueueService instances that were never initialized
- Fix worker/main.py: Replace settings.get() with getattr() for Pydantic Settings
- Fix database password mismatch in compose.yml for worker services
- Fix Docker resource limits in compose.override.yml (memory reservation < limit)
- Add WORKER_TYPE setting to api/config.py
- Add typing_extensions>=4.9.0 to requirements.txt
- Add annotated_doc.py module for Doc annotation compatibility
- Add batch processing migration (005_add_batch_columns.py)

Affected routers updated to use get_storage_service()/get_queue_service():
- convert.py, batch.py, admin.py, health.py, jobs.py

All changes tested with Docker Compose deployment.
1 parent c0915ac commit 24b6187

File tree

17 files changed

+418
-383
lines changed

17 files changed

+418
-383
lines changed

.dockerignore

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
# Documentation
88
*.md
99
docs/
10-
*.txt
1110
LICENSE
11+
# Note: requirements.txt is NOT ignored (needed for build)
1212

1313
# Development files
1414
.vscode/
@@ -124,8 +124,11 @@ node_modules/
124124
# Optional npm cache directory
125125
.npm
126126

127-
# Storage and data directories (only for build context)
128-
storage/
127+
# Storage data directories (NOT the storage module)
128+
# Note: storage/ module is needed, storage/input, storage/output etc. are not
129+
storage/input/
130+
storage/output/
131+
storage/temp/
129132
data/
130133
tmp/
131134
temp/
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Add batch_id and batch_index columns to jobs table

Revision ID: 005
Revises: 004
Create Date: 2025-01-20

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '005'
down_revision: Union[str, None] = '004'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Attach the batch-processing columns and index to the jobs table."""
    # Both columns are nullable so pre-existing (non-batch) rows stay valid.
    for column_name, column_type in (
        ('batch_id', sa.String()),        # which batch a job belongs to
        ('batch_index', sa.Integer()),    # ordering of the job within its batch
    ):
        op.add_column('jobs', sa.Column(column_name, column_type, nullable=True))

    # Speeds up "all jobs in batch X" lookups.
    op.create_index('ix_jobs_batch_id', 'jobs', ['batch_id'])


def downgrade() -> None:
    """Reverse upgrade(): drop the index first, then both columns."""
    op.drop_index('ix_jobs_batch_id', 'jobs')
    for column_name in ('batch_index', 'batch_id'):
        op.drop_column('jobs', column_name)

annotated_doc.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
"""
Annotated Doc compatibility module.

Provides the Doc annotation for FastAPI documentation.
Prefers typing_extensions.Doc (available in typing_extensions >= 4.9.0);
when that import fails, a minimal drop-in implementation is used instead.
"""

try:
    from typing_extensions import Doc
except ImportError:
    class Doc:
        """
        Documentation annotation for Annotated types.

        Drop-in fallback for typing_extensions.Doc: attaches a
        human-readable documentation string to a type annotation.

        Example:
            from typing import Annotated
            from annotated_doc import Doc

            def endpoint(
                user_id: Annotated[str, Doc("The user's unique identifier")]
            ):
                pass
        """
        # No per-instance __dict__; a Doc carries exactly one attribute.
        __slots__ = ('documentation',)

        def __init__(self, documentation: str) -> None:
            # The human-readable text attached to the annotation.
            self.documentation = documentation

        def __eq__(self, other: object) -> bool:
            # Defer to the other operand when it is not a Doc.
            if not isinstance(other, Doc):
                return NotImplemented
            return self.documentation == other.documentation

        # Equal Docs must hash equally, so hash on the documentation text.
        def __hash__(self) -> int:
            return hash(self.documentation)

        def __repr__(self) -> str:
            return f"Doc({self.documentation!r})"


__all__ = ['Doc']

api/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Settings(BaseSettings):
4949
TEMP_PATH: str = "/tmp/rendiff"
5050

5151
# Worker
52+
WORKER_TYPE: str = "cpu" # cpu, gpu, or analysis
5253
WORKER_CONCURRENCY: int = 4
5354
WORKER_PREFETCH_MULTIPLIER: int = 1
5455
WORKER_MAX_TASKS_PER_CHILD: int = 100

api/routers/admin.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515
from api.config import settings
1616
from api.dependencies import DatabaseSession, require_api_key
1717
from api.models.job import Job, JobStatus, ErrorResponse
18-
from api.services.queue import QueueService
19-
from api.services.storage import StorageService
2018
from pydantic import BaseModel
2119

2220
logger = structlog.get_logger()
2321
router = APIRouter()
2422

25-
queue_service = QueueService()
26-
storage_service = StorageService()
23+
# Lazy import to avoid circular dependency
24+
def get_queue_service():
25+
from api.main import queue_service
26+
return queue_service
27+
28+
def get_storage_service():
29+
from api.main import storage_service
30+
return storage_service
2731

2832

2933
# Response models for OpenAPI documentation
@@ -124,7 +128,7 @@ async def get_workers_status(
124128
Only accessible with admin API key.
125129
"""
126130
try:
127-
workers = await queue_service.get_workers_status()
131+
workers = await get_queue_service().get_workers_status()
128132

129133
return WorkersStatusResponse(
130134
total_workers=len(workers),
@@ -169,7 +173,7 @@ async def get_storage_status(
169173
try:
170174
storage_status = {}
171175

172-
for name, backend in storage_service.backends.items():
176+
for name, backend in get_storage_service().backends.items():
173177
try:
174178
# Get backend-specific status
175179
backend_status = await backend.get_status()
@@ -186,8 +190,8 @@ async def get_storage_status(
186190

187191
return StorageStatusResponse(
188192
backends=storage_status,
189-
default_backend=storage_service.config.get("default_backend"),
190-
policies=storage_service.config.get("policies", {}),
193+
default_backend=get_storage_service().config.get("default_backend"),
194+
policies=get_storage_service().config.get("policies", {}),
191195
)
192196
except Exception as e:
193197
logger.error("Failed to get storage status", error=str(e))
@@ -267,8 +271,8 @@ async def get_system_stats(
267271
"avg_processing_time": sum(row.avg_time or 0 for row in job_stats) / len(job_stats) if job_stats else 0,
268272
"avg_vmaf_score": sum(row.avg_vmaf or 0 for row in job_stats if row.avg_vmaf) / sum(1 for row in job_stats if row.avg_vmaf) if any(row.avg_vmaf for row in job_stats) else None,
269273
},
270-
queue=await queue_service.get_queue_stats(),
271-
workers=await queue_service.get_workers_stats(),
274+
queue=await get_queue_service().get_queue_stats(),
275+
workers=await get_queue_service().get_workers_stats(),
272276
)
273277

274278
return stats
@@ -337,8 +341,8 @@ async def cleanup_old_jobs(
337341
try:
338342
# Delete output file if it exists
339343
if job.output_path:
340-
backend_name, file_path = storage_service.parse_uri(job.output_path)
341-
backend = storage_service.backends.get(backend_name)
344+
backend_name, file_path = get_storage_service().parse_uri(job.output_path)
345+
backend = get_storage_service().backends.get(backend_name)
342346
if backend:
343347
await backend.delete(file_path)
344348

api/routers/batch.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,21 @@
1313
from api.config import settings
1414
from api.dependencies import DatabaseSession, RequiredAPIKey
1515
from api.models.job import Job, JobStatus, JobResponse, ErrorResponse
16-
from api.services.queue import QueueService
17-
from api.services.storage import StorageService
1816
from api.utils.validators import validate_input_path, validate_output_path, validate_operations
1917
from api.utils.media_validator import media_validator
2018
from pydantic import BaseModel, Field
2119

2220
logger = structlog.get_logger()
2321
router = APIRouter()
2422

25-
queue_service = QueueService()
26-
storage_service = StorageService()
23+
# Lazy import to avoid circular dependency
24+
def get_queue_service():
25+
from api.main import queue_service
26+
return queue_service
27+
28+
def get_storage_service():
29+
from api.main import storage_service
30+
return storage_service
2731

2832

2933
class BatchJob(BaseModel):
@@ -248,7 +252,7 @@ async def create_batch_job(
248252
await db.refresh(job)
249253

250254
# Queue the job
251-
await queue_service.enqueue_job(
255+
await get_queue_service().enqueue_job(
252256
job_id=str(job.id),
253257
priority=job_request.priority,
254258
)
@@ -487,9 +491,9 @@ async def cancel_batch(
487491
try:
488492
# Cancel job in queue
489493
if job.status == JobStatus.QUEUED:
490-
success = await queue_service.cancel_job(str(job.id))
494+
success = await get_queue_service().cancel_job(str(job.id))
491495
else: # PROCESSING
492-
success = await queue_service.cancel_running_job(
496+
success = await get_queue_service().cancel_running_job(
493497
str(job.id),
494498
job.worker_id or ""
495499
)

api/routers/convert.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@
1616
from api.config import settings
1717
from api.dependencies import DatabaseSession, RequiredAPIKey
1818
from api.models.job import Job, JobStatus, ConvertRequest, JobCreateResponse, JobResponse, ErrorResponse
19-
from api.services.queue import QueueService
20-
from api.services.storage import StorageService
2119
from api.utils.validators import validate_input_path, validate_output_path, validate_operations
2220

2321
logger = structlog.get_logger()
2422

2523
router = APIRouter()
2624

27-
queue_service = QueueService()
28-
storage_service = StorageService()
25+
# Import services from main - they are initialized during app startup
26+
# Lazy import to avoid circular dependency
27+
def get_storage_service():
28+
from api.main import storage_service
29+
return storage_service
30+
31+
def get_queue_service():
32+
from api.main import queue_service
33+
return queue_service
2934

3035

3136
@router.post(
@@ -104,6 +109,7 @@ async def convert_media(
104109
output_path = request.output if isinstance(request.output, str) else request.output.get("path")
105110

106111
# Validate paths
112+
storage_service = get_storage_service()
107113
input_backend, input_validated = await validate_input_path(input_path, storage_service)
108114
output_backend, output_validated = await validate_output_path(output_path, storage_service)
109115

@@ -153,9 +159,10 @@ async def convert_media(
153159

154160
# Now we have a guaranteed unique job ID, queue it
155161
job_id_str = str(job.id)
156-
162+
157163
# Queue the job (do this before commit in case queuing fails)
158164
try:
165+
queue_service = get_queue_service()
159166
await queue_service.enqueue_job(
160167
job_id=job_id_str,
161168
priority=request.priority,

api/routers/health.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515

1616
from api.config import settings
1717
from api.dependencies import DatabaseSession
18-
from api.services.queue import QueueService
19-
from api.services.storage import StorageService
2018

2119
logger = structlog.get_logger()
2220

2321
router = APIRouter()
2422

25-
queue_service = QueueService()
26-
storage_service = StorageService()
23+
# Lazy import to avoid circular dependency
24+
def get_queue_service():
25+
from api.main import queue_service
26+
return queue_service
27+
28+
def get_storage_service():
29+
from api.main import storage_service
30+
return storage_service
2731

2832

2933
# Response models for OpenAPI documentation
@@ -146,7 +150,7 @@ async def detailed_health_check(
146150

147151
# Check queue
148152
try:
149-
queue_health = await queue_service.health_check()
153+
queue_health = await get_queue_service().health_check()
150154
health_status["components"]["queue"] = queue_health
151155
except Exception as e:
152156
health_status["status"] = "unhealthy"
@@ -157,7 +161,7 @@ async def detailed_health_check(
157161

158162
# Check storage backends
159163
try:
160-
storage_health = await storage_service.health_check()
164+
storage_health = await get_storage_service().health_check()
161165
health_status["components"]["storage"] = storage_health
162166
except Exception as e:
163167
health_status["status"] = "unhealthy"
@@ -283,7 +287,7 @@ async def get_capabilities() -> Dict[str, Any]:
283287
"metrics": ["vmaf", "psnr", "ssim"],
284288
"probing": ["format", "streams", "metadata"],
285289
},
286-
"storage_backends": list(storage_service.backends.keys()),
290+
"storage_backends": list(get_storage_service().backends.keys()),
287291
"hardware_acceleration": {
288292
"available": await check_hardware_acceleration(),
289293
"types": ["nvidia", "vaapi", "qsv", "videotoolbox"],

api/routers/jobs.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
from api.config import settings
1919
from api.dependencies import DatabaseSession, RequiredAPIKey
2020
from api.models.job import Job, JobStatus, JobResponse, JobListResponse, JobProgress, ErrorResponse
21-
from api.services.queue import QueueService
2221

2322
logger = structlog.get_logger()
2423

2524
router = APIRouter()
2625

27-
queue_service = QueueService()
26+
# Lazy import to avoid circular dependency
27+
def get_queue_service():
28+
from api.main import queue_service
29+
return queue_service
2830

2931

3032
@router.get(
@@ -288,10 +290,10 @@ async def cancel_job(
288290

289291
# Cancel in queue
290292
if job.status == JobStatus.QUEUED:
291-
await queue_service.cancel_job(str(job_id))
293+
await get_queue_service().cancel_job(str(job_id))
292294
elif job.status == JobStatus.PROCESSING:
293295
# Send cancel signal to worker
294-
await queue_service.cancel_running_job(str(job_id), job.worker_id)
296+
await get_queue_service().cancel_running_job(str(job_id), job.worker_id)
295297

296298
# Update job status
297299
job.status = JobStatus.CANCELLED
@@ -471,7 +473,7 @@ async def get_job_logs(
471473

472474
if job.status == JobStatus.PROCESSING and job.worker_id:
473475
# Get live logs from worker
474-
logs = await queue_service.get_worker_logs(job.worker_id, str(job_id), lines)
476+
logs = await get_queue_service().get_worker_logs(job.worker_id, str(job_id), lines)
475477
else:
476478
# Get stored logs from database and log aggregation system
477479
from api.services.job_service import JobService

0 commit comments

Comments (0)