Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ S3_BUCKET_NAME=
S3_DEFAULT_DEPLOY_BUCKET=
JOB_MONITOR_INTERVAL=5

# slack webhook for notifications
# SLACK_WEBHOOK_URL=

## development flags ##
# enable the job monitor that updates the database. deploy separately for prod.
DEV_LOCAL_JOB_MONITOR=True
Expand Down
1 change: 1 addition & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Settings(BaseSettings):
MONGODB_DATABASE: str = "default"
# job monitor
JOB_MONITOR_INTERVAL: int = 2
SLACK_WEBHOOK_URL: str | None = None
DEV_LOCAL_JOB_MONITOR: bool = False
AWS_JOB_SYNC_INTERVAL: int = 60
# aws configuration
Expand Down
65 changes: 59 additions & 6 deletions app/core/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
from app.core.config import settings
from app.database.db import db_manager
from app.schemas.db_schemas import JobStatus
from app.schemas.kubeflow_schemas import KubeflowStatusEnum, TrainingJobStatus
from app.schemas.kubeflow_schemas import (
KubeflowStatusEnum,
TrainingJobStatus,
DatabaseStatusEnum,
)
from app.utils.kf_config import kubeflow_api
from app.utils.S3Handler import s3_handler
from app.utils.kueue_helpers import get_kueue_queue, get_kubeflow_queue
from app.utils.slack_helpers import send_slack_notification

logger = logging.getLogger(__name__)

Expand All @@ -19,6 +24,8 @@ class JobMonitor:
def __init__(self):
# Initialise the monitor's in-memory bookkeeping; only plain attributes are set here.
# Loop flag: monitor_jobs keeps polling while this is False.
self.stop_monitoring = False
# NOTE(review): presumably holds the background task running monitor_jobs — not visible in this excerpt, confirm.
self.monitoring_task = None
# Job names returned by the most recent Kubeflow listing; compared with the
# next listing to detect jobs that disappeared from the API.
self.active_jobs = set()
# Job ids for which a "queued" Slack notification was already sent, so the
# message is not repeated on every polling pass.
self.notified_queued_jobs = set()

async def _get_queue_info(self) -> dict[str, int]:
"""Get queue information with fallback"""
Expand Down Expand Up @@ -128,13 +135,33 @@ async def monitor_jobs(self):
while not self.stop_monitoring:
try:
# Get current state from Kubeflow API
jobs = kubeflow_api.list_jobs(namespace=settings.NAMESPACE)
queue_positions = await self._get_queue_info()
api_jobs = {
job.metadata.name: job
for job in kubeflow_api.list_jobs(namespace=settings.NAMESPACE)
}
current_job_ids = set(api_jobs.keys())

# Find jobs that have disappeared
disappeared_jobs = self.active_jobs - current_job_ids
for job_id in disappeared_jobs:
job_info = await db_manager.get_job(job_id)
if (
job_info
and job_info.status not in TrainingJobStatus.running_states
):
logger.warning(
f"Job {job_id} disappeared from API, probably canceled by user."
)
await send_slack_notification(
f"Job `{job_info.status}`: `{job_info.job_name} ({job_id})` by `{job_info.user_id}`\nModel: `{job_info.model_name}` | Started: `{job_info.created_at}`"
)
self.notified_queued_jobs.discard(job_id)

for job in jobs:
job_id = job.metadata.name
self.active_jobs = current_job_ids
queue_positions = await self._get_queue_info()

if not job.status.conditions:
for job_id, job in api_jobs.items():
if not job.status or not job.status.conditions:
logger.warning(f"Job conditions not ready for {job_id}")
await asyncio.sleep(0.1)
continue
Expand All @@ -160,6 +187,31 @@ async def monitor_jobs(self):
logger.info(
f"Job {job_id} status changed from {prev_job_info.status} to {status}"
)
# Notify if job in queue
if (
TrainingJobStatus.map_status(status)
== DatabaseStatusEnum.queued
and job_id not in self.notified_queued_jobs
):
await send_slack_notification(
f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Queue Position: `{queue_positions.get(job_id, 'N/A')}`"
)
self.notified_queued_jobs.add(job_id)
# Notify on start
elif (
status == KubeflowStatusEnum.running
and prev_job_info.status.lower()
!= KubeflowStatusEnum.running.lower()
):
await send_slack_notification(
f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Started: `{prev_job_info.created_at}`"
)
# Notify on stop
elif status in TrainingJobStatus.stopped_states:
await send_slack_notification(
f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Started: `{prev_job_info.created_at}` | Ended: `{job.status.completion_time}`"
)
self.notified_queued_jobs.discard(job_id)

# Update status in database
job_info = await self._update_job_status(
Expand Down Expand Up @@ -188,6 +240,7 @@ async def monitor_jobs(self):
logger.error(
f"Job {job_id} failed. Manual investigation required."
)
self.active_jobs.discard(job_id)

except Exception as e:
logger.error(f"Error in job monitoring loop: {e}", exc_info=True)
Expand Down
31 changes: 31 additions & 0 deletions app/utils/slack_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import httpx
import logging

from app.core.config import settings

logger = logging.getLogger(__name__)


async def send_slack_notification(message: str) -> None:
    """Send ``message`` to Slack through the configured incoming webhook.

    Delivery is best-effort: every failure is logged and swallowed so that a
    Slack outage never interrupts the caller.

    Args:
        message: Text posted as the ``text`` field of the webhook payload.

    Returns:
        None. When ``settings.SLACK_WEBHOOK_URL`` is unset or empty the call
        is a no-op apart from a debug log line.
    """
    webhook_url = settings.SLACK_WEBHOOK_URL
    if not webhook_url:
        logger.debug("SLACK_WEBHOOK_URL not set, skipping notification.")
        return

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                webhook_url,
                json={"text": message},
                timeout=10.0,
            )
            response.raise_for_status()
            logger.debug(f"Slack notification sent successfully: {message}")
    except httpx.HTTPStatusError as e:
        # raise_for_status() raises HTTPStatusError, which is not a
        # RequestError, so it needs its own handler. Its message embeds the
        # request URL; the webhook URL is a credential, so log only the
        # status code and the response body.
        logger.error(
            "Error sending Slack notification: "
            f"HTTP {e.response.status_code}: {e.response.text}"
        )
    except httpx.RequestError as e:
        # Transport-level failures (DNS, connect, timeout). Timeouts often
        # carry an empty message, so include the exception type as well.
        logger.error(
            f"Error sending Slack notification: {type(e).__name__}: {e}"
        )
    except Exception as e:
        logger.error(
            f"An unexpected error occurred while sending a Slack notification: {e}"
        )