Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions bindu/server/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@

from __future__ import annotations

import asyncio
import uuid
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
from typing import Any

from enum import IntEnum
from typing import Any, Dict

from ..utils.logging import get_logger
from .handlers import ContextHandlers, MessageHandlers, TaskHandlers
Expand All @@ -79,24 +80,53 @@
logger = get_logger("pebbling.server.task_manager")


class TaskPriority(IntEnum):
"""Task priority levels.

Higher values indicate higher priority.
"""
LOW = 1
NORMAL = 2
HIGH = 3
URGENT = 4


@dataclass
class TaskManager:
"""A task manager responsible for managing tasks and coordinating the AI agent ecosystem."""
"""A task manager responsible for managing tasks and coordinating the AI agent ecosystem.

Implements task prioritization and rate limiting to ensure fair resource allocation.
"""

scheduler: Scheduler
storage: Storage[Any]
manifest: Any | None = None # AgentManifest for creating workers


# Priority queue configuration
max_concurrent_tasks: int = 100 # Maximum concurrent tasks
_aexit_stack: AsyncExitStack | None = field(default=None, init=False)
_workers: list[ManifestWorker] = field(default_factory=list, init=False)
_push_manager: PushNotificationManager = field(init=False)
_message_handlers: MessageHandlers = field(init=False)
_task_handlers: TaskHandlers = field(init=False)
_context_handlers: ContextHandlers = field(init=False)

# Task queues by priority
_task_queues: Dict[TaskPriority, asyncio.Queue] = field(init=False)
_shutdown_event: asyncio.Event = field(init=False)

def __post_init__(self) -> None:
"""Initialize push notification manager after dataclass initialization."""
"""Initialize task manager components after dataclass initialization."""
self._push_manager = PushNotificationManager(manifest=self.manifest)
self._shutdown_event = asyncio.Event()

# Initialize priority queues
self._task_queues = {
TaskPriority.URGENT: asyncio.Queue(),
TaskPriority.HIGH: asyncio.Queue(),
TaskPriority.NORMAL: asyncio.Queue(),
TaskPriority.LOW: asyncio.Queue(),
}

async def __aenter__(self) -> TaskManager:
"""Initialize the task manager and start all components."""
Expand Down