From e43d0eebc37be6163b94906f0ed957ac8ba1e451 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 00:53:01 +1000 Subject: [PATCH 01/26] feat: faster transforming --- asynciterator.ts | 125 +++++++++++++++++- test/AsyncIterator-test.js | 168 +++++++++++++++++++++++ test/LimitingIterator-test.js | 86 ++++++++++++ test/SimpleTransformIterator-test.js | 48 ------- test/SynchronousTransformIterator.js | 190 +++++++++++++++++++++++++++ test/TransformIterator-test.js | 2 +- 6 files changed, 565 insertions(+), 54 deletions(-) create mode 100644 test/LimitingIterator-test.js create mode 100644 test/SynchronousTransformIterator.js diff --git a/asynciterator.ts b/asynciterator.ts index 15aaed1..5d3ffa5 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -448,6 +448,10 @@ export class AsyncIterator extends EventEmitter { return new SimpleTransformIterator(this, options); } + syncTransform(fn: (item: T) => D | null) { + return new SyncTransformIterator(this, { fn }) as any; + } + /** Maps items from this iterator using the given function. After this operation, only read the returned iterator instead of the current one. @@ -456,7 +460,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - return this.transform({ map: self ? map.bind(self) : map }); + return this.syncTransform(self ? map.bind(self) : map); } /** @@ -469,7 +473,8 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return this.transform({ filter: self ? filter.bind(self) : filter }); + if (self) filter = filter.bind(self); + return this.syncTransform(item => filter(item) ? item : null); } /** @@ -510,7 +515,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.transform({ offset }); + return this.syncTransform(item => offset-- > 0 ? null : item); } /** @@ -520,7 +525,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return this.transform({ limit }); + return new LimitingIterator(this, limit); } /** @@ -531,7 +536,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range */ range(start: number, end: number): AsyncIterator { - return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) }); + return this.skip(start).take(Math.max(end - start + 1, 0)); } /** @@ -1252,6 +1257,116 @@ function destinationFillBuffer(this: InternalSource) { } +export abstract class SynchronousTransformIterator extends AsyncIterator { + protected constructor(protected _source: AsyncIterator, private options: { destroySource?: boolean } = {}) { + /* eslint-disable no-use-before-define */ + super(); + const cleanup = () => { + _source.removeListener('end', onSourceEnd); + _source.removeListener('error', onSourceError); + _source.removeListener('readable', onSourceReadable); + }; + const onSourceEnd = () => { + cleanup(); + this.close(); + }; + const onSourceError = (err: Error) => { + this.emit('error', err); + }; + const onSourceReadable = () => { + if (this.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + this.emit('readable'); + } + else { + this.readable = true; + } + }; + _source.on('end', onSourceEnd); + _source.on('error', onSourceError); + _source.on('readable', onSourceReadable); + if (_source.done) + onSourceEnd(); + else if (_source.readable) + onSourceReadable(); + } + + protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { + super._destroy(cause, callback); + } + + public close() { + if (this.options.destroySource) + this._source.destroy(); + super.close(); + } +} + +interface Transform { + fn: Function, + next?: Transform +} + +export class SyncTransformIterator extends SynchronousTransformIterator { + + private _funcs?: Function[]; + + constructor(private source: AsyncIterator, private transforms: Transform, upstream: AsyncIterator = source) { + // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing + // listeners to the original source + super(upstream); + } + + get funcs() { + if (!this._funcs) { + this._funcs = []; + let transforms: Transform | undefined = this.transforms; + do { + this._funcs.push(transforms.fn); + } while (transforms = transforms.next) + } + return this._funcs; + } + + read(): D | null { + const { source, funcs } = this; + let item; + outer: while ((item = source.read()) !== null) { + for (let index = funcs.length - 1; index >= 0; index -= 1) + if ((item = funcs[index](item)) === null) continue outer; + return item; + } + return null; + } + + syncTransform(fn: (item: D) => K | null): AsyncIterator { + return new SyncTransformIterator(this.source, { fn, next: this.transforms }, this); + } +} + +export class LimitingIterator extends SynchronousTransformIterator { + protected count: number = 0; + + constructor(source: AsyncIterator, protected readonly limit: number) { + super(source); + } + + read(): T | null { + const item = this._source.read(); + if (item !== null && this.count < this.limit) { + this.count += 1; + return item; + } + this.close(); + return null; + } +} + /** An iterator that generates items based on a source iterator and simple transformation steps passed as arguments. diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d6..3e83834 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,10 @@ import { ENDED, DESTROYED, scheduleTask, + range, + fromArray, + wrap, + ArrayIterator } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1306,5 +1310,169 @@ describe('AsyncIterator', () => { expect(result).deep.to.equal([1, 2, 3, 4, 5]); }); }); + }); + describe('Testing chains fo maps and filters', () => { + for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { + describe(`Testing with ${iteratorGen()}`, () => { + let iterator; + beforeEach(() => { + iterator = iteratorGen(); + }); + it('Should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply maps that doubles correctly and then maybemaps', async () => { + (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + it('Should apply maps that maybemaps correctly', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + it('Should apply maps that maybemaps twice', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); + it('Should apply maps that converts to string', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + it('Should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + it('Should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + it('Should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + it('Should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + it('Should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + }) + } + }); + describe('Skipping', () => { + describe('The SkippingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new ArrayIterator([]).skip(10); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SkippingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A SkippingIterator', () => { + let iterator, source; + before(() => { + source = range(0, 6); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A SkippingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new ArrayIterator([]).skip(10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of 0 items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of Infinity items', () => { + it('should skip all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); }); }); diff --git a/test/LimitingIterator-test.js b/test/LimitingIterator-test.js new file mode 100644 index 0000000..2f8601b --- /dev/null +++ b/test/LimitingIterator-test.js @@ -0,0 +1,86 @@ +import { + AsyncIterator, + ArrayIterator, + LimitingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('LimitingIterator', () => { + describe('The LimitingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new LimitingIterator(new ArrayIterator([]), 10); + }); + + it('should be a LimitingIterator object', () => { + instance.should.be.an.instanceof(LimitingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A LimitingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new LimitingIterator(source, 4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items limited to the specified limit', () => { + items.should.deep.equal([0, 1, 2, 3]); + }); + }); + }); + + describe('A LimitingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([]), 10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of 0 items', () => { + it('should not emit any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2]), 0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of Infinity items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); +}); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index e033b8e..8fb2761 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -1110,10 +1110,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the map function on all items in order', () => { items.should.deep.equal(['a1', 'b2', 'c3']); }); @@ -1121,10 +1117,6 @@ describe('SimpleTransformIterator', () => { it('should call the map function once for each item', () => { map.should.have.been.calledThrice; }); - - it('should call the map function with the returned iterator as `this`', () => { - map.alwaysCalledOn(result).should.be.true; - }); }); }); @@ -1145,14 +1137,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - - it('should execute the map function on all items in order', () => { - items.should.deep.equal(['a1', 'b2', 'c3']); - }); - it('should call the map function once for each item', () => { map.should.have.been.calledThrice; }); @@ -1184,10 +1168,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the filter function on all items in order', () => { items.should.deep.equal(['a', 'c']); }); @@ -1195,10 +1175,6 @@ describe('SimpleTransformIterator', () => { it('should call the filter function once for each item', () => { filter.should.have.been.calledThrice; }); - - it('should call the filter function with the returned iterator as `this`', () => { - filter.alwaysCalledOn(result).should.be.true; - }); }); }); @@ -1218,10 +1194,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the filter function on all items in order', () => { items.should.deep.equal(['a', 'c']); }); @@ -1346,10 +1318,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should skip the given number of items', () => { items.should.deep.equal(['c', 'd', 'e']); }); @@ -1376,10 +1344,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should take the given number of items', () => { items.should.deep.equal(['a', 'b', 'c']); }); @@ -1406,10 +1370,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should contain the indicated range', () => { items.should.have.length(10); items[0].should.equal(20); @@ -1433,17 +1393,9 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should be empty', () => { items.should.be.empty; }); - - it('should not have called `read` on the iterator', () => { - iterator.read.should.not.have.been.called; - }); }); }); }); diff --git a/test/SynchronousTransformIterator.js b/test/SynchronousTransformIterator.js new file mode 100644 index 0000000..ac117ee --- /dev/null +++ b/test/SynchronousTransformIterator.js @@ -0,0 +1,190 @@ +import { + AsyncIterator, + ArrayIterator, + SynchronousTransformIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class _SynchronousTransformIterator extends SynchronousTransformIterator { + constructor(source, options) { + super(source, options); + } + read() { + return this._source.read(); + } +} + +describe('SynchronousTransformIterator', () => { + describe('The SynchronousTransformIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new _SynchronousTransformIterator(new ArrayIterator([])); + }); + + it('should be a SynchronousTransformIterator object', () => { + instance.should.be.an.instanceof(SynchronousTransformIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SynchronousTransformIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new _SynchronousTransformIterator(source); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return all items', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new _SynchronousTransformIterator(new ArrayIterator([])); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that is already ended', () => { + it('should not return any items', done => { + const items = []; + const source = new ArrayIterator([]); + source.on('end', () => { + const iterator = new _SynchronousTransformIterator(source); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + }); + + + describe('A TransformIterator with destroySource set to its default', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new _SynchronousTransformIterator(source); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should have destroyed the source', () => { + expect(source).to.have.property('destroyed', true); + }); + }); + }); + + describe('A TransformIterator with destroySource set to false', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new _SynchronousTransformIterator(source, {destroySource: false }); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should not have destroyed the source', () => { + expect(source).to.have.property('destroyed', false); + }); + }); + }); + + describe('A TransformIterator with a source that errors', () => { + let iterator, source, errorHandler; + before(() => { + source = new AsyncIterator(); + iterator = new _SynchronousTransformIterator(source); + iterator.on('error', errorHandler = sinon.stub()); + }); + + describe('before an error occurs', () => { + it('should not have emitted any error', () => { + errorHandler.should.not.have.been.called; + }); + }); + + describe('after a first error occurs', () => { + let error1; + before(() => { + errorHandler.reset(); + source.emit('error', error1 = new Error('error1')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error1); + }); + }); + + describe('after a second error occurs', () => { + let error2; + before(() => { + errorHandler.reset(); + source.emit('error', error2 = new Error('error2')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error2); + }); + }); + + describe('after the source has ended and errors again', () => { + before(done => { + errorHandler.reset(); + source.close(); + iterator.on('end', () => { + function noop() { /* */ } + source.on('error', noop); // avoid triggering the default error handler + source.emit('error', new Error('error3')); + source.removeListener('error', noop); + done(); + }); + }); + + it('should not re-emit the error', () => { + errorHandler.should.not.have.been.called; + }); + + it('should not leave any error handlers attached', () => { + source.listenerCount('error').should.equal(0); + }); + }); + }); +}); diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index e5d8116..accfd40 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -228,7 +228,7 @@ describe('TransformIterator', () => { iterator._eventCounts.end.should.equal(0); }); - it('should not have ended', () => { + it('should not havef ended', () => { iterator.ended.should.be.false; }); From 538525ea27d4f1598359647b50b24ccb01540034 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 00:59:54 +1000 Subject: [PATCH 02/26] chore: lint fix --- .eslintrc | 2 +- asynciterator.ts | 20 ++-- test/AsyncIterator-test.js | 150 +++++++++++++-------------- test/SynchronousTransformIterator.js | 5 +- 4 files changed, 89 insertions(+), 88 deletions(-) diff --git a/.eslintrc b/.eslintrc index d4ebd17..6aea021 100644 --- a/.eslintrc +++ b/.eslintrc @@ -53,7 +53,7 @@ no-implied-eval: error, no-invalid-this: off, no-iterator: error, - no-labels: error, + no-labels: off, no-lone-blocks: error, no-loop-func: error, no-magic-numbers: off, diff --git a/asynciterator.ts b/asynciterator.ts index 5d3ffa5..1374a90 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -473,7 +473,8 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - if (self) filter = filter.bind(self); + if (self) + filter = filter.bind(self); return this.syncTransform(item => filter(item) ? item : null); } @@ -1313,7 +1314,6 @@ interface Transform { } export class SyncTransformIterator extends SynchronousTransformIterator { - private _funcs?: Function[]; constructor(private source: AsyncIterator, private transforms: Transform, upstream: AsyncIterator = source) { @@ -1325,10 +1325,12 @@ export class SyncTransformIterator extends SynchronousTransformIterato get funcs() { if (!this._funcs) { this._funcs = []; + // eslint-disable-next-line prefer-destructuring let transforms: Transform | undefined = this.transforms; - do { + do this._funcs.push(transforms.fn); - } while (transforms = transforms.next) + // eslint-disable-next-line no-cond-assign + while (transforms = transforms.next); } return this._funcs; } @@ -1337,8 +1339,10 @@ export class SyncTransformIterator extends SynchronousTransformIterato const { source, funcs } = this; let item; outer: while ((item = source.read()) !== null) { - for (let index = funcs.length - 1; index >= 0; index -= 1) - if ((item = funcs[index](item)) === null) continue outer; + for (let index = funcs.length - 1; index >= 0; index -= 1) { + if ((item = funcs[index](item)) === null) + continue outer; + } return item; } return null; @@ -1359,8 +1363,8 @@ export class LimitingIterator extends SynchronousTransformIterator { read(): T | null { const item = this._source.read(); if (item !== null && this.count < this.limit) { - this.count += 1; - return item; + this.count += 1; + return item; } this.close(); return null; diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 3e83834..7a01d77 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -8,7 +8,7 @@ import { range, fromArray, wrap, - ArrayIterator + ArrayIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1310,75 +1310,75 @@ describe('AsyncIterator', () => { expect(result).deep.to.equal([1, 2, 3, 4, 5]); }); }); - }); + }); describe('Testing chains fo maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { describe(`Testing with ${iteratorGen()}`, () => { let iterator; - beforeEach(() => { - iterator = iteratorGen(); - }); - it('Should handle no transforms arrayified', async () => { - (await iterator.toArray()).should.deep.equal([0, 1, 2]); - }); - it('Should apply maps that doubles correctly', async () => { - (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); - }); - it('Should apply maps that doubles correctly and then maybemaps', async () => { - (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); - }); - it('Should apply maps that maybemaps correctly', async () => { - (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); - }); - it('Should apply maps that maybemaps twice', async () => { - (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); - }); - it('Should apply maps that converts to string', async () => { - (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - it('Should apply filter correctly', async () => { - (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); - }); - it('Should apply filter then map correctly', async () => { - (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); - }); - it('Should apply map then filter correctly (1)', async () => { - (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); - }); - it('Should apply map then filter to false correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - it('Should apply map then filter to true correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); - }); - it('Should apply filter to false then map correctly', async () => { - (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - it('Should apply filter to true then map correctly', async () => { - (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); - }); - it('Should apply filter one then double', async () => { - (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); - }); - it('Should apply double then filter one', async () => { - (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); - }); - it('Should apply map then filter correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); - }); - it('Should correctly apply 3 filters', async () => { - (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); - }); - it('Should correctly apply 3 maps', async () => { - (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); - }); - it('Should correctly apply a map, followed by a filter, followed by another map', async () => { - (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); - }); - it('Should correctly apply a filter-map-filter', async () => { - (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); - }); - }) + beforeEach(() => { + iterator = iteratorGen(); + }); + it('Should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply maps that doubles correctly and then maybemaps', async () => { + (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + it('Should apply maps that maybemaps correctly', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + it('Should apply maps that maybemaps twice', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); + it('Should apply maps that converts to string', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + it('Should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + it('Should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + it('Should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + it('Should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + it('Should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + }); } }); describe('Skipping', () => { @@ -1392,27 +1392,27 @@ describe('AsyncIterator', () => { it('should be an AsyncIterator object', () => { instance.should.be.an.instanceof(AsyncIterator); }); - + it('should be an EventEmitter object', () => { instance.should.be.an.instanceof(EventEmitter); }); }); }); - + describe('A SkippingIterator', () => { let iterator, source; before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); iterator = source.skip(4); }); - + describe('when reading items', () => { const items = []; before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); }); - + it('should return items skipping the specified amount', () => { items.should.deep.equal([4, 5, 6]); }); @@ -1425,20 +1425,20 @@ describe('AsyncIterator', () => { source = range(0, 6); iterator = source.skip(4); }); - + describe('when reading items', () => { const items = []; before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); }); - + it('should return items skipping the specified amount', () => { items.should.deep.equal([4, 5, 6]); }); }); }); - + describe('A SkippingIterator with a source that emits 0 items', () => { it('should not return any items', done => { const items = []; @@ -1450,7 +1450,7 @@ describe('AsyncIterator', () => { }); }); }); - + describe('A SkippingIterator with a limit of 0 items', () => { it('should emit all items', done => { const items = []; @@ -1462,7 +1462,7 @@ describe('AsyncIterator', () => { }); }); }); - + describe('A SkippingIterator with a limit of Infinity items', () => { it('should skip all items', done => { const items = []; diff --git a/test/SynchronousTransformIterator.js b/test/SynchronousTransformIterator.js index ac117ee..b20ec7d 100644 --- a/test/SynchronousTransformIterator.js +++ b/test/SynchronousTransformIterator.js @@ -7,9 +7,6 @@ import { import { EventEmitter } from 'events'; class _SynchronousTransformIterator extends SynchronousTransformIterator { - constructor(source, options) { - super(source, options); - } read() { return this._source.read(); } @@ -109,7 +106,7 @@ describe('SynchronousTransformIterator', () => { let iterator, source; before(() => { source = new ArrayIterator([1, 2, 3]); - iterator = new _SynchronousTransformIterator(source, {destroySource: false }); + iterator = new _SynchronousTransformIterator(source, { destroySource: false }); }); describe('after being closed', () => { From d6681f2cb5dcdbabc10b53532c7d85d1bd929d4f Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 01:07:15 +1000 Subject: [PATCH 03/26] chore: remove syncTransform method --- asynciterator.ts | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 1374a90..49ae05d 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -448,19 +448,15 @@ export class AsyncIterator extends EventEmitter { return new SimpleTransformIterator(this, options); } - syncTransform(fn: (item: T) => D | null) { - return new SyncTransformIterator(this, { fn }) as any; - } - /** Maps items from this iterator using the given function. After this operation, only read the returned iterator instead of the current one. - @param {Function} map A mapping function to call on this iterator's (remaining) items + @param {Function} map A mapping function to call on this iterator's (remaining) items (null values are skipped) @param {object?} self The `this` pointer for the mapping function @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ - map(map: (item: T) => D, self?: any): AsyncIterator { - return this.syncTransform(self ? map.bind(self) : map); + map(map: (item: T) => D | null, self?: any): AsyncIterator { + return new SyncTransformIterator(this, { fn: self ? map.bind(self) : map }); } /** @@ -475,7 +471,7 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => boolean, self?: any): AsyncIterator { if (self) filter = filter.bind(self); - return this.syncTransform(item => filter(item) ? item : null); + return this.map(item => filter(item) ? item : null); } /** @@ -516,7 +512,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.syncTransform(item => offset-- > 0 ? null : item); + return this.map(item => offset-- > 0 ? null : item); } /** @@ -1339,6 +1335,8 @@ export class SyncTransformIterator extends SynchronousTransformIterato const { source, funcs } = this; let item; outer: while ((item = source.read()) !== null) { + // Do not use a for-of loop here, it slows down transformations + // by approximately a factor of 2. for (let index = funcs.length - 1; index >= 0; index -= 1) { if ((item = funcs[index](item)) === null) continue outer; @@ -1348,8 +1346,8 @@ export class SyncTransformIterator extends SynchronousTransformIterato return null; } - syncTransform(fn: (item: D) => K | null): AsyncIterator { - return new SyncTransformIterator(this.source, { fn, next: this.transforms }, this); + map(map: (item: D) => K | null, self?: any): AsyncIterator { + return new SyncTransformIterator(this.source, { fn: self ? map.bind(self) : map, next: this.transforms }, this); } } From a40507994ec8e3105b6d94446cf624288ab83165 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 01:30:07 +1000 Subject: [PATCH 04/26] fix: fix destroysource issues --- asynciterator.ts | 26 ++++++++++++---- test/AsyncIterator-test.js | 63 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 49ae05d..0e85152 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1255,9 +1255,11 @@ function destinationFillBuffer(this: InternalSource) { export abstract class SynchronousTransformIterator extends AsyncIterator { - protected constructor(protected _source: AsyncIterator, private options: { destroySource?: boolean } = {}) { + private _destroySource: boolean; + protected constructor(protected _source: AsyncIterator, protected options: { destroySource?: boolean } = {}) { /* eslint-disable no-use-before-define */ super(); + this._destroySource = options.destroySource !== false; const cleanup = () => { _source.removeListener('end', onSourceEnd); _source.removeListener('error', onSourceError); @@ -1293,12 +1295,13 @@ export abstract class SynchronousTransformIterator extends AsyncIterat onSourceReadable(); } - protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { - super._destroy(cause, callback); + destroy(cause?: Error): void { + this._source.destroy(cause); + super.destroy(cause); } public close() { - if (this.options.destroySource) + if (this._destroySource) this._source.destroy(); super.close(); } @@ -1312,10 +1315,10 @@ interface Transform { export class SyncTransformIterator extends SynchronousTransformIterator { private _funcs?: Function[]; - constructor(private source: AsyncIterator, private transforms: Transform, upstream: AsyncIterator = source) { + constructor(private source: AsyncIterator, private transforms: Transform, private upstream: AsyncIterator = source, options?: { destroySource?: boolean }) { // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing // listeners to the original source - super(upstream); + super(upstream, options); } get funcs() { @@ -1349,6 +1352,17 @@ export class SyncTransformIterator extends SynchronousTransformIterato map(map: (item: D) => K | null, self?: any): AsyncIterator { return new SyncTransformIterator(this.source, { fn: self ? map.bind(self) : map, next: this.transforms }, this); } + + destroy(cause?: Error): void { + this.upstream.destroy(cause); + super.destroy(cause); + } + + public close() { + if (this.options.destroySource) + this.upstream.destroy(); + super.close(); + } } export class LimitingIterator extends SynchronousTransformIterator { diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 7a01d77..5593098 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1313,6 +1313,7 @@ describe('AsyncIterator', () => { }); describe('Testing chains fo maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { + // eslint-disable-next-line no-loop-func describe(`Testing with ${iteratorGen()}`, () => { let iterator; beforeEach(() => { @@ -1378,6 +1379,68 @@ describe('AsyncIterator', () => { it('Should correctly apply a filter-map-filter', async () => { (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); }); + it('Should destroy when closed before being read after map', () => { + iterator.map(x => x).close(); + iterator.destroyed.should.be.true; + }); + it('Should destroy when closed before being read after map then filter', () => { + it = iterator.map(x => x); + it.filter(x => true).close(); + iterator.destroyed.should.be.true; + it.destroyed.should.be.true; + }); + describe('when called on an iterator with a `this` argument', () => { + const self = {}; + let map, result; + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(map, self); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); + describe('when called on an iterator with a `this` argument with nested map', () => { + const self = {}; + let map, result; + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(x => x).map(map, self); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); }); } }); From 75139846ea5a5caa49325715c74bd8ca8e4c5dda Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 01:36:44 +1000 Subject: [PATCH 05/26] chore: remove typo --- test/TransformIterator-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index accfd40..e5d8116 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -228,7 +228,7 @@ describe('TransformIterator', () => { iterator._eventCounts.end.should.equal(0); }); - it('should not havef ended', () => { + it('should not have ended', () => { iterator.ended.should.be.false; }); From df0d54efd9633ca29d681eea016b4fd875f443f2 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:22:33 +1000 Subject: [PATCH 06/26] Update asynciterator.ts Co-authored-by: Ruben Verborgh --- asynciterator.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 0e85152..617d2ca 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -451,7 +451,8 @@ export class AsyncIterator extends EventEmitter { /** Maps items from this iterator using the given function. After this operation, only read the returned iterator instead of the current one. - @param {Function} map A mapping function to call on this iterator's (remaining) items (null values are skipped) + @param {Function} map A mapping function to call on this iterator's (remaining) items. + A `null` value indicates that nothing should be returned for a particular item.. @param {object?} self The `this` pointer for the mapping function @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ From a8dbb6db28a6d55d5dc0e5b4a778f04416f3a19c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:41:13 +1000 Subject: [PATCH 07/26] Update asynciterator.ts Co-authored-by: Ruben Verborgh --- asynciterator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 617d2ca..d5ddc31 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1257,7 +1257,7 @@ function destinationFillBuffer(this: InternalSource) { export abstract class SynchronousTransformIterator extends AsyncIterator { private _destroySource: boolean; - protected constructor(protected _source: AsyncIterator, protected options: { destroySource?: boolean } = {}) { + public constructor(protected _source: AsyncIterator, protected options: { destroySource?: boolean } = {}) { /* eslint-disable no-use-before-define */ super(); this._destroySource = options.destroySource !== false; From 458acf9390c74148957bfbf26db1781cbb1930e7 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:48:02 +1000 Subject: [PATCH 08/26] chore: re-enable no-labels --- .eslintrc | 2 +- asynciterator.ts | 36 +++++++++++++++++++++--------------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/.eslintrc b/.eslintrc index 6aea021..d4ebd17 100644 --- a/.eslintrc +++ b/.eslintrc @@ -53,7 +53,7 @@ no-implied-eval: error, no-invalid-this: off, no-iterator: error, - no-labels: off, + no-labels: error, no-lone-blocks: error, no-loop-func: error, no-magic-numbers: off, diff --git a/asynciterator.ts b/asynciterator.ts index d5ddc31..9ffe268 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1314,7 +1314,7 @@ interface Transform { } export class SyncTransformIterator extends SynchronousTransformIterator { - private _funcs?: Function[]; + private _fn?: Function; constructor(private source: AsyncIterator, private transforms: Transform, private upstream: AsyncIterator = source, options?: { destroySource?: boolean }) { // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing @@ -1322,30 +1322,36 @@ export class SyncTransformIterator extends SynchronousTransformIterato super(upstream, options); } - get funcs() { - if (!this._funcs) { - this._funcs = []; + get fn() { + if (!this._fn) { + const funcs: Function[] = []; // eslint-disable-next-line prefer-destructuring let transforms: Transform | undefined = this.transforms; do - this._funcs.push(transforms.fn); + funcs.push(transforms.fn); // eslint-disable-next-line no-cond-assign while (transforms = transforms.next); + + const endIndex = funcs.length - 1; + this._fn = (item: any) => { + // Do not use a for-of loop here, it slows down transformations + // by approximately a factor of 2. + for (let index = endIndex; index >= 1; index -= 1) { + if ((item = funcs[index](item)) === null) + return null; + } + return funcs[0](item); + }; } - return this._funcs; + return this._fn; } read(): D | null { - const { source, funcs } = this; + const { source, fn } = this; let item; - outer: while ((item = source.read()) !== null) { - // Do not use a for-of loop here, it slows down transformations - // by approximately a factor of 2. - for (let index = funcs.length - 1; index >= 0; index -= 1) { - if ((item = funcs[index](item)) === null) - continue outer; - } - return item; + while ((item = source.read()) !== null) { + if ((item = fn(item)) !== null) + return item; } return null; } From fe2ccc247013d5588848083b8ab211654fdb82be Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:50:41 +1000 Subject: [PATCH 09/26] chore: use bind helper --- asynciterator.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 9ffe268..5ba9050 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void { taskScheduler = scheduler; } +/** Binds a function to an object */ +function bind(fn: Function, self: any) { + return self ? fn.bind(self) : fn; +} + /** ID of the INIT state. An iterator is initializing if it is preparing main item generation. @@ -470,8 +475,7 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - if (self) - filter = filter.bind(self); + filter = bind(filter, self); return this.map(item => filter(item) ? item : null); } @@ -1357,7 +1361,7 @@ export class SyncTransformIterator extends SynchronousTransformIterato } map(map: (item: D) => K | null, self?: any): AsyncIterator { - return new SyncTransformIterator(this.source, { fn: self ? map.bind(self) : map, next: this.transforms }, this); + return new SyncTransformIterator(this.source, { fn: bind(map, self), next: this.transforms }, this); } destroy(cause?: Error): void { From 129a9dadf2e24cfd2076c0feed0691f890dd5f7b Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:56:59 +1000 Subject: [PATCH 10/26] rename some tests --- test/AsyncIterator-test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 5593098..c7e256d 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1311,10 +1311,10 @@ describe('AsyncIterator', () => { }); }); }); - describe('Testing chains fo maps and filters', () => { + describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { // eslint-disable-next-line no-loop-func - describe(`Testing with ${iteratorGen()}`, () => { + describe(`With ${iteratorGen()}`, () => { let iterator; beforeEach(() => { iterator = iteratorGen(); From a562356cdcb50d27fe0236774bf8a303f77c8cf1 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 08:58:05 +1000 Subject: [PATCH 11/26] fix typo in tests and use lower case should --- test/AsyncIterator-test.js | 46 +++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index c7e256d..589ffdb 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1314,76 +1314,76 @@ describe('AsyncIterator', () => { describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { // eslint-disable-next-line no-loop-func - describe(`With ${iteratorGen()}`, () => { + describe(`with ${iteratorGen()}`, () => { let iterator; beforeEach(() => { iterator = iteratorGen(); }); - it('Should handle no transforms arrayified', async () => { + it('should handle no transforms arrayified', async () => { (await iterator.toArray()).should.deep.equal([0, 1, 2]); }); - it('Should apply maps that doubles correctly', async () => { + it('should apply maps that doubles correctly', async () => { (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); }); - it('Should apply maps that doubles correctly and then maybemaps', async () => { + it('should apply maps that doubles correctly and then maybemaps', async () => { (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); }); - it('Should apply maps that maybemaps correctly', async () => { + it('should apply maps that maybemaps correctly', async () => { (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); }); - it('Should apply maps that maybemaps twice', async () => { + it('should apply maps that maybemaps twice', async () => { (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); }); - it('Should apply maps that converts to string', async () => { + it('should apply maps that converts to string', async () => { (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - it('Should apply filter correctly', async () => { + it('should apply filter correctly', async () => { (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); - it('Should apply filter then map correctly', async () => { + it('should apply filter then map correctly', async () => { (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); }); - it('Should apply map then filter correctly (1)', async () => { + it('should apply map then filter correctly (1)', async () => { (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); - it('Should apply map then filter to false correctly', async () => { + it('should apply map then filter to false correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - it('Should apply map then filter to true correctly', async () => { + it('should apply map then filter to true correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); - it('Should apply filter to false then map correctly', async () => { + it('should apply filter to false then map correctly', async () => { (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - it('Should apply filter to true then map correctly', async () => { + it('should apply filter to true then map correctly', async () => { (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); - it('Should apply filter one then double', async () => { + it('should apply filter one then double', async () => { (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); }); - it('Should apply double then filter one', async () => { + it('should apply double then filter one', async () => { (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); }); - it('Should apply map then filter correctly', async () => { + it('should apply map then filter correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); }); - it('Should correctly apply 3 filters', async () => { + it('should correctly apply 3 filters', async () => { (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); }); - it('Should correctly apply 3 maps', async () => { + it('should correctly apply 3 maps', async () => { (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); }); - it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + it('should correctly apply a map, followed by a filter, followed by another map', async () => { (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); }); - it('Should correctly apply a filter-map-filter', async () => { + it('should correctly apply a filter-map-filter', async () => { (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); }); - it('Should destroy when closed before being read after map', () => { + it('should destroy when closed before being read after map', () => { iterator.map(x => x).close(); iterator.destroyed.should.be.true; }); - it('Should destroy when closed before being read after map then filter', () => { + it('should destroy when closed before being read after map then filter', () => { it = iterator.map(x => x); it.filter(x => true).close(); iterator.destroyed.should.be.true; From 4a29d25e08f52b7fc746b037acb83bcaceb3166d Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:29:35 +1000 Subject: [PATCH 12/26] chore: use bind function util better --- asynciterator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5ba9050..6a1c465 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -166,7 +166,7 @@ export class AsyncIterator extends EventEmitter { @param {object?} self The `this` pointer for the callback */ forEach(callback: (item: T) => void, self?: object) { - this.on('data', self ? callback.bind(self) : callback); + this.on('data', bind(callback, self)); } /** @@ -462,7 +462,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D | null, self?: any): AsyncIterator { - return new SyncTransformIterator(this, { fn: self ? map.bind(self) : map }); + return new SyncTransformIterator(this, { fn: bind(map, self) }); } /** From a58071522399f794f8932078e4071b3516ab7dfb Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:41:40 +1000 Subject: [PATCH 13/26] chore: add missing break in test --- test/AsyncIterator-test.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 589ffdb..a0c21c0 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1311,6 +1311,7 @@ describe('AsyncIterator', () => { }); }); }); + describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { // eslint-disable-next-line no-loop-func From aff246c38152ec5caa41e71fc84b4982792e0d75 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:44:25 +1000 Subject: [PATCH 14/26] chore: add missing break in test --- test/AsyncIterator-test.js | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index a0c21c0..13b36c0 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1311,88 +1311,114 @@ describe('AsyncIterator', () => { }); }); }); - + describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { + // eslint-disable-next-line no-loop-func describe(`with ${iteratorGen()}`, () => { let iterator; + beforeEach(() => { iterator = iteratorGen(); }); + it('should handle no transforms arrayified', async () => { (await iterator.toArray()).should.deep.equal([0, 1, 2]); }); + it('should apply maps that doubles correctly', async () => { (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); }); + it('should apply maps that doubles correctly and then maybemaps', async () => { (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); }); + it('should apply maps that maybemaps correctly', async () => { (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); }); + it('should apply maps that maybemaps twice', async () => { (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); }); + it('should apply maps that converts to string', async () => { (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); + it('should apply filter correctly', async () => { (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); + it('should apply filter then map correctly', async () => { (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); }); + it('should apply map then filter correctly (1)', async () => { (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); + it('should apply map then filter to false correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); + it('should apply map then filter to true correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); + it('should apply filter to false then map correctly', async () => { (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); + it('should apply filter to true then map correctly', async () => { (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); + it('should apply filter one then double', async () => { (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); }); + it('should apply double then filter one', async () => { (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); }); + it('should apply map then filter correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); }); + it('should correctly apply 3 filters', async () => { (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); }); + it('should correctly apply 3 maps', async () => { (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); }); + it('should correctly apply a map, followed by a filter, followed by another map', async () => { (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); }); + it('should correctly apply a filter-map-filter', async () => { (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); }); + it('should destroy when closed before being read after map', () => { iterator.map(x => x).close(); iterator.destroyed.should.be.true; }); + it('should destroy when closed before being read after map then filter', () => { it = iterator.map(x => x); it.filter(x => true).close(); iterator.destroyed.should.be.true; it.destroyed.should.be.true; }); + describe('when called on an iterator with a `this` argument', () => { const self = {}; let map, result; + before(() => { let i = 0; iterator = new ArrayIterator(['a', 'b', 'c']); @@ -1402,6 +1428,7 @@ describe('AsyncIterator', () => { describe('the return value', () => { const items = []; + before(done => { result.on('data', item => { items.push(item); }); result.on('end', done); @@ -1416,9 +1443,11 @@ describe('AsyncIterator', () => { }); }); }); + describe('when called on an iterator with a `this` argument with nested map', () => { const self = {}; let map, result; + before(() => { let i = 0; iterator = new ArrayIterator(['a', 'b', 'c']); @@ -1428,6 +1457,7 @@ describe('AsyncIterator', () => { describe('the return value', () => { const items = []; + before(done => { result.on('data', item => { items.push(item); }); result.on('end', done); @@ -1449,6 +1479,7 @@ describe('AsyncIterator', () => { describe('The SkippingIterator function', () => { describe('the result when called with `new`', () => { let instance; + before(() => { instance = new ArrayIterator([]).skip(10); }); @@ -1465,6 +1496,7 @@ describe('AsyncIterator', () => { describe('A SkippingIterator', () => { let iterator, source; + before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); iterator = source.skip(4); @@ -1472,6 +1504,7 @@ describe('AsyncIterator', () => { describe('when reading items', () => { const items = []; + before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); @@ -1485,6 +1518,7 @@ describe('AsyncIterator', () => { describe('A SkippingIterator', () => { let iterator, source; + before(() => { source = range(0, 6); iterator = source.skip(4); @@ -1492,6 +1526,7 @@ describe('AsyncIterator', () => { describe('when reading items', () => { const items = []; + before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); From e7a004419620a2e9feae326768821a263799cc19 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:46:29 +1000 Subject: [PATCH 15/26] chore: add missing break in test --- test/AsyncIterator-test.js | 67 ++++++++++++++-------------- test/SynchronousTransformIterator.js | 7 +++ 2 files changed, 40 insertions(+), 34 deletions(-) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 13b36c0..15fb685 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1314,7 +1314,6 @@ describe('AsyncIterator', () => { describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { - // eslint-disable-next-line no-loop-func describe(`with ${iteratorGen()}`, () => { let iterator; @@ -1322,103 +1321,103 @@ describe('AsyncIterator', () => { beforeEach(() => { iterator = iteratorGen(); }); - + it('should handle no transforms arrayified', async () => { (await iterator.toArray()).should.deep.equal([0, 1, 2]); }); - + it('should apply maps that doubles correctly', async () => { (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); }); - + it('should apply maps that doubles correctly and then maybemaps', async () => { (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); }); - + it('should apply maps that maybemaps correctly', async () => { (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); }); - + it('should apply maps that maybemaps twice', async () => { (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); }); - + it('should apply maps that converts to string', async () => { (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - + it('should apply filter correctly', async () => { (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); - + it('should apply filter then map correctly', async () => { (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); }); - + it('should apply map then filter correctly (1)', async () => { (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); }); - + it('should apply map then filter to false correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - + it('should apply map then filter to true correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); - + it('should apply filter to false then map correctly', async () => { (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); - + it('should apply filter to true then map correctly', async () => { (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); }); - + it('should apply filter one then double', async () => { (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); }); - + it('should apply double then filter one', async () => { (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); }); - + it('should apply map then filter correctly', async () => { (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); }); - + it('should correctly apply 3 filters', async () => { (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); }); - + it('should correctly apply 3 maps', async () => { (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); }); - + it('should correctly apply a map, followed by a filter, followed by another map', async () => { (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); }); - + it('should correctly apply a filter-map-filter', async () => { (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); }); - + it('should destroy when closed before being read after map', () => { iterator.map(x => x).close(); iterator.destroyed.should.be.true; }); - + it('should destroy when closed before being read after map then filter', () => { it = iterator.map(x => x); it.filter(x => true).close(); iterator.destroyed.should.be.true; it.destroyed.should.be.true; }); - + describe('when called on an iterator with a `this` argument', () => { const self = {}; let map, result; - + before(() => { let i = 0; iterator = new ArrayIterator(['a', 'b', 'c']); @@ -1428,7 +1427,7 @@ describe('AsyncIterator', () => { describe('the return value', () => { const items = []; - + before(done => { result.on('data', item => { items.push(item); }); result.on('end', done); @@ -1443,11 +1442,11 @@ describe('AsyncIterator', () => { }); }); }); - + describe('when called on an iterator with a `this` argument with nested map', () => { const self = {}; let map, result; - + before(() => { let i = 0; iterator = new ArrayIterator(['a', 'b', 'c']); @@ -1457,7 +1456,7 @@ describe('AsyncIterator', () => { describe('the return value', () => { const items = []; - + before(done => { result.on('data', item => { items.push(item); }); result.on('end', done); @@ -1479,7 +1478,7 @@ describe('AsyncIterator', () => { describe('The SkippingIterator function', () => { describe('the result when called with `new`', () => { let instance; - + before(() => { instance = new ArrayIterator([]).skip(10); }); @@ -1496,7 +1495,7 @@ describe('AsyncIterator', () => { describe('A SkippingIterator', () => { let iterator, source; - + before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); iterator = source.skip(4); @@ -1504,7 +1503,7 @@ describe('AsyncIterator', () => { describe('when reading items', () => { const items = []; - + before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); @@ -1518,7 +1517,7 @@ describe('AsyncIterator', () => { describe('A SkippingIterator', () => { let iterator, source; - + before(() => { source = range(0, 6); iterator = source.skip(4); @@ -1526,7 +1525,7 @@ describe('AsyncIterator', () => { describe('when reading items', () => { const items = []; - + before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); diff --git a/test/SynchronousTransformIterator.js b/test/SynchronousTransformIterator.js index b20ec7d..ce16571 100644 --- a/test/SynchronousTransformIterator.js +++ b/test/SynchronousTransformIterator.js @@ -16,6 +16,7 @@ describe('SynchronousTransformIterator', () => { describe('The SynchronousTransformIterator function', () => { describe('the result when called with `new`', () => { let instance; + before(() => { instance = new _SynchronousTransformIterator(new ArrayIterator([])); }); @@ -36,6 +37,7 @@ describe('SynchronousTransformIterator', () => { describe('A SynchronousTransformIterator', () => { let iterator, source; + before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); iterator = new _SynchronousTransformIterator(source); @@ -43,6 +45,7 @@ describe('SynchronousTransformIterator', () => { describe('when reading items', () => { const items = []; + before(done => { iterator.on('data', item => { items.push(item); }); iterator.on('end', done); @@ -84,6 +87,7 @@ describe('SynchronousTransformIterator', () => { describe('A TransformIterator with destroySource set to its default', () => { let iterator, source; + before(() => { source = new ArrayIterator([1, 2, 3]); iterator = new _SynchronousTransformIterator(source); @@ -104,6 +108,7 @@ describe('SynchronousTransformIterator', () => { describe('A TransformIterator with destroySource set to false', () => { let iterator, source; + before(() => { source = new ArrayIterator([1, 2, 3]); iterator = new _SynchronousTransformIterator(source, { destroySource: false }); @@ -124,6 +129,7 @@ describe('SynchronousTransformIterator', () => { describe('A TransformIterator with a source that errors', () => { let iterator, source, errorHandler; + before(() => { source = new AsyncIterator(); iterator = new _SynchronousTransformIterator(source); @@ -151,6 +157,7 @@ describe('SynchronousTransformIterator', () => { describe('after a second error occurs', () => { let error2; + before(() => { errorHandler.reset(); source.emit('error', error2 = new Error('error2')); From 5f4d3e9fd828540230397279ac056586360cc678 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:49:49 +1000 Subject: [PATCH 16/26] chore: rename Transform -> ComposedFunction --- asynciterator.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 6a1c465..912bdfd 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1312,15 +1312,15 @@ export abstract class SynchronousTransformIterator extends AsyncIterat } } -interface Transform { +interface ComposedFunction { fn: Function, - next?: Transform + next?: ComposedFunction } export class SyncTransformIterator extends SynchronousTransformIterator { private _fn?: Function; - constructor(private source: AsyncIterator, private transforms: Transform, private upstream: AsyncIterator = source, options?: { destroySource?: boolean }) { + constructor(private source: AsyncIterator, private transforms: ComposedFunction, private upstream: AsyncIterator = source, options?: { destroySource?: boolean }) { // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing // listeners to the original source super(upstream, options); @@ -1330,7 +1330,7 @@ export class SyncTransformIterator extends SynchronousTransformIterato if (!this._fn) { const funcs: Function[] = []; // eslint-disable-next-line prefer-destructuring - let transforms: Transform | undefined = this.transforms; + let transforms: ComposedFunction | undefined = this.transforms; do funcs.push(transforms.fn); // eslint-disable-next-line no-cond-assign From 13e30a46b1736d74172d50ceb5272b797666a065 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 09:52:13 +1000 Subject: [PATCH 17/26] chore: rename LimitingIterator -> HeadIterator --- asynciterator.ts | 4 ++-- test/LimitingIterator-test.js | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 912bdfd..946d036 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -527,7 +527,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return new LimitingIterator(this, limit); + return new HeadIterator(this, limit); } /** @@ -1376,7 +1376,7 @@ export class SyncTransformIterator extends SynchronousTransformIterato } } -export class LimitingIterator extends SynchronousTransformIterator { +export class HeadIterator extends SynchronousTransformIterator { protected count: number = 0; constructor(source: AsyncIterator, protected readonly limit: number) { diff --git a/test/LimitingIterator-test.js b/test/LimitingIterator-test.js index 2f8601b..0933a30 100644 --- a/test/LimitingIterator-test.js +++ b/test/LimitingIterator-test.js @@ -1,7 +1,7 @@ import { AsyncIterator, ArrayIterator, - LimitingIterator, + HeadIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -11,11 +11,11 @@ describe('LimitingIterator', () => { describe('the result when called with `new`', () => { let instance; before(() => { - instance = new LimitingIterator(new ArrayIterator([]), 10); + instance = new HeadIterator(new ArrayIterator([]), 10); }); it('should be a LimitingIterator object', () => { - instance.should.be.an.instanceof(LimitingIterator); + instance.should.be.an.instanceof(HeadIterator); }); it('should be an AsyncIterator object', () => { @@ -32,7 +32,7 @@ describe('LimitingIterator', () => { let iterator, source; before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = new LimitingIterator(source, 4); + iterator = new HeadIterator(source, 4); }); describe('when reading items', () => { @@ -51,7 +51,7 @@ describe('LimitingIterator', () => { describe('A LimitingIterator with a source that emits 0 items', () => { it('should not return any items', done => { const items = []; - const iterator = new LimitingIterator(new ArrayIterator([]), 10); + const iterator = new HeadIterator(new ArrayIterator([]), 10); iterator.on('data', item => { items.push(item); }); iterator.on('end', () => { items.should.deep.equal([]); @@ -63,7 +63,7 @@ describe('LimitingIterator', () => { describe('A LimitingIterator with a limit of 0 items', () => { it('should not emit any items', done => { const items = []; - const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2]), 0); + const iterator = new HeadIterator(new ArrayIterator([0, 1, 2]), 0); iterator.on('data', item => { items.push(item); }); iterator.on('end', () => { items.should.deep.equal([]); @@ -75,7 +75,7 @@ describe('LimitingIterator', () => { describe('A LimitingIterator with a limit of Infinity items', () => { it('should emit all items', done => { const items = []; - const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + const iterator = new HeadIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); iterator.on('data', item => { items.push(item); }); iterator.on('end', () => { items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); From 69c14a6a5a488ca74a69f9c03b08fd70ec3e2db2 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 10:50:12 +1000 Subject: [PATCH 18/26] chore: remove SynchronousTransformIterator and rename SyncTransformIterator -> MappingIterator --- asynciterator.ts | 67 ++++++++++------------------ test/SynchronousTransformIterator.js | 10 ++--- 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 946d036..a9e8b7d 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -462,7 +462,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D | null, self?: any): AsyncIterator { - return new SyncTransformIterator(this, { fn: bind(map, self) }); + return new MappingIterator(this, { fn: bind(map, self) }); } /** @@ -1258,17 +1258,24 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } +interface ComposedFunction { + fn: Function, + next?: ComposedFunction +} -export abstract class SynchronousTransformIterator extends AsyncIterator { +export class MappingIterator extends AsyncIterator { + private _fn?: Function; private _destroySource: boolean; - public constructor(protected _source: AsyncIterator, protected options: { destroySource?: boolean } = {}) { - /* eslint-disable no-use-before-define */ + + constructor(protected source: AsyncIterator, private transforms?: ComposedFunction, private upstream: AsyncIterator = source, options: { destroySource?: boolean } = {}) { + // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing + // listeners to the original source super(); this._destroySource = options.destroySource !== false; const cleanup = () => { - _source.removeListener('end', onSourceEnd); - _source.removeListener('error', onSourceError); - _source.removeListener('readable', onSourceReadable); + upstream.removeListener('end', onSourceEnd); + upstream.removeListener('error', onSourceError); + upstream.removeListener('readable', onSourceReadable); }; const onSourceEnd = () => { cleanup(); @@ -1291,46 +1298,20 @@ export abstract class SynchronousTransformIterator extends AsyncIterat this.readable = true; } }; - _source.on('end', onSourceEnd); - _source.on('error', onSourceError); - _source.on('readable', onSourceReadable); - if (_source.done) + upstream.on('end', onSourceEnd); + upstream.on('error', onSourceError); + upstream.on('readable', onSourceReadable); + if (upstream.done) onSourceEnd(); - else if (_source.readable) + else if (upstream.readable) onSourceReadable(); } - destroy(cause?: Error): void { - this._source.destroy(cause); - super.destroy(cause); - } - - public close() { - if (this._destroySource) - this._source.destroy(); - super.close(); - } -} - -interface ComposedFunction { - fn: Function, - next?: ComposedFunction -} - -export class SyncTransformIterator extends SynchronousTransformIterator { - private _fn?: Function; - - constructor(private source: AsyncIterator, private transforms: ComposedFunction, private upstream: AsyncIterator = source, options?: { destroySource?: boolean }) { - // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing - // listeners to the original source - super(upstream, options); - } - get fn() { if (!this._fn) { const funcs: Function[] = []; // eslint-disable-next-line prefer-destructuring - let transforms: ComposedFunction | undefined = this.transforms; + let transforms: ComposedFunction | undefined = this.transforms!; do funcs.push(transforms.fn); // eslint-disable-next-line no-cond-assign @@ -1361,7 +1342,7 @@ export class SyncTransformIterator extends SynchronousTransformIterato } map(map: (item: D) => K | null, self?: any): AsyncIterator { - return new SyncTransformIterator(this.source, { fn: bind(map, self), next: this.transforms }, this); + return new MappingIterator(this.source, { fn: bind(map, self), next: this.transforms }, this); } destroy(cause?: Error): void { @@ -1370,13 +1351,13 @@ export class SyncTransformIterator extends SynchronousTransformIterato } public close() { - if (this.options.destroySource) + if (this._destroySource) this.upstream.destroy(); super.close(); } } -export class HeadIterator extends SynchronousTransformIterator { +export class HeadIterator extends MappingIterator { protected count: number = 0; constructor(source: AsyncIterator, protected readonly limit: number) { @@ -1384,7 +1365,7 @@ export class HeadIterator extends SynchronousTransformIterator { } read(): T | null { - const item = this._source.read(); + const item = this.source.read(); if (item !== null && this.count < this.limit) { this.count += 1; return item; diff --git a/test/SynchronousTransformIterator.js b/test/SynchronousTransformIterator.js index ce16571..2d8f673 100644 --- a/test/SynchronousTransformIterator.js +++ b/test/SynchronousTransformIterator.js @@ -1,14 +1,14 @@ import { AsyncIterator, ArrayIterator, - SynchronousTransformIterator, + MappingIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; -class _SynchronousTransformIterator extends SynchronousTransformIterator { +class _SynchronousTransformIterator extends MappingIterator { read() { - return this._source.read(); + return this.source.read(); } } @@ -22,7 +22,7 @@ describe('SynchronousTransformIterator', () => { }); it('should be a SynchronousTransformIterator object', () => { - instance.should.be.an.instanceof(SynchronousTransformIterator); + instance.should.be.an.instanceof(MappingIterator); }); it('should be an AsyncIterator object', () => { @@ -111,7 +111,7 @@ describe('SynchronousTransformIterator', () => { before(() => { source = new ArrayIterator([1, 2, 3]); - iterator = new _SynchronousTransformIterator(source, { destroySource: false }); + iterator = new _SynchronousTransformIterator(source, undefined, undefined, { destroySource: false }); }); describe('after being closed', () => { From 29c3e50165b2fdafdbbdde7c39212b1c1c5e8e2e Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 10:50:25 +1000 Subject: [PATCH 19/26] chore: remove SynchronousTransformIterator and rename SyncTransformIterator -> MappingIterator --- asynciterator.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index a9e8b7d..20f75e4 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1273,9 +1273,11 @@ export class MappingIterator extends AsyncIterator { super(); this._destroySource = options.destroySource !== false; const cleanup = () => { + /* eslint-disable no-use-before-define */ upstream.removeListener('end', onSourceEnd); upstream.removeListener('error', onSourceError); upstream.removeListener('readable', onSourceReadable); + /* eslint-enable no-use-before-define */ }; const onSourceEnd = () => { cleanup(); From f1a067645f6a33da38090840feb5cb1eed56a75d Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 11:12:59 +1000 Subject: [PATCH 20/26] make readability reflect source --- asynciterator.ts | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 20f75e4..35142ab 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1267,6 +1267,14 @@ export class MappingIterator extends AsyncIterator { private _fn?: Function; private _destroySource: boolean; + get readable() { + return this.source.readable; + } + + set readable(readable) { + this.source.readable = readable; + } + constructor(protected source: AsyncIterator, private transforms?: ComposedFunction, private upstream: AsyncIterator = source, options: { destroySource?: boolean } = {}) { // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing // listeners to the original source @@ -1287,26 +1295,15 @@ export class MappingIterator extends AsyncIterator { this.emit('error', err); }; const onSourceReadable = () => { - if (this.readable) { - // TODO: I'm not completely sure as to why this is needed but without - // the following line, some use cases relying on flow mode (i.e. - // consuming items via `on('data', (data) => {})`) do not work. - // It looks like the debouncing that happens in `set readable()` - // in `AsyncIterator` prevents the event from firing as `this` - // is already readable. - this.emit('readable'); - } - else { - this.readable = true; - } + this.emit('readable'); }; upstream.on('end', onSourceEnd); upstream.on('error', onSourceError); upstream.on('readable', onSourceReadable); if (upstream.done) onSourceEnd(); - else if (upstream.readable) - onSourceReadable(); + else + this.readable = upstream.readable; } get fn() { From b8971c20653c2ac29d9145bcc2fb3fd7f792727e Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 11:24:25 +1000 Subject: [PATCH 21/26] chore: clean up MappingIterator --- asynciterator.ts | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 35142ab..164b2d8 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1266,6 +1266,9 @@ interface ComposedFunction { export class MappingIterator extends AsyncIterator { private _fn?: Function; private _destroySource: boolean; + private onSourceError = (err: Error) => this.emit('error', err); + private onSourceReadable = () => this.emit('readable'); + private onSourceEnd = () => this.close(); get readable() { return this.source.readable; @@ -1280,30 +1283,11 @@ export class MappingIterator extends AsyncIterator { // listeners to the original source super(); this._destroySource = options.destroySource !== false; - const cleanup = () => { - /* eslint-disable no-use-before-define */ - upstream.removeListener('end', onSourceEnd); - upstream.removeListener('error', onSourceError); - upstream.removeListener('readable', onSourceReadable); - /* eslint-enable no-use-before-define */ - }; - const onSourceEnd = () => { - cleanup(); - this.close(); - }; - const onSourceError = (err: Error) => { - this.emit('error', err); - }; - const onSourceReadable = () => { - this.emit('readable'); - }; - upstream.on('end', onSourceEnd); - upstream.on('error', onSourceError); - upstream.on('readable', onSourceReadable); + upstream.on('end', this.onSourceEnd); + upstream.on('error', this.onSourceError); + upstream.on('readable', this.onSourceReadable); if (upstream.done) - onSourceEnd(); - else - this.readable = upstream.readable; + this.onSourceEnd(); } get fn() { @@ -1350,8 +1334,14 @@ export class MappingIterator extends AsyncIterator { } public close() { + this.upstream.removeListener('end', this.onSourceEnd); + this.upstream.removeListener('error', this.onSourceError); + this.upstream.removeListener('readable', this.onSourceReadable); if (this._destroySource) this.upstream.destroy(); + scheduleTask(() => { + delete this.source; + }); super.close(); } } From 8b901a5f3215c6bb27b647c8d4436090c50015e5 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 11:26:25 +1000 Subject: [PATCH 22/26] chore: style fix --- asynciterator.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 164b2d8..2461acc 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1278,7 +1278,12 @@ export class MappingIterator extends AsyncIterator { this.source.readable = readable; } - constructor(protected source: AsyncIterator, private transforms?: ComposedFunction, private upstream: AsyncIterator = source, options: { destroySource?: boolean } = {}) { + constructor( + protected source: AsyncIterator, + private transforms?: ComposedFunction, + private upstream: AsyncIterator = source, + options: { destroySource?: boolean } = {} + ) { // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing // listeners to the original source super(); From 72408fc1eaf350502ef337fab117b938025c7de3 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 11:55:46 +1000 Subject: [PATCH 23/26] chore: add readable test --- ...ransformIterator.js => MappingIterator.js} | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) rename test/{SynchronousTransformIterator.js => MappingIterator.js} (77%) diff --git a/test/SynchronousTransformIterator.js b/test/MappingIterator.js similarity index 77% rename from test/SynchronousTransformIterator.js rename to test/MappingIterator.js index 2d8f673..58d7f49 100644 --- a/test/SynchronousTransformIterator.js +++ b/test/MappingIterator.js @@ -6,22 +6,22 @@ import { import { EventEmitter } from 'events'; -class _SynchronousTransformIterator extends MappingIterator { +class CustomTransformIterator extends MappingIterator { read() { return this.source.read(); } } -describe('SynchronousTransformIterator', () => { - describe('The SynchronousTransformIterator function', () => { +describe('CustomTransformIterator', () => { + describe('The CustomTransformIterator function', () => { describe('the result when called with `new`', () => { let instance; before(() => { - instance = new _SynchronousTransformIterator(new ArrayIterator([])); + instance = new CustomTransformIterator(new ArrayIterator([])); }); - it('should be a SynchronousTransformIterator object', () => { + it('should be a CustomTransformIterator object', () => { instance.should.be.an.instanceof(MappingIterator); }); @@ -35,12 +35,12 @@ describe('SynchronousTransformIterator', () => { }); }); - describe('A SynchronousTransformIterator', () => { + describe('A CustomTransformIterator', () => { let iterator, source; before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = new _SynchronousTransformIterator(source); + iterator = new CustomTransformIterator(source); }); describe('when reading items', () => { @@ -57,10 +57,25 @@ describe('SynchronousTransformIterator', () => { }); }); - describe('A SynchronousTransformIterator with a source that emits 0 items', () => { + describe('A CustomTransformIterator', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1]); + source._readable = false; + iterator = new CustomTransformIterator(source); + }); + + it('Should emit readable when readable is set to true', done => { + iterator.on('readable', done); + iterator.readable = true; + }); + }); + + describe('A CustomTransformIterator with a source that emits 0 items', () => { it('should not return any items', done => { const items = []; - const iterator = new _SynchronousTransformIterator(new ArrayIterator([])); + const iterator = new CustomTransformIterator(new ArrayIterator([])); iterator.on('data', item => { items.push(item); }); iterator.on('end', () => { items.should.deep.equal([]); @@ -69,12 +84,12 @@ describe('SynchronousTransformIterator', () => { }); }); - describe('A SynchronousTransformIterator with a source that is already ended', () => { + describe('A CustomTransformIterator with a source that is already ended', () => { it('should not return any items', done => { const items = []; const source = new ArrayIterator([]); source.on('end', () => { - const iterator = new _SynchronousTransformIterator(source); + const iterator = new CustomTransformIterator(source); iterator.on('data', item => { items.push(item); }); iterator.on('end', () => { items.should.deep.equal([]); @@ -90,7 +105,7 @@ describe('SynchronousTransformIterator', () => { before(() => { source = new ArrayIterator([1, 2, 3]); - iterator = new _SynchronousTransformIterator(source); + iterator = new CustomTransformIterator(source); }); describe('after being closed', () => { @@ -111,7 +126,7 @@ describe('SynchronousTransformIterator', () => { before(() => { source = new ArrayIterator([1, 2, 3]); - iterator = new _SynchronousTransformIterator(source, undefined, undefined, { destroySource: false }); + iterator = new CustomTransformIterator(source, undefined, undefined, { destroySource: false }); }); describe('after being closed', () => { @@ -132,7 +147,7 @@ describe('SynchronousTransformIterator', () => { before(() => { source = new AsyncIterator(); - iterator = new _SynchronousTransformIterator(source); + iterator = new CustomTransformIterator(source); iterator.on('error', errorHandler = sinon.stub()); }); From 79c5df94b42f801ba7e6fb8ddb65910ad088895a Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 12:16:06 +1000 Subject: [PATCH 24/26] chore: call close rather than onSourceEnd in constructor --- asynciterator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 2461acc..f48023f 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1292,7 +1292,7 @@ export class MappingIterator extends AsyncIterator { upstream.on('error', this.onSourceError); upstream.on('readable', this.onSourceReadable); if (upstream.done) - this.onSourceEnd(); + this.close(); } get fn() { From d69ce40525bf575fa74927a25b1a1b5228c67b22 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 12:20:50 +1000 Subject: [PATCH 25/26] chore: dont add listeners unecessarily --- asynciterator.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index f48023f..8403304 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1288,11 +1288,13 @@ export class MappingIterator extends AsyncIterator { // listeners to the original source super(); this._destroySource = options.destroySource !== false; - upstream.on('end', this.onSourceEnd); - upstream.on('error', this.onSourceError); - upstream.on('readable', this.onSourceReadable); if (upstream.done) this.close(); + else { + upstream.on('end', this.onSourceEnd); + upstream.on('error', this.onSourceError); + upstream.on('readable', this.onSourceReadable); + } } get fn() { From bc5d1cc2a76357daf9c34485dd4dfc07d6a23c0e Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 12:30:19 +1000 Subject: [PATCH 26/26] chore: push lint changes --- asynciterator.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 8403304..a8b9d46 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1288,8 +1288,9 @@ export class MappingIterator extends AsyncIterator { // listeners to the original source super(); this._destroySource = options.destroySource !== false; - if (upstream.done) + if (upstream.done) { this.close(); + } else { upstream.on('end', this.onSourceEnd); upstream.on('error', this.onSourceError);