From 96c65205697bef7037fe58958bdd90b8818bea4c Mon Sep 17 00:00:00 2001 From: Alexander Ivanov Date: Wed, 27 Mar 2024 22:08:47 +0300 Subject: [PATCH 1/2] Writable concept --- lib/utils/writer.js | 143 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 lib/utils/writer.js diff --git a/lib/utils/writer.js b/lib/utils/writer.js new file mode 100644 index 0000000..66c0cce --- /dev/null +++ b/lib/utils/writer.js @@ -0,0 +1,143 @@ +'use strict'; + +const { READ_INDEX, WRITE_INDEX, READ_CYCLE, WRITE_CYCLE } = require('./config'); +const assert = require('node:assert'); + +const BYTE_SIZE = 4; +const EXTRA_BYTES = 2; +const SHARED_STATE_SIZE = 32; + +module.exports = class Writable extends require('node:events') { + _sharedBuffer = null; + _sharedState = null; + _extraBuffer = []; + _cycle = 0; + _write = 0; + + _ready = false; + _ended = false; + _closed = false; + _ending = false; + _errored = false; + _destroyed = false; + _flashing = false; + _finished = false; + _needDrain = false; + + constructor({ sharedState, sharedBuffer, maxMessageSize }) { + assert(sharedState && sharedState.byteLength >= SHARED_STATE_SIZE); + assert(maxMessageSize); + assert(sharedBuffer); + super(); + + this._sharedState = new Int32Array(sharedState); + this._sharedBuffer = Buffer.from(sharedBuffer); + this.writeSync = Write.bind(this, true); + this.write = Write.bind(this, false); + } + + store(data, isFinal) { + //! Data + EXTRA_BYTES <= this._sharedBuffer.byteLength + const written = this._buffer.write(data, this._write + BYTE_SIZE); + this._buffer.writeInt32LE(isFinal, this._write + written + BYTE_SIZE); + this._buffer.writeInt32LE(written, this._write); + this._write += BYTE_SIZE + written + BYTE_SIZE; + Atomics.store(this._sharedState, WRITE_INDEX, this._write); + Atomics.notify(this._sharedState, WRITE_INDEX); + } + + end() {} + destroy() { + this._destroyed = true; + } + + get writable() { + return !this._destroyed && !this._ending; + } + + get writableEnded() { + return this._ending; + } + + get writableFinished() { + return this._finished; + } + + get writableErrored() { + return this._errored; + } + + get ready() { + return this._ready; + } + + get closed() { + return this._closed; + } + + get writableNeedDrain() { + return this._needDrain; + } + + get writableObjectMode() { + return false; + } +}; + +function Write(sync, toWrite) { + const read = Atomics.load(this._sharedBuffer, READ_INDEX); + const cycle = Atomics.load(this._sharedBuffer, READ_CYCLE); + const isBehind = read > this._write || cycle < this._cycle; + let leftover = (isBehind ? read : this._sharedBuffer.length) - this._write; + if (leftover < 0) throw new Error('overwritten'); //? Should never happen + if (cycle > this._cycle) throw new Error('Read further than expected'); + leftover -= EXTRA_BYTES; + + if (leftover <= 0 && isBehind) { + var { async, value } = Atomics.waitAsync(this._sharedState, READ_INDEX, read); + if (!async) return Write.call(this, sync, toWrite); + if (sync) return wait(this._sharedState, read), Write.call(this, sync, toWrite); + value.then(() => { + if (!this.flush()) return; + this.write = Write.bind(this, sync); + this._needDrain = false; + this.emit('drain'); + }); + + this.write = FakeWrite; + this._needDrain = true; + return true; + } + + if (leftover <= 0 && !isBehind) { + this._write = 0; + Atomics.store(this._sharedState, WRITE_INDEX, this._write); + Atomics.store(this._sharedState, WRITE_CYCLE, ++this.cycle); + Atomics.notify(this._sharedState, WRITE_INDEX); + return Write.call(this, sync, toWrite, true); + } + + if (leftover < Buffer.byteLength(toWrite) + EXTRA_BYTES) { + this.store(toWrite.slice(0, leftover), 0); + return Write.call(this, sync, toWrite.slice(leftover)); + } + + this.store(toWrite); + return false; +} + +function FakeWrite(data) { + this._extraBuffer.push(data); +} + +const MAX_SPINS = 10; +const WAIT_TIMEOUT = 1000; +const ERROR_TOO_LONG = 'Wait took too long (10s)'; +function wait(state, origin) { + let spins = 0; + do { + var status = Atomics.wait(state, READ_INDEX, origin, WAIT_TIMEOUT); + var read = Atomics.load(state, READ_INDEX); + if (++spins === MAX_SPINS) throw new Error(ERROR_TOO_LONG); + } while (status !== 'ok' || origin !== read); +} From a061801ecd3182018a1a2f3d812b019859605b5b Mon Sep 17 00:00:00 2001 From: Alexander Ivanov Date: Thu, 28 Mar 2024 21:05:58 +0300 Subject: [PATCH 2/2] Writable --- lib/config.js | 20 ++++ lib/utils/writer.js | 143 ------------------------ lib/writable.js | 267 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 287 insertions(+), 143 deletions(-) create mode 100644 lib/config.js delete mode 100644 lib/utils/writer.js create mode 100644 lib/writable.js diff --git a/lib/config.js b/lib/config.js new file mode 100644 index 0000000..c8441dc --- /dev/null +++ b/lib/config.js @@ -0,0 +1,20 @@ +'use strict'; + +module.exports = { + READ_INDEX: 4, + READ_CYCLE: 8, + READ_PROCESS: 16, + + WRITE_INDEX: 20, + WRITE_CYCLE: 24, + WRITE_PROCESS: 28, + + EMPTY_SIGN: 0, + READY_SIGN: 1, + FINISHING_SIGN: -1, + FINISHED_SIGN: -2, + FAILED_SIGN: -3, + + PREFIX_SIZE: 4, + POSTFIX_SIZE: 1, +}; diff --git a/lib/utils/writer.js b/lib/utils/writer.js deleted file mode 100644 index 66c0cce..0000000 --- a/lib/utils/writer.js +++ /dev/null @@ -1,143 +0,0 @@ -'use strict'; - -const { READ_INDEX, WRITE_INDEX, READ_CYCLE, WRITE_CYCLE } = require('./config'); -const assert = require('node:assert'); - -const BYTE_SIZE = 4; -const EXTRA_BYTES = 2; -const SHARED_STATE_SIZE = 32; - -module.exports = class Writable extends require('node:events') { - _sharedBuffer = null; - _sharedState = null; - _extraBuffer = []; - _cycle = 0; - _write = 0; - - _ready = false; - _ended = false; - _closed = false; - _ending = false; - _errored = false; - _destroyed = false; - _flashing = false; - _finished = false; - _needDrain = false; - - constructor({ sharedState, sharedBuffer, maxMessageSize }) { - assert(sharedState && sharedState.byteLength >= SHARED_STATE_SIZE); - assert(maxMessageSize); - assert(sharedBuffer); - super(); - - this._sharedState = new Int32Array(sharedState); - this._sharedBuffer = Buffer.from(sharedBuffer); - this.writeSync = Write.bind(this, true); - this.write = Write.bind(this, false); - } - - store(data, isFinal) { - //! Data + EXTRA_BYTES <= this._sharedBuffer.byteLength - const written = this._buffer.write(data, this._write + BYTE_SIZE); - this._buffer.writeInt32LE(isFinal, this._write + written + BYTE_SIZE); - this._buffer.writeInt32LE(written, this._write); - this._write += BYTE_SIZE + written + BYTE_SIZE; - Atomics.store(this._sharedState, WRITE_INDEX, this._write); - Atomics.notify(this._sharedState, WRITE_INDEX); - } - - end() {} - destroy() { - this._destroyed = true; - } - - get writable() { - return !this._destroyed && !this._ending; - } - - get writableEnded() { - return this._ending; - } - - get writableFinished() { - return this._finished; - } - - get writableErrored() { - return this._errored; - } - - get ready() { - return this._ready; - } - - get closed() { - return this._closed; - } - - get writableNeedDrain() { - return this._needDrain; - } - - get writableObjectMode() { - return false; - } -}; - -function Write(sync, toWrite) { - const read = Atomics.load(this._sharedBuffer, READ_INDEX); - const cycle = Atomics.load(this._sharedBuffer, READ_CYCLE); - const isBehind = read > this._write || cycle < this._cycle; - let leftover = (isBehind ? read : this._sharedBuffer.length) - this._write; - if (leftover < 0) throw new Error('overwritten'); //? Should never happen - if (cycle > this._cycle) throw new Error('Read further than expected'); - leftover -= EXTRA_BYTES; - - if (leftover <= 0 && isBehind) { - var { async, value } = Atomics.waitAsync(this._sharedState, READ_INDEX, read); - if (!async) return Write.call(this, sync, toWrite); - if (sync) return wait(this._sharedState, read), Write.call(this, sync, toWrite); - value.then(() => { - if (!this.flush()) return; - this.write = Write.bind(this, sync); - this._needDrain = false; - this.emit('drain'); - }); - - this.write = FakeWrite; - this._needDrain = true; - return true; - } - - if (leftover <= 0 && !isBehind) { - this._write = 0; - Atomics.store(this._sharedState, WRITE_INDEX, this._write); - Atomics.store(this._sharedState, WRITE_CYCLE, ++this.cycle); - Atomics.notify(this._sharedState, WRITE_INDEX); - return Write.call(this, sync, toWrite, true); - } - - if (leftover < Buffer.byteLength(toWrite) + EXTRA_BYTES) { - this.store(toWrite.slice(0, leftover), 0); - return Write.call(this, sync, toWrite.slice(leftover)); - } - - this.store(toWrite); - return false; -} - -function FakeWrite(data) { - this._extraBuffer.push(data); -} - -const MAX_SPINS = 10; -const WAIT_TIMEOUT = 1000; -const ERROR_TOO_LONG = 'Wait took too long (10s)'; -function wait(state, origin) { - let spins = 0; - do { - var status = Atomics.wait(state, READ_INDEX, origin, WAIT_TIMEOUT); - var read = Atomics.load(state, READ_INDEX); - if (++spins === MAX_SPINS) throw new Error(ERROR_TOO_LONG); - } while (status !== 'ok' || origin !== read); -} diff --git a/lib/writable.js b/lib/writable.js new file mode 100644 index 0000000..4f11bb8 --- /dev/null +++ b/lib/writable.js @@ -0,0 +1,267 @@ +'use strict'; + +const assert = require('node:assert'); +const { READ_INDEX, READ_CYCLE, READ_PROCESS } = require('./config'); +const { WRITE_INDEX, WRITE_CYCLE, WRITE_PROCESS } = require('./config'); +const { READY_SIGN, EMPTY_SIGN, FAILED_SIGN, FINISHED_SIGN, FINISHING_SIGN } = require('./config'); +const { PREFIX_SIZE, POSTFIX_SIZE } = require('./config'); + +const EXTRA_SPACE = PREFIX_SIZE + POSTFIX_SIZE; +const SHARED_STATE_SIZE = 128; + +const READ_SPINS = 10; +const FINISH_SPINS = 10; +const SPIN_TIMEOUT = 1000; +const START_TIMEOUT = 5000; + +const TOO_LONG_READING = 'Reading process took too long (' + READ_SPINS + 's)'; +const READER_EXIT_BEFORE_SYNC = 'Failed to start - reader failed before synchronization'; +const READER_FAILED_TIMEOUT = 'Failed to start - reader timed out (' + START_TIMEOUT + 'ms)'; +const READER_EXIT_AT_SYNC = 'Failed to start - reader exited while synchronization'; +const FAILED_TO_FINISH_TIMEOUT = 'Failed to finish - timed out (' + FINISH_SPINS + ')s'; +const FAILED_TO_FINISH_READER = 'Failed to finish - reader exited with error'; +const READER_EXIT_WHILE_WATCH = 'Reader failed while watching'; + +module.exports = class Writable extends require('node:events') { + _sharedBuffer = null; + _sharedState = null; + _extraBuffer = []; + _process = EMPTY_SIGN; + _cycle = 0; + _write = 0; + + _ready = false; + _ended = false; + _closed = false; + _ending = false; + _errored = null; + _destroyed = false; + _flashing = false; + _finished = false; + _needDrain = false; + _watching = false; + + constructor({ sharedState, sharedBuffer }) { + assert(sharedState && sharedState.byteLength >= SHARED_STATE_SIZE); + assert(sharedBuffer); + super(); + + this._sharedState = new Int32Array(sharedState); + this._sharedBuffer = Buffer.from(sharedBuffer); + this.writeSync = Write.bind(this, true); + this.write = Write.bind(this, false); + // this._synchronize(); + } + + synchronize() { + this._process = READY_SIGN; + Atomics.store(this._sharedState, WRITE_PROCESS, this._process); + const status = Atomics.load(this._sharedState, READ_PROCESS); + if (status === READY_SIGN) { + this._ready = true; + this.emit('ready'); + return; + } + + if (status !== EMPTY_SIGN) return void this.destroy(new Error(READER_EXIT_BEFORE_SYNC)); + const result = Atomics.waitAsync(this._sharedState, READ_PROCESS, EMPTY_SIGN, START_TIMEOUT); + if (!result.async) { + if (result.value === 'not-equal') return void this._synchronize(); + return void this.destroy(new Error(READER_FAILED_TIMEOUT)); + } + + result.value.then(v => { + if (v !== 'ok') return void this.destroy(new Error(READER_FAILED_TIMEOUT)); + const status = Atomics.load(this._sharedState, READ_PROCESS); + if (status !== READY_SIGN) return void this.destroy(new Error(READER_EXIT_AT_SYNC)); + this._watching = true; + this._ready = true; + this.emit('ready'); + this._watch(); + }); + } + + _watch() { + if (!this._watching) return; + const status = Atomics.load(this._sharedState, READ_PROCESS); + + if (status === FINISHING_SIGN) { + if (!this._extraBuffer.length) return void this.end(); + this.once('drain', () => void this.end()); + } + + if (status === FAILED_SIGN || status === FINISHED_SIGN) { + return void this.destroy(new Error(READER_EXIT_WHILE_WATCH)); + } + + const { async, value } = Atomics.waitAsync(this._sharedState, READ_PROCESS, status); + if (!async) return void this._watch(); + value.then(() => this._watch()); + } + + _store(data, isNotFin = 0) { + //! Data + EXTRA_BYTES <= this._sharedBuffer.byteLength + const written = this._sharedBuffer.write(data, this._write + PREFIX_SIZE); + // process._rawDebug({ data, written, offset: this._write, fin: isNotFin }); + this._sharedBuffer.writeInt32LE(written, this._write); + this._sharedBuffer.writeUInt8(isNotFin, this._write + written + EXTRA_SPACE); + this._write += PREFIX_SIZE + written + POSTFIX_SIZE + 1; + Atomics.store(this._sharedState, WRITE_INDEX, this._write); + Atomics.notify(this._sharedState, WRITE_INDEX); + } + + _drain() { + do { + var item = this._extraBuffer.shift(); + var status = Write.call(this, false, item); + if (status) return false; + } while (this._extraBuffer.length); + + this.write = Write.bind(this, false); + this._needDrain = false; + this.emit('drain'); + return true; + } + + end() { + if (!this.writable) return; + this._process = FINISHING_SIGN; + this._watching = false; + this._ending = true; + + let status = Atomics.load(this._sharedState, READ_PROCESS); + if (status === READY_SIGN || status === EMPTY_SIGN || status === FINISHING_SIGN) { + Atomics.store(this._sharedState, WRITE_PROCESS, FINISHING_SIGN); + Atomics.notify(this._sharedState, WRITE_PROCESS); + } + + let spins = 0; + const origin = status; + while (status !== FAILED_SIGN || status !== FINISHED_SIGN) { + Atomics.wait(this._sharedState, READ_PROCESS, status, SPIN_TIMEOUT); + status = Atomics.load(this._sharedState, READ_PROCESS); + if (++spins === FINISH_SPINS) break; + } + + if (status === FAILED_SIGN || status === origin) { + this._ending = false; + if (status === origin) this.destroy(new Error(FAILED_TO_FINISH_TIMEOUT)); + else this.destroy(new Error(FAILED_TO_FINISH_READER)); + return; + } + + Atomics.store(this._sharedState, WRITE_PROCESS, FINISHED_SIGN); + Atomics.notify(this._sharedState, WRITE_PROCESS); + this._process = FINISHED_SIGN; + this._finished = true; + this._ended = true; + this.emit('finish'); + } + + destroy(err) { + if (!this.writable) return; + this._watching = false; + const readStatus = Atomics.load(this._sharedState, READ_PROCESS); + const writeStatus = Atomics.load(this._sharedState, WRITE_PROCESS); + const writeActive = writeStatus === READY_SIGN || writeStatus === EMPTY_SIGN; + const readActive = readStatus === READY_SIGN || readStatus === EMPTY_SIGN; + + if (writeActive && readActive) { + const sign = err ? FAILED_SIGN : FINISHED_SIGN; + Atomics.store(this._sharedState, WRITE_PROCESS, sign); + Atomics.notify(this._sharedState, WRITE_PROCESS); + } + + if (err) { + this._errored = err; + this.emit('error', err); + } + + this._destroyed = true; + this._closed = true; + this.emit('close'); + } + + get writable() { + return !this._destroyed && !this._ending; + } + + get writableEnded() { + return this._ending; + } + + get writableFinished() { + return this._finished; + } + + get writableErrored() { + return !!this._errored; + } + + get ready() { + return this._ready; + } + + get closed() { + return this._closed; + } + + get writableNeedDrain() { + return this._needDrain; + } + + // eslint-disable-next-line class-methods-use-this + get writableObjectMode() { + return false; + } +}; + +function Write(sync, toWrite) { + if (!this.writable) return false; + const read = Atomics.load(this._sharedState, READ_INDEX); + const cycle = Atomics.load(this._sharedState, READ_CYCLE); + const isBehind = read > this._write || cycle < this._cycle; + let leftover = (isBehind ? read : this._sharedBuffer.length) - this._write; + if (leftover < 0) throw new Error('overwritten'); //? Should never happen + if (cycle > this._cycle) throw new Error('Read further than expected'); + leftover -= EXTRA_SPACE + 1; + + if (leftover <= 0 && isBehind) { + if (sync) { + var spins = 0; + do { + var status = Atomics.wait(this._sharedState, READ_INDEX, read, SPIN_TIMEOUT); + if (++spins === READ_SPINS) throw new Error(TOO_LONG_READING); + } while (status === 'timed-out'); + return Write.call(this, false, toWrite); + } + + var { async, value } = Atomics.waitAsync(this._sharedState, READ_INDEX, read); + if (!async) return Write.call(this, sync, toWrite); + value.then(() => void this._drain()); + this.write = FakeWrite; + this._needDrain = true; + return FakeWrite.call(this, toWrite); + } + + if (leftover <= 0 && !isBehind) { + this._write = 0; + Atomics.store(this._sharedState, WRITE_INDEX, this._write); + Atomics.store(this._sharedState, WRITE_CYCLE, this._cycle++); + Atomics.notify(this._sharedState, WRITE_INDEX); + return Write.call(this, sync, toWrite); + } + + if (leftover < Buffer.byteLength(toWrite) + EXTRA_SPACE) { + this._store(toWrite.slice(0, leftover), 1); + return Write.call(this, sync, toWrite.slice(leftover)); + } + + this._store(toWrite); + return false; +} + +function FakeWrite(data) { + this._extraBuffer.push(data); + return true; +}