diff --git a/.env.example b/.env.example
index 6490214..f0be270 100644
--- a/.env.example
+++ b/.env.example
@@ -35,6 +35,9 @@
 S3_BUCKET_NAME=
 S3_DEFAULT_DEPLOY_BUCKET=
 JOB_MONITOR_INTERVAL=5
+# slack webhook for notifications
+# SLACK_WEBHOOK_URL=
+
 ## development flags ##
 # enable the job monitor that updates the database. deploy seperately for prod.
 DEV_LOCAL_JOB_MONITOR=True
diff --git a/app/core/config.py b/app/core/config.py
index 443a432..f9a306d 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -48,6 +48,7 @@ class Settings(BaseSettings):
     MONGODB_DATABASE: str = "default"
     # job monitor
     JOB_MONITOR_INTERVAL: int = 2
+    SLACK_WEBHOOK_URL: str | None = None
     DEV_LOCAL_JOB_MONITOR: bool = False
     AWS_JOB_SYNC_INTERVAL: int = 60
     # aws configuration
diff --git a/app/core/monitor.py b/app/core/monitor.py
index 4f78f22..e7f9c41 100644
--- a/app/core/monitor.py
+++ b/app/core/monitor.py
@@ -7,10 +7,15 @@
 from app.core.config import settings
 from app.database.db import db_manager
 from app.schemas.db_schemas import JobStatus
-from app.schemas.kubeflow_schemas import KubeflowStatusEnum, TrainingJobStatus
+from app.schemas.kubeflow_schemas import (
+    KubeflowStatusEnum,
+    TrainingJobStatus,
+    DatabaseStatusEnum,
+)
 from app.utils.kf_config import kubeflow_api
 from app.utils.S3Handler import s3_handler
 from app.utils.kueue_helpers import get_kueue_queue, get_kubeflow_queue
+from app.utils.slack_helpers import send_slack_notification
 
 logger = logging.getLogger(__name__)
 
@@ -19,6 +24,8 @@ class JobMonitor:
     def __init__(self):
         self.stop_monitoring = False
         self.monitoring_task = None
+        self.active_jobs = set()
+        self.notified_queued_jobs = set()
 
     async def _get_queue_info(self) -> dict[str, int]:
         """Get queue information with fallback"""
@@ -128,13 +135,33 @@ async def monitor_jobs(self):
         while not self.stop_monitoring:
             try:
                 # Get current state from Kubeflow API
-                jobs = kubeflow_api.list_jobs(namespace=settings.NAMESPACE)
-                queue_positions = await self._get_queue_info()
+                api_jobs = {
+                    job.metadata.name: job
+                    for job in kubeflow_api.list_jobs(namespace=settings.NAMESPACE)
+                }
+                current_job_ids = set(api_jobs.keys())
+
+                # Find jobs that have disappeared
+                disappeared_jobs = self.active_jobs - current_job_ids
+                for job_id in disappeared_jobs:
+                    job_info = await db_manager.get_job(job_id)
+                    if (
+                        job_info
+                        and job_info.status not in TrainingJobStatus.running_states
+                    ):
+                        logger.warning(
+                            f"Job {job_id} disappeared from API, probably canceled by user."
+                        )
+                        await send_slack_notification(
+                            f"Job `{job_info.status}`: `{job_info.job_name} ({job_id})` by `{job_info.user_id}`\nModel: `{job_info.model_name}` | Started: `{job_info.created_at}`"
+                        )
+                        self.notified_queued_jobs.discard(job_id)
 
-                for job in jobs:
-                    job_id = job.metadata.name
+                self.active_jobs = current_job_ids
+                queue_positions = await self._get_queue_info()
 
-                    if not job.status.conditions:
+                for job_id, job in api_jobs.items():
+                    if not job.status or not job.status.conditions:
                         logger.warning(f"Job conditions not ready for {job_id}")
                         await asyncio.sleep(0.1)
                         continue
@@ -160,6 +187,31 @@ async def monitor_jobs(self):
                         logger.info(
                             f"Job {job_id} status changed from {prev_job_info.status} to {status}"
                         )
+                        # Notify if job in queue
+                        if (
+                            TrainingJobStatus.map_status(status)
+                            == DatabaseStatusEnum.queued
+                            and job_id not in self.notified_queued_jobs
+                        ):
+                            await send_slack_notification(
+                                f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Queue Position: `{queue_positions.get(job_id, 'N/A')}`"
+                            )
+                            self.notified_queued_jobs.add(job_id)
+                        # Notify on start
+                        elif (
+                            status == KubeflowStatusEnum.running
+                            and prev_job_info.status.lower()
+                            != KubeflowStatusEnum.running.lower()
+                        ):
+                            await send_slack_notification(
+                                f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Started: `{prev_job_info.created_at}`"
+                            )
+                        # Notify on stop
+                        elif status in TrainingJobStatus.stopped_states:
+                            await send_slack_notification(
+                                f"Job `{TrainingJobStatus.map_status(status)}`: `{prev_job_info.job_name} ({job_id})` by `{prev_job_info.user_id}`\nModel: `{prev_job_info.model_name}` | Started: `{prev_job_info.created_at}` | Ended: `{job.status.completion_time}`"
+                            )
+                            self.notified_queued_jobs.discard(job_id)
 
                         # Update status in database
                         job_info = await self._update_job_status(
@@ -188,6 +240,7 @@ async def monitor_jobs(self):
                             logger.error(
                                 f"Job {job_id} failed. Manual investigation required."
                             )
+                            self.active_jobs.discard(job_id)
 
             except Exception as e:
                 logger.error(f"Error in job monitoring loop: {e}", exc_info=True)
diff --git a/app/utils/slack_helpers.py b/app/utils/slack_helpers.py
new file mode 100644
index 0000000..8f863d8
--- /dev/null
+++ b/app/utils/slack_helpers.py
@@ -0,0 +1,31 @@
+import httpx
+import logging
+
+from app.core.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+async def send_slack_notification(message: str):
+    """
+    Sends a notification to a Slack channel via a webhook.
+    """
+    if not settings.SLACK_WEBHOOK_URL:
+        logger.debug("SLACK_WEBHOOK_URL not set, skipping notification.")
+        return
+
+    try:
+        async with httpx.AsyncClient() as client:
+            response = await client.post(
+                settings.SLACK_WEBHOOK_URL,
+                json={"text": message},
+                timeout=10.0,
+            )
+            response.raise_for_status()
+            logger.debug(f"Slack notification sent successfully: {message}")
+    except httpx.RequestError as e:
+        logger.error(f"Error sending Slack notification: {e}")
+    except Exception as e:
+        logger.error(
+            f"An unexpected error occurred while sending a Slack notification: {e}"
+        )