From cf00349de454054a2c29968cb5d1326c1314778e Mon Sep 17 00:00:00 2001 From: Saurabh Sharma Date: Sun, 14 Dec 2025 19:43:46 +0530 Subject: [PATCH 1/4] feat(task-manager): implement priority-based task scheduling --- bindu/server/task_manager.py | 41 +++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/bindu/server/task_manager.py b/bindu/server/task_manager.py index 9cde0d44..e92df6b0 100644 --- a/bindu/server/task_manager.py +++ b/bindu/server/task_manager.py @@ -63,40 +63,71 @@ from __future__ import annotations +import asyncio import uuid from contextlib import AsyncExitStack from dataclasses import dataclass, field -from typing import Any +from enum import IntEnum +from typing import Any, Dict, Optional +from opentelemetry import trace from ..utils.logging import get_logger from .handlers import ContextHandlers, MessageHandlers, TaskHandlers from .notifications import PushNotificationManager -from .scheduler import Scheduler +from .scheduler import Scheduler, TaskSendParams from .storage import Storage from .workers import ManifestWorker +class TaskPriority(IntEnum): + """Task priority levels. + + Higher values indicate higher priority. + """ + LOW = 1 + NORMAL = 2 + HIGH = 3 + URGENT = 4 + logger = get_logger("pebbling.server.task_manager") @dataclass class TaskManager: - """A task manager responsible for managing tasks and coordinating the AI agent ecosystem.""" + """A task manager responsible for managing tasks and coordinating the AI agent ecosystem. + + Implements task prioritization and rate limiting to ensure fair resource allocation. + """ scheduler: Scheduler storage: Storage[Any] manifest: Any | None = None # AgentManifest for creating workers - + + # Priority queue configuration + max_concurrent_tasks: int = 100 # Maximum concurrent tasks _aexit_stack: AsyncExitStack | None = field(default=None, init=False) _workers: list[ManifestWorker] = field(default_factory=list, init=False) _push_manager: PushNotificationManager = field(init=False) _message_handlers: MessageHandlers = field(init=False) _task_handlers: TaskHandlers = field(init=False) _context_handlers: ContextHandlers = field(init=False) + + # Task queues by priority + _task_queues: Dict[TaskPriority, asyncio.Queue] = field(init=False) + _shutdown_event: asyncio.Event = field(init=False) def __post_init__(self) -> None: - """Initialize push notification manager after dataclass initialization.""" + """Initialize task manager components after dataclass initialization.""" self._push_manager = PushNotificationManager(manifest=self.manifest) + self._shutdown_event = asyncio.Event() + + # Initialize priority queues + self._task_queues = { + TaskPriority.URGENT: asyncio.Queue(), + TaskPriority.HIGH: asyncio.Queue(), + TaskPriority.NORMAL: asyncio.Queue(), + TaskPriority.LOW: asyncio.Queue(), + } async def __aenter__(self) -> TaskManager: """Initialize the task manager and start all components.""" From fb2083abb9d8f56c4cd9239fc5022b94b6add80a Mon Sep 17 00:00:00 2001 From: Saurabh Sharma Date: Sun, 14 Dec 2025 19:56:04 +0530 Subject: [PATCH 2/4] chore(task-manager): clean up imports and logger placement --- bindu/server/task_manager.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bindu/server/task_manager.py b/bindu/server/task_manager.py index e92df6b0..02c9912a 100644 --- a/bindu/server/task_manager.py +++ b/bindu/server/task_manager.py @@ -68,7 +68,7 @@ from contextlib import AsyncExitStack from dataclasses import dataclass, field from enum import IntEnum -from typing import Any, Dict, Optional +from typing import Any, Dict from opentelemetry import trace @@ -79,6 +79,10 @@ from .storage import Storage from .workers import ManifestWorker + +logger = get_logger("pebbling.server.task_manager") + + class TaskPriority(IntEnum): """Task priority levels. @@ -89,8 +93,6 @@ class TaskPriority(IntEnum): HIGH = 3 URGENT = 4 -logger = get_logger("pebbling.server.task_manager") - @dataclass class TaskManager: From 7eba2d583159043b90e740a94978bb09257749db Mon Sep 17 00:00:00 2001 From: Saurabh Sharma Date: Sun, 14 Dec 2025 20:11:13 +0530 Subject: [PATCH 3/4] chore(task-manager): clean up imports and logger placement --- bindu/server/task_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/bindu/server/task_manager.py b/bindu/server/task_manager.py index 02c9912a..baf25460 100644 --- a/bindu/server/task_manager.py +++ b/bindu/server/task_manager.py @@ -70,8 +70,6 @@ from enum import IntEnum from typing import Any, Dict -from opentelemetry import trace - from ..utils.logging import get_logger from .handlers import ContextHandlers, MessageHandlers, TaskHandlers from .notifications import PushNotificationManager From f96c00bebe9328db186a0a71ca809e2403555b73 Mon Sep 17 00:00:00 2001 From: Saurabh Sharma Date: Sun, 14 Dec 2025 20:13:06 +0530 Subject: [PATCH 4/4] chore(task-manager): clean up imports and logger placement --- bindu/server/task_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bindu/server/task_manager.py b/bindu/server/task_manager.py index baf25460..255a6e28 100644 --- a/bindu/server/task_manager.py +++ b/bindu/server/task_manager.py @@ -73,11 +73,10 @@ from ..utils.logging import get_logger from .handlers import ContextHandlers, MessageHandlers, TaskHandlers from .notifications import PushNotificationManager -from .scheduler import Scheduler, TaskSendParams +from .scheduler import Scheduler from .storage import Storage from .workers import ManifestWorker - logger = get_logger("pebbling.server.task_manager")