Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 121 additions & 8 deletions src/runtime/process-io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const DEFAULT_MAX_LINE_LENGTH = 100 * 1024 * 1024;
/** Maximum stderr bytes to retain for diagnostics: 8KB */
const MAX_STDERR_BYTES = 8 * 1024;

/** Default write queue timeout: 30 seconds */
const DEFAULT_WRITE_QUEUE_TIMEOUT_MS = 30_000;

/** Regex for ANSI escape sequences */
const ANSI_ESCAPE_RE = /\u001b\[[0-9;]*[A-Za-z]/g;

Expand Down Expand Up @@ -64,6 +67,9 @@ export interface ProcessIOOptions {

/** Restart process after N requests (0 = never). Default: 0 */
restartAfterRequests?: number;

/** Write queue timeout in milliseconds. Default: 30000ms */
writeQueueTimeoutMs?: number;
}

/**
Expand All @@ -82,6 +88,10 @@ interface QueuedWrite {
data: string;
resolve: () => void;
reject: (error: Error) => void;
/** Timestamp when the write was queued */
queuedAt: number;
/** Timeout handle for write queue timeout */
timeoutHandle?: NodeJS.Timeout;
}

// =============================================================================
Expand Down Expand Up @@ -142,6 +152,7 @@ export class ProcessIO extends BoundedContext implements Transport {
private readonly cwd: string | undefined;
private readonly maxLineLength: number;
private readonly restartAfterRequests: number;
private readonly writeQueueTimeoutMs: number;

// Process state
private process: ChildProcess | null = null;
Expand All @@ -155,6 +166,7 @@ export class ProcessIO extends BoundedContext implements Transport {
// Request tracking
private readonly pending = new Map<number, PendingRequest>();
private requestCount = 0;
private needsRestart = false;

// Write queue for backpressure
private readonly writeQueue: QueuedWrite[] = [];
Expand All @@ -174,6 +186,7 @@ export class ProcessIO extends BoundedContext implements Transport {
this.cwd = options.cwd;
this.maxLineLength = options.maxLineLength ?? DEFAULT_MAX_LINE_LENGTH;
this.restartAfterRequests = options.restartAfterRequests ?? 0;
this.writeQueueTimeoutMs = options.writeQueueTimeoutMs ?? DEFAULT_WRITE_QUEUE_TIMEOUT_MS;
}

// ===========================================================================
Expand Down Expand Up @@ -218,8 +231,8 @@ export class ProcessIO extends BoundedContext implements Transport {
throw new BridgeProtocolError('Message must contain an "id" field');
}

// Check for restart condition
if (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests) {
// Check for restart condition (either scheduled restart or forced by stream error)
if (this.needsRestart || (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests)) {
await this.restartProcess();
}

Expand Down Expand Up @@ -320,6 +333,7 @@ export class ProcessIO extends BoundedContext implements Transport {

// Clear write queue
for (const queued of this.writeQueue) {
this.clearQueuedWriteTimeout(queued);
queued.reject(error);
}
this.writeQueue.length = 0;
Expand Down Expand Up @@ -417,10 +431,12 @@ export class ProcessIO extends BoundedContext implements Transport {

if (this.process.stdout) {
this.process.stdout.on('data', this.handleStdoutData.bind(this));
this.process.stdout.on('error', this.handleStdoutError.bind(this));
}

if (this.process.stderr) {
this.process.stderr.on('data', this.handleStderrData.bind(this));
this.process.stderr.on('error', this.handleStderrError.bind(this));
}

if (this.process.stdin) {
Expand Down Expand Up @@ -493,15 +509,25 @@ export class ProcessIO extends BoundedContext implements Transport {
// Kill existing process
await this.killProcess();

// Clear buffers
// Clear buffers and restart flags
this.stdoutBuffer = '';
this.stderrBuffer = '';
this.requestCount = 0;
this.needsRestart = false;

// Spawn new process
await this.spawnProcess();
}

/**
* Mark the process for restart on the next send.
* This is called after stream errors to ensure the next request uses a fresh process.
* Works independently of restartAfterRequests setting.
*/
private markForRestart(): void {
this.needsRestart = true;
}

// ===========================================================================
// STREAM HANDLERS
// ===========================================================================
Expand Down Expand Up @@ -654,18 +680,83 @@ export class ProcessIO extends BoundedContext implements Transport {

// Reject all pending writes
for (const queued of this.writeQueue) {
this.clearQueuedWriteTimeout(queued);
queued.reject(error);
}
this.writeQueue.length = 0;

// Reject all pending requests
this.rejectAllPending(error);

// Mark for restart on next send
this.markForRestart();
}

/**
* Handle stdout error event.
* This can occur during pipe errors or when the process crashes.
*/
private handleStdoutError(err: Error): void {
const error = new BridgeProtocolError(`stdout error: ${err.message}`);
this.rejectAllPending(error);
this.markForRestart();
}

/**
* Handle stderr error event.
* This can occur during pipe errors or when the process crashes.
*/
private handleStderrError(err: Error): void {
// Stderr errors are less critical but still indicate process health issues
const error = new BridgeProtocolError(`stderr error: ${err.message}`);
this.rejectAllPending(error);
this.markForRestart();
}

// ===========================================================================
// WRITE MANAGEMENT
// ===========================================================================

/**
* Create a queued write entry with a timeout timer.
* The timer fires if the drain event never comes.
*/
private createQueuedWrite(
data: string,
resolve: () => void,
reject: (error: Error) => void
): QueuedWrite {
const queuedAt = Date.now();
const entry: QueuedWrite = { data, resolve, reject, queuedAt };

// Set up timeout timer that fires if drain never happens
entry.timeoutHandle = setTimeout(() => {
// Remove this entry from the queue
const index = this.writeQueue.indexOf(entry);
if (index !== -1) {
this.writeQueue.splice(index, 1);
reject(new BridgeTimeoutError(
`Write queue timeout: entry waited ${this.writeQueueTimeoutMs}ms without drain`
));
}
}, this.writeQueueTimeoutMs);

// Unref the timer so it doesn't keep the process alive
entry.timeoutHandle.unref();

return entry;
}

/**
* Clear the timeout for a queued write entry.
*/
private clearQueuedWriteTimeout(entry: QueuedWrite): void {
if (entry.timeoutHandle) {
clearTimeout(entry.timeoutHandle);
entry.timeoutHandle = undefined;
}
}

/**
* Write data to stdin with backpressure handling.
*/
Expand All @@ -677,8 +768,8 @@ export class ProcessIO extends BoundedContext implements Transport {
}

if (this.draining || this.writeQueue.length > 0) {
// Queue the write
this.writeQueue.push({ data, resolve, reject });
// Queue the write with timestamp and timeout timer
this.writeQueue.push(this.createQueuedWrite(data, resolve, reject));
return;
}

Expand All @@ -691,10 +782,11 @@ export class ProcessIO extends BoundedContext implements Transport {
} else {
// Backpressure - queue this write and set draining flag
this.draining = true;
this.writeQueue.push({ data, resolve, reject });
this.writeQueue.push(this.createQueuedWrite(data, resolve, reject));
}
} catch (err) {
// Synchronous write error (e.g., EPIPE)
this.markForRestart();
reject(new BridgeProtocolError(`Write error: ${err instanceof Error ? err.message : 'unknown'}`));
}
});
Expand All @@ -704,13 +796,17 @@ export class ProcessIO extends BoundedContext implements Transport {
* Flush queued writes when backpressure clears.
*/
private flushWriteQueue(): void {
const now = Date.now();

while (this.writeQueue.length > 0 && !this.draining) {
if (!this.process?.stdin || this.processExited) {
// Process died - reject all queued writes
for (const q of this.writeQueue) {
this.clearQueuedWriteTimeout(q);
q.reject(new BridgeProtocolError('Process stdin not available'));
}
this.writeQueue.length = 0;
this.markForRestart();
return;
}

Expand All @@ -719,14 +815,29 @@ export class ProcessIO extends BoundedContext implements Transport {
return;
}

// Clear the timeout since we're processing this entry now
this.clearQueuedWriteTimeout(queued);

// Check for write queue timeout (fallback check, timer should have handled this)
if (now - queued.queuedAt > this.writeQueueTimeoutMs) {
queued.reject(new BridgeTimeoutError(
`Write queue timeout: entry waited ${now - queued.queuedAt}ms (limit: ${this.writeQueueTimeoutMs}ms)`
));
continue; // Process next entry
}

try {
const canWrite = this.process.stdin.write(queued.data);

if (canWrite) {
queued.resolve();
} else {
// Still under pressure - put it back
this.writeQueue.unshift(queued);
// Still under pressure - put it back with a new timeout
this.writeQueue.unshift(this.createQueuedWrite(
queued.data,
queued.resolve,
queued.reject
));
this.draining = true;
return;
}
Expand All @@ -737,9 +848,11 @@ export class ProcessIO extends BoundedContext implements Transport {
);
queued.reject(error);
for (const q of this.writeQueue) {
this.clearQueuedWriteTimeout(q);
q.reject(error);
}
this.writeQueue.length = 0;
this.markForRestart();
return;
}
}
Expand Down
Loading