arch: ParallelProcessor reliability #148

@bbopen

Overview

Two issues (#65 and #66) concern ParallelProcessor worker management and cleanup. This tracking issue proposes isolated worker contexts with explicit lifecycle handling to address both.

Related Issues

Current Problems

  1. Blast radius - One worker error rejects ALL pending tasks
  2. Zombie intervals - Queue processing continues after dispose
  3. Hanging promises - Dispose clears maps without rejecting
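
Problem 3 can be reproduced in isolation. The following is a minimal sketch, not the project's code: `Deferred`, `track`, `clearWithoutRejecting`, and `rejectThenClear` are hypothetical names introduced for illustration. It shows that clearing a map of pending entries leaves their promises unsettled, while rejecting each entry first settles them.

```typescript
// Hypothetical minimal shape of a pending entry; not the project's actual type.
interface Deferred {
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
}

type SettleState = "pending" | "fulfilled" | "rejected";

// Registers a new pending promise in the map and exposes its state for inspection.
function track(map: Map<string, Deferred>, id: string): { state: () => SettleState } {
  let state: SettleState = "pending";
  const promise = new Promise<unknown>((resolve, reject) => {
    map.set(id, { resolve, reject });
  });
  promise.then(
    () => { state = "fulfilled"; },
    () => { state = "rejected"; },
  );
  return { state: () => state };
}

// The problematic pattern: entries are dropped, so nothing can settle them later.
function clearWithoutRejecting(map: Map<string, Deferred>): void {
  map.clear();
}

// The proposed pattern: settle every entry, then drop the references.
function rejectThenClear(map: Map<string, Deferred>, reason: Error): void {
  map.forEach((entry) => entry.reject(reason));
  map.clear();
}

async function demo(): Promise<[SettleState, SettleState]> {
  const leaky = new Map<string, Deferred>();
  const safe = new Map<string, Deferred>();
  const a = track(leaky, "a");
  const b = track(safe, "b");

  clearWithoutRejecting(leaky);
  rejectThenClear(safe, new Error("disposed"));

  // Yield once so the .then callbacks registered in track() can run.
  await new Promise<void>((r) => setTimeout(r, 0));
  return [a.state(), b.state()];
}
```

A caller awaiting the promise behind `a` would wait forever, which is the hang described in problem 3.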

Architectural Solution: Isolated Worker Contexts

┌─────────────────────────────────────────────────────────────┐
│                  ParallelProcessor                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Worker      │  │ Worker      │  │ Worker      │         │
│  │ Context 1   │  │ Context 2   │  │ Context 3   │         │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤         │
│  │ pendingTasks│  │ pendingTasks│  │ pendingTasks│         │
│  │ Map<id, P>  │  │ Map<id, P>  │  │ Map<id, P>  │         │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │
│         │                │                │                 │
│         └────────────────┼────────────────┘                 │
│                          │                                  │
│                    ┌─────┴─────┐                            │
│                    │ Scheduler │                            │
│                    │ (unref'd) │                            │
│                    └───────────┘                            │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Key Implementation Details

1. Isolated Worker Context (#65)

class WorkerContext {
  readonly id: string;
  private worker: Worker;
  private pendingTasks = new Map<string, PendingTask>();
  private disposed = false;
  
  async execute<T>(task: Task): Promise<T> {
    if (this.disposed) {
      throw new DisposedError('Worker context disposed');
    }
    
    const taskId = crypto.randomUUID();
    
    return new Promise<T>((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        this.pendingTasks.delete(taskId);
        reject(new TimeoutError(`Task ${taskId} timed out`));
      }, task.timeoutMs);
      
      this.pendingTasks.set(taskId, {
        taskId,
        workerId: this.id, // required by the PendingTask interface (section 3)
        resolve: (value) => {
          clearTimeout(timeoutId);
          this.pendingTasks.delete(taskId);
          resolve(value as T);
        },
        reject: (error) => {
          clearTimeout(timeoutId);
          this.pendingTasks.delete(taskId);
          reject(error);
        },
        timeoutId,
      });
      
      try {
        this.worker.postMessage({ taskId, ...task });
      } catch (error) {
        this.pendingTasks.get(taskId)?.reject(error as Error);
      }
    });
  }
  
  handleWorkerError(error: Error): void {
    // Only reject THIS worker's tasks - not all tasks.
    // Iterate over a snapshot: each reject() wrapper deletes its own map entry.
    for (const pending of [...this.pendingTasks.values()]) {
      pending.reject(new WorkerError(this.id, error));
    }
    this.pendingTasks.clear();
  }
  
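  async dispose(): Promise<void> {
    if (this.disposed) return; // idempotent: a second call is a no-op
    this.disposed = true;
    
    // Reject all pending with a clear error. The reject() wrapper also clears
    // the timeout, so the explicit clearTimeout here is only defensive.
    for (const pending of [...this.pendingTasks.values()]) {
      clearTimeout(pending.timeoutId);
      pending.reject(new DisposedError('Worker disposed'));
    }
    this.pendingTasks.clear();
    
    // terminate() returns a Promise in Node's worker_threads; awaiting it is
    // harmless where it returns void (Web Workers).
    await this.worker.terminate();
  }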
}
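
The class above registers pending tasks but does not show how a worker's reply settles them. A rough sketch of that routing step follows. The `WorkerReply` wire format, the `Pending` shape, and the `routeReply` helper are assumptions made for illustration; the issue does not specify a message protocol.

```typescript
// Assumed reply format; the issue does not define one.
type WorkerReply =
  | { taskId: string; ok: true; value: unknown }
  | { taskId: string; ok: false; message: string };

// Hypothetical subset of PendingTask needed for routing.
interface Pending {
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
}

// Routes one reply to its pending entry. Returns false for unknown ids,
// e.g. a reply that arrives after the task has already timed out.
function routeReply(pending: Map<string, Pending>, reply: WorkerReply): boolean {
  const entry = pending.get(reply.taskId);
  if (!entry) return false;
  pending.delete(reply.taskId);
  if (reply.ok) {
    entry.resolve(reply.value);
  } else {
    entry.reject(new Error(reply.message));
  }
  return true;
}

// Usage: one pending task, one matching reply, then a late duplicate reply.
const pendingDemo = new Map<string, Pending>();
const settled: unknown[] = [];
pendingDemo.set("t1", {
  resolve: (value) => { settled.push(value); },
  reject: (error) => { settled.push(error.message); },
});
const routed = routeReply(pendingDemo, { taskId: "t1", ok: true, value: 42 });
const late = routeReply(pendingDemo, { taskId: "t1", ok: true, value: 43 });
```

With Node's `worker_threads`, a function like this would typically be called from the worker's `'message'` listener, and `handleWorkerError` from its `'error'` listener. Ignoring unknown ids keeps late replies from throwing after a timeout has already removed the entry.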

2. ParallelProcessor with Graceful Dispose (#66)

class ParallelProcessor {
  private contexts: WorkerContext[] = [];
  private queueInterval?: NodeJS.Timeout;
  private taskQueue: Task[] = [];
  private disposed = false;
  private roundRobinIndex = 0;
  
  constructor(options: { workers: number }) {
    for (let i = 0; i < options.workers; i++) {
      this.contexts.push(new WorkerContext(`worker-${i}`));
    }
    
    // Start the queue processor (processQueue() is not shown in this issue).
    // unref() keeps the interval from blocking process exit.
    this.queueInterval = setInterval(() => this.processQueue(), 10);
    this.queueInterval.unref();
  }
  
  async execute<T>(task: Task): Promise<T> {
    if (this.disposed) {
      throw new DisposedError('Processor disposed');
    }
    
    // Round-robin selection
    const context = this.contexts[this.roundRobinIndex];
    this.roundRobinIndex = (this.roundRobinIndex + 1) % this.contexts.length;
    
    return context.execute<T>(task);
  }
  
  async dispose(): Promise<void> {
    if (this.disposed) return;
    this.disposed = true;
    
    // 1. Stop queue processing
    if (this.queueInterval) {
      clearInterval(this.queueInterval);
      this.queueInterval = undefined;
    }
    
    // 2. Reject queued tasks
    for (const task of this.taskQueue) {
      task.reject?.(new DisposedError('Processor disposed'));
    }
    this.taskQueue.length = 0;
    
    // 3. Dispose all contexts (rejects their pending tasks)
    await Promise.all(this.contexts.map(c => c.dispose()));
    this.contexts.length = 0;
  }
}
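
The constructor above calls `processQueue()`, which this issue does not define. One possible shape for the queue side is sketched below, with the caveat that `QueueDrainer`, `QueuedTask`, `maxPerTick`, and the `run` callback are all hypothetical names. The points it illustrates are that the interval is cleared on stop, that a late tick is a no-op, and that queued tasks are rejected rather than dropped.

```typescript
// Hypothetical queued-task shape: the payload plus the settle callbacks of the
// promise that was handed back to the caller.
interface QueuedTask<T> {
  payload: T;
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
}

class QueueDrainer<T> {
  private queue: QueuedTask<T>[] = [];
  private timer?: ReturnType<typeof setInterval>;
  private stopped = false;
  private run: (payload: T) => Promise<unknown>;
  private maxPerTick: number;

  constructor(run: (payload: T) => Promise<unknown>, maxPerTick = 4) {
    this.run = run;
    this.maxPerTick = maxPerTick;
  }

  start(intervalMs: number): void {
    if (this.stopped || this.timer !== undefined) return;
    this.timer = setInterval(() => this.processQueue(), intervalMs);
    // unref() exists on Node timers only; guard so the sketch also runs elsewhere.
    (this.timer as unknown as { unref?: () => void }).unref?.();
  }

  enqueue(payload: T): Promise<unknown> {
    if (this.stopped) return Promise.reject(new Error("stopped"));
    return new Promise((resolve, reject) => {
      this.queue.push({ payload, resolve, reject });
    });
  }

  // Dispatches up to maxPerTick queued tasks; does nothing once stopped.
  processQueue(): void {
    if (this.stopped) return;
    for (const item of this.queue.splice(0, this.maxPerTick)) {
      this.run(item.payload).then(item.resolve, item.reject);
    }
  }

  // Clears the interval and rejects whatever is still queued.
  // Returns the number of tasks that were rejected.
  stop(): number {
    this.stopped = true;
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
    const dropped = this.queue.splice(0);
    for (const item of dropped) item.reject(new Error("stopped"));
    return dropped.length;
  }
}
```

Checking the `stopped` flag inside `processQueue()` covers the case where a tick was already scheduled when `stop()` ran, which is one way the zombie-interval problem can show up.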

3. Task-Worker Association

interface PendingTask {
  taskId: string;
  workerId: string;  // Track which worker owns the task
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
  timeoutId: NodeJS.Timeout;
}

// ParallelProcessor method: when a worker errors, reject only that worker's tasks
handleWorkerError(workerId: string, error: Error): void {
  const context = this.contexts.find(c => c.id === workerId);
  if (context) {
    context.handleWorkerError(error);
    // Optionally restart the worker (replaceWorker is not defined in this issue)
    this.replaceWorker(context);
  }
}
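
`replaceWorker` is referenced above but not specified. As a sketch only, one option is to swap the failed context in place so that the array length, and therefore the round-robin index arithmetic, stays valid. `ContextLike`, `replaceContext`, and the `create` factory are hypothetical and stand in for whatever the real implementation uses.

```typescript
// Hypothetical minimal view of a worker context.
interface ContextLike {
  readonly id: string;
  dispose(): void | Promise<void>;
}

// Replaces `failed` with a fresh context in the same slot and returns it.
// Returns undefined if the context is no longer in the pool, e.g. because it
// was already replaced or the pool has been disposed.
function replaceContext<C extends ContextLike>(
  contexts: C[],
  failed: C,
  create: (id: string) => C,
): C | undefined {
  const index = contexts.indexOf(failed);
  if (index === -1) return undefined;
  // Fire-and-forget: pending tasks were already rejected by handleWorkerError.
  void failed.dispose();
  const fresh = create(failed.id);
  contexts[index] = fresh;
  return fresh;
}
```

Reusing the failed context's id is a design choice, not a requirement; a generation suffix would make logs easier to follow at the cost of changing ids that callers may have stored.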

Testing Strategy

  • Test error isolation (one worker failure doesn't affect others)
  • Test dispose during active processing
  • Test memory leak scenarios
  • Test queue interval cleanup
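
The error-isolation case could be tested without real workers. The sketch below uses a hypothetical `FakeContext` stand-in that mimics only the pending-map behaviour of `WorkerContext`, written as a plain async function rather than against a specific test framework.

```typescript
// Hypothetical stand-in for WorkerContext: only the pending-map behaviour.
class FakeContext {
  readonly id: string;
  private pending = new Map<
    string,
    { resolve: (value: unknown) => void; reject: (error: Error) => void }
  >();

  constructor(id: string) {
    this.id = id;
  }

  // Registers a task and returns the promise its caller would await.
  submit(taskId: string): Promise<unknown> {
    return new Promise((resolve, reject) => {
      this.pending.set(taskId, { resolve, reject });
    });
  }

  // Simulates a successful reply from the worker.
  complete(taskId: string, value: unknown): void {
    const entry = this.pending.get(taskId);
    this.pending.delete(taskId);
    entry?.resolve(value);
  }

  // Simulates a worker crash: rejects this context's tasks only.
  fail(error: Error): void {
    const entries = Array.from(this.pending.values());
    this.pending.clear();
    entries.forEach((entry) => entry.reject(error));
  }

  get pendingCount(): number {
    return this.pending.size;
  }
}

async function errorIsolationTest(): Promise<string[]> {
  const a = new FakeContext("worker-0");
  const b = new FakeContext("worker-1");
  const onA = a.submit("t1").then(() => "fulfilled", () => "rejected");
  const onB = b.submit("t2").then(() => "fulfilled", () => "rejected");

  a.fail(new Error("crash")); // must not touch b's pending task
  if (b.pendingCount !== 1) throw new Error("crash leaked into worker-1");
  b.complete("t2", 42);

  return Promise.all([onA, onB]);
}
```

The same structure extends to the dispose case: submit tasks to every context, dispose the processor, and assert that every returned promise settles as rejected.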

Expectations

  • Worker crash only affects that worker's tasks
  • dispose() cleanly rejects all pending promises
  • No hanging promises after dispose
  • Process can exit cleanly (no zombie intervals)
  • Timeout cleanup removes activeTasks entries
