diff --git a/app/core/monitor.py b/app/core/monitor.py index d2ce8e9..4f78f22 100644 --- a/app/core/monitor.py +++ b/app/core/monitor.py @@ -19,9 +19,6 @@ class JobMonitor: def __init__(self): self.stop_monitoring = False self.monitoring_task = None - self._ignore_jobs = ( - set() - ) # Keep ignore list to avoid reprocessing completed jobs async def _get_queue_info(self) -> dict[str, int]: """Get queue information with fallback""" @@ -137,13 +134,9 @@ async def monitor_jobs(self): for job in jobs: job_id = job.metadata.name - # Skip previously completed jobs - if job_id in self._ignore_jobs: - logger.debug(f"ignoring job: {job_id}") - continue - if not job.status.conditions: logger.warning(f"Job conditions not ready for {job_id}") + await asyncio.sleep(0.1) continue conditions = job.status.conditions[-1] @@ -157,7 +150,8 @@ async def monitor_jobs(self): if status in TrainingJobStatus.stopped_states: job_info = await db_manager.get_job(job_id) if job_info and job_info.status == status: - self._ignore_jobs.add(job_id) + # ignore jobs that are already in stopped states + await asyncio.sleep(0.1) continue # check if state changed @@ -185,8 +179,6 @@ async def monitor_jobs(self): # Additional handling for completed jobs if is_completed: - self._ignore_jobs.add(job_id) - if status == KubeflowStatusEnum.succeeded: logger.info( f"Job {job_id} completed successfully, cleaning up" @@ -229,7 +221,6 @@ async def stop(self): except asyncio.CancelledError: pass finally: - self._ignore_jobs.clear() self.monitoring_task = None