-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
area:runtime-nodeArea: Node runtime bridgeArea: Node runtime bridgeenhancementNew feature or requestNew feature or request
Description
Overview
Multiple issues relate to stream/IO handling in NodeBridge - EPIPE errors, backpressure, and recovery after write failures. This tracking issue proposes a robust ProcessIO abstraction.
Related Issues
- NodeBridge should reset process after stdin write failures #107 - NodeBridge should reset process after stdin write failures
- Handle stdio stream errors to avoid unhandled EPIPE crashes #91 - Handle stdio stream errors to avoid unhandled EPIPE crashes
- Handle stdin backpressure in NodeBridge and OptimizedNodeBridge #59 - Handle stdin backpressure in NodeBridge and OptimizedNodeBridge
Proposed Architecture
ProcessIO Wrapper
class ProcessIO {
private writeQueue: Array<{ data: string; resolve: () => void; reject: (e: Error) => void }> = [];
private draining = false;
constructor(
private stdin: Writable,
private onError: (error: Error) => void
) {
stdin.on('error', (err) => {
this.rejectPending(err);
this.onError(err);
});
stdin.on('drain', () => {
this.draining = false;
this.flush();
});
}
async write(data: string): Promise<void> {
if (!this.stdin.writable) {
throw new BridgeProtocolError('stdin not writable');
}
return new Promise((resolve, reject) => {
this.writeQueue.push({ data, resolve, reject });
this.flush();
});
}
private flush(): void {
if (this.draining) return;
while (this.writeQueue.length > 0) {
const item = this.writeQueue[0];
const ok = this.stdin.write(item.data);
if (!ok) {
this.draining = true;
return; // Wait for drain
}
this.writeQueue.shift();
item.resolve();
}
}
private rejectPending(error: Error): void {
for (const item of this.writeQueue) {
item.reject(error);
}
this.writeQueue = [];
}
destroy(): void {
this.rejectPending(new BridgeDisposedError('ProcessIO destroyed'));
this.stdin.destroy();
}
}Integration with NodeBridge
// In spawnProcess()
worker.io = new ProcessIO(
childProcess.stdin,
(error) => this.quarantineWorker(worker, error)
);
// In BridgeCore write callback
write: async (data: string) => {
await worker.io.write(data);
}Acceptance Criteria
- EPIPE and stream errors are caught and trigger quarantine
- Backpressure is handled via write queue
- Write failures reject pending writes and trigger recovery
- No unhandled stream errors crash the process
- All 3 related issues can be closed
Scope
This fix touches:
- New:
src/runtime/process-io.ts- ProcessIO wrapper src/runtime/node.ts- integrate ProcessIOsrc/runtime/bridge-core.ts- async write support
Metadata
Metadata
Assignees
Labels
area:runtime-nodeArea: Node runtime bridgeArea: Node runtime bridgeenhancementNew feature or requestNew feature or request