From ba64a351adea56c15816cec391c83ec8d739fb3e Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Tue, 5 Apr 2022 00:53:01 +1000 Subject: [PATCH 1/2] Introduce MappingIterator. --- asynciterator.ts | 123 ++++++++-- test.js | 25 +++ test/AsyncIterator-test.js | 325 +++++++++++++++++++++++++++ test/MappingIterator.js | 209 +++++++++++++++++ test/SimpleTransformIterator-test.js | 48 ---- 5 files changed, 669 insertions(+), 61 deletions(-) create mode 100644 test.js create mode 100644 test/MappingIterator.js diff --git a/asynciterator.ts b/asynciterator.ts index ddffefd9..af832681 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void { taskScheduler = scheduler; } +/** Binds a function to an object */ +function bind(fn: Function, self: any) { + return self ? fn.bind(self) : fn; +} + /** ID of the INIT state. An iterator is initializing if it is preparing main item generation. @@ -161,7 +166,7 @@ export class AsyncIterator extends EventEmitter { @param {object?} self The `this` pointer for the callback */ forEach(callback: (item: T) => void, self?: object) { - this.on('data', self ? callback.bind(self) : callback); + this.on('data', bind(callback, self)); } /** @@ -451,12 +456,14 @@ export class AsyncIterator extends EventEmitter { /** Maps items from this iterator using the given function. After this operation, only read the returned iterator instead of the current one. - @param {Function} map A mapping function to call on this iterator's (remaining) items + @param {Function} map A mapping function to call on this iterator's (remaining) items. + A `null` value indicates that nothing should be returned for a particular item.. @param {object?} self The `this` pointer for the mapping function + @param {boolean?} close Close the iterator after an item is mapped to null @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ - map(map: (item: T) => D, self?: any): AsyncIterator { - return this.transform({ map: self ? map.bind(self) : map }); + map(map: (item: T, it: AsyncIterator) => D | null, self?: any): AsyncIterator { + return new MappingIterator(this, [bind(map, self)]); } /** @@ -469,7 +476,8 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return this.transform({ filter: self ? filter.bind(self) : filter }); + filter = bind(filter, self); + return this.map(item => filter(item) ? item : null); } /** @@ -510,7 +518,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.transform({ offset }); + return this.map(item => offset-- > 0 ? null : item); } /** @@ -520,7 +528,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return this.transform({ limit }); + return this.map((item, it) => limit-- > 0 ? item : (it.close(), null)); } /** @@ -531,7 +539,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range */ range(start: number, end: number): AsyncIterator { - return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) }); + return this.skip(start).take(Math.max(end - start + 1, 0)); } /** @@ -1055,6 +1063,14 @@ export class BufferedIterator extends AsyncIterator { } } +function _validateSource(source?: AsyncIterator, allowDestination = false) { + if (!source || !isFunction(source.read) || !isFunction(source.on)) + throw new Error(`Invalid source: ${source}`); + if (!allowDestination && (source as any)._destination) + throw new Error('The source already has a destination'); + return source as InternalSource; +} + /** An iterator that generates items based on a source iterator. This class serves as a base class for other iterators. @@ -1153,11 +1169,7 @@ export class TransformIterator extends BufferedIterator { protected _validateSource(source?: AsyncIterator, allowDestination = false) { if (this._source || typeof this._createSource !== 'undefined') throw new Error('The source cannot be changed after it has been set'); - if (!source || !isFunction(source.read) || !isFunction(source.on)) - throw new Error(`Invalid source: ${source}`); - if (!allowDestination && (source as any)._destination) - throw new Error('The source already has a destination'); - return source as InternalSource; + return _validateSource(source, allowDestination); } /** @@ -1251,6 +1263,91 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } +export class MappingIterator extends AsyncIterator { + private _destroySource: boolean; + + get readable() { + return this.source.readable; + } + + set readable(readable) { + this.source.readable = readable; + } + + constructor( + protected source: AsyncIterator, + private transforms: ((item: any, iterator: AsyncIterator) => any)[], + private upstream: AsyncIterator = source, + options: { destroySource?: boolean } = {} + ) { + // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing + // listeners to the original source + super(); + this._destroySource = options.destroySource !== false; + if (upstream.done) { + this.close(); + } + else { + _validateSource(upstream); + // @ts-ignore + upstream._destination = this; + upstream.on('end', onSourceEnd); + upstream.on('error', onSourceError); + upstream.on('readable', onSourceReadable); + } + } + + read(): D | null { + const { source, transforms } = this; + let item, i; + while ((item = source.read()) !== null) { + i = transforms.length; + // Applies each of the transforms in sequence, and terminates + // early if a transform returns null + // + // Do not use a for-of loop here, it slows down transformations + // by approximately a factor of 2. + while (i-- >= 1 && (item = transforms[i](item, this)) !== null) + ; + if (item !== null) + return item; + } + return null; + } + + map(map: (item: D, it: AsyncIterator) => K | null, self?: any): AsyncIterator { + return new MappingIterator(this.source, [bind(map, self), ...this.transforms], this); + } + + destroy(cause?: Error): void { + this.upstream.destroy(cause); + super.destroy(cause); + } + + public close() { + this.upstream.removeListener('end', onSourceEnd); + this.upstream.removeListener('error', onSourceError); + this.upstream.removeListener('readable', onSourceReadable); + if (this._destroySource) + this.upstream.destroy(); + scheduleTask(() => { + // @ts-ignore + delete this.upstream._destination; + delete this.source; + }); + super.close(); + } +} + +function onSourceError(this: InternalSource, error: Error) { + this._destination.emit('error', error); +} +function onSourceEnd(this: InternalSource) { + this._destination.close(); +} +function onSourceReadable(this: InternalSource) { + this._destination.emit('readable'); +} /** An iterator that generates items based on a source iterator diff --git a/test.js b/test.js new file mode 100644 index 00000000..5bcb87aa --- /dev/null +++ b/test.js @@ -0,0 +1,25 @@ +import { ArrayIterator } from './dist/asynciterator.js' + +let i = 0; +const arr = new Array(20_000).fill(true).map(() => i++); + +const now = Date.now(); +let times = 100; + +const loop = () => { + if (times === 0) { + console.log('elapsed', Date.now() - now); + return; + } + const iterator = new ArrayIterator(arr) + .map((item) => item) + .map((item) => item) + .filter((item) => item % 2 === 0) + ; + iterator.on('data', () => {}).on('end', () => { + times -= 1; + loop(); + }); +}; + +loop(); diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d60..fae1f56a 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,10 @@ import { ENDED, DESTROYED, scheduleTask, + range, + fromArray, + wrap, + ArrayIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1307,4 +1311,325 @@ describe('AsyncIterator', () => { }); }); }); + + describe('A chain of maps and filters', () => { + for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { + // eslint-disable-next-line no-loop-func + describe(`with ${iteratorGen()}`, () => { + let iterator; + + beforeEach(() => { + iterator = iteratorGen(); + }); + + it('should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + + it('should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + + it('should apply maps that doubles correctly and then maybemaps', async () => { + (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + + it('should apply maps that maybemaps correctly', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + + it('should apply maps that maybemaps twice', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); + + it('should apply maps that converts to string', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + + it('should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + + it('should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + + it('should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + + it('should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + + it('should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + + it('should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + + it('should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + + it('should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + + it('should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + + it('should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + + it('should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + + it('should destroy when closed before being read after map', () => { + iterator.map(x => x).close(); + iterator.destroyed.should.be.true; + }); + + it('should destroy when closed before being read after map then filter', () => { + it = iterator.map(x => x); + it.filter(x => true).close(); + iterator.destroyed.should.be.true; + it.destroyed.should.be.true; + }); + + describe('when called on an iterator with a `this` argument', () => { + const self = {}; + let map, result; + + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(map, self); + }); + + describe('the return value', () => { + const items = []; + + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); + + describe('when called on an iterator with a `this` argument with nested map', () => { + const self = {}; + let map, result; + + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(x => x).map(map, self); + }); + + describe('the return value', () => { + const items = []; + + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); + }); + } + }); + + describe('Skipping', () => { + describe('The .skip function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new ArrayIterator([]).skip(10); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A .skip', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A .skip', () => { + let iterator, source; + + before(() => { + source = range(0, 6); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A .skip with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new ArrayIterator([]).skip(10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .skip with a limit of 0 items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + + describe('A .skip with a limit of Infinity items', () => { + it('should skip all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('.take', () => { + describe('A .take', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = source.take(4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items takeed to the specified take', () => { + items.should.deep.equal([0, 1, 2, 3]); + }); + }); + }); + + describe('A .take with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new ArrayIterator([]).take(10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .take with a take of 0 items', () => { + it('should not emit any items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2]).take(0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .take with a take of Infinity items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).take(Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + }); + }); }); diff --git a/test/MappingIterator.js b/test/MappingIterator.js new file mode 100644 index 00000000..58d7f497 --- /dev/null +++ b/test/MappingIterator.js @@ -0,0 +1,209 @@ +import { + AsyncIterator, + ArrayIterator, + MappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class CustomTransformIterator extends MappingIterator { + read() { + return this.source.read(); + } +} + +describe('CustomTransformIterator', () => { + describe('The CustomTransformIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new CustomTransformIterator(new ArrayIterator([])); + }); + + it('should be a CustomTransformIterator object', () => { + instance.should.be.an.instanceof(MappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A CustomTransformIterator', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new CustomTransformIterator(source); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return all items', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + }); + }); + }); + + describe('A CustomTransformIterator', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1]); + source._readable = false; + iterator = new CustomTransformIterator(source); + }); + + it('Should emit readable when readable is set to true', done => { + iterator.on('readable', done); + iterator.readable = true; + }); + }); + + describe('A CustomTransformIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new CustomTransformIterator(new ArrayIterator([])); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A CustomTransformIterator with a source that is already ended', () => { + it('should not return any items', done => { + const items = []; + const source = new ArrayIterator([]); + source.on('end', () => { + const iterator = new CustomTransformIterator(source); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + }); + + + describe('A TransformIterator with destroySource set to its default', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new CustomTransformIterator(source); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should have destroyed the source', () => { + expect(source).to.have.property('destroyed', true); + }); + }); + }); + + describe('A TransformIterator with destroySource set to false', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new CustomTransformIterator(source, undefined, undefined, { destroySource: false }); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should not have destroyed the source', () => { + expect(source).to.have.property('destroyed', false); + }); + }); + }); + + describe('A TransformIterator with a source that errors', () => { + let iterator, source, errorHandler; + + before(() => { + source = new AsyncIterator(); + iterator = new CustomTransformIterator(source); + iterator.on('error', errorHandler = sinon.stub()); + }); + + describe('before an error occurs', () => { + it('should not have emitted any error', () => { + errorHandler.should.not.have.been.called; + }); + }); + + describe('after a first error occurs', () => { + let error1; + before(() => { + errorHandler.reset(); + source.emit('error', error1 = new Error('error1')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error1); + }); + }); + + describe('after a second error occurs', () => { + let error2; + + before(() => { + errorHandler.reset(); + source.emit('error', error2 = new Error('error2')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error2); + }); + }); + + describe('after the source has ended and errors again', () => { + before(done => { + errorHandler.reset(); + source.close(); + iterator.on('end', () => { + function noop() { /* */ } + source.on('error', noop); // avoid triggering the default error handler + source.emit('error', new Error('error3')); + source.removeListener('error', noop); + done(); + }); + }); + + it('should not re-emit the error', () => { + errorHandler.should.not.have.been.called; + }); + + it('should not leave any error handlers attached', () => { + source.listenerCount('error').should.equal(0); + }); + }); + }); +}); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index e033b8e6..8fb27611 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -1110,10 +1110,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the map function on all items in order', () => { items.should.deep.equal(['a1', 'b2', 'c3']); }); @@ -1121,10 +1117,6 @@ describe('SimpleTransformIterator', () => { it('should call the map function once for each item', () => { map.should.have.been.calledThrice; }); - - it('should call the map function with the returned iterator as `this`', () => { - map.alwaysCalledOn(result).should.be.true; - }); }); }); @@ -1145,14 +1137,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - - it('should execute the map function on all items in order', () => { - items.should.deep.equal(['a1', 'b2', 'c3']); - }); - it('should call the map function once for each item', () => { map.should.have.been.calledThrice; }); @@ -1184,10 +1168,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the filter function on all items in order', () => { items.should.deep.equal(['a', 'c']); }); @@ -1195,10 +1175,6 @@ describe('SimpleTransformIterator', () => { it('should call the filter function once for each item', () => { filter.should.have.been.calledThrice; }); - - it('should call the filter function with the returned iterator as `this`', () => { - filter.alwaysCalledOn(result).should.be.true; - }); }); }); @@ -1218,10 +1194,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should execute the filter function on all items in order', () => { items.should.deep.equal(['a', 'c']); }); @@ -1346,10 +1318,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should skip the given number of items', () => { items.should.deep.equal(['c', 'd', 'e']); }); @@ -1376,10 +1344,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should take the given number of items', () => { items.should.deep.equal(['a', 'b', 'c']); }); @@ -1406,10 +1370,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should contain the indicated range', () => { items.should.have.length(10); items[0].should.equal(20); @@ -1433,17 +1393,9 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should be empty', () => { items.should.be.empty; }); - - it('should not have called `read` on the iterator', () => { - iterator.read.should.not.have.been.called; - }); }); }); }); From 3be4178f2fa0780b577016c40b2a178744a37691 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Thu, 7 Apr 2022 23:15:34 +0200 Subject: [PATCH 2/2] WIP: Review. --- .github/workflows/ci.yml | 1 + asynciterator.ts | 221 ++++++---- package.json | 4 +- test.js | 25 -- test/AsyncIterator-test.js | 325 -------------- test/MappingIterator-test.js | 613 +++++++++++++++++++++++++++ test/MappingIterator.js | 209 --------- test/SimpleTransformIterator-test.js | 101 ----- 8 files changed, 744 insertions(+), 755 deletions(-) delete mode 100644 test.js create mode 100644 test/MappingIterator-test.js delete mode 100644 test/MappingIterator.js diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2034541..6ba9d7db 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,6 +25,7 @@ jobs: if: ${{ matrix.node-version != '10.x' }} - run: npx c8 --reporter=lcov mocha - uses: coverallsapp/github-action@master + if: ${{ matrix.node-version != '10.x' }} with: github-token: ${{ secrets.github_token }} flag-name: run-${{ matrix.node-version }} diff --git a/asynciterator.ts b/asynciterator.ts index af832681..0c6e5abf 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -26,7 +26,7 @@ export function setTaskScheduler(scheduler: TaskScheduler): void { } /** Binds a function to an object */ -function bind(fn: Function, self: any) { +function bind(fn: Function, self?: object) { return self ? fn.bind(self) : fn; } @@ -457,13 +457,12 @@ export class AsyncIterator extends EventEmitter { Maps items from this iterator using the given function. After this operation, only read the returned iterator instead of the current one. @param {Function} map A mapping function to call on this iterator's (remaining) items. - A `null` value indicates that nothing should be returned for a particular item.. + A `null` value indicates that nothing should be returned for a particular item. @param {object?} self The `this` pointer for the mapping function - @param {boolean?} close Close the iterator after an item is mapped to null @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ - map(map: (item: T, it: AsyncIterator) => D | null, self?: any): AsyncIterator { - return new MappingIterator(this, [bind(map, self)]); + map(map: MapFunction, self?: any): AsyncIterator { + return new MappingIterator(this, bind(map, self)); } /** @@ -785,9 +784,121 @@ export class IntegerIterator extends AsyncIterator { } } +/** + * A synchronous mapping function from one element to another. + * The iterator performing the mapping is passed as a second argument. + */ +export type MapFunction = AsyncIterator> = + (item: S, iterator: I) => D | null; + +/** + An iterator that calls a synchronous mapping function + on every item from its source iterator. + @extends module:asynciterator.AsyncIterator +*/ +export class MappingIterator extends AsyncIterator { + protected _source: AsyncIterator; + private readonly _destroySource: boolean; + private readonly _mappings: MapFunction>[]; + private readonly _mappingRoot: InternalSource; + + // This is wrong: readable should be set by listening to source events + get readable() { + return (this._state < CLOSED) && this._source.readable; + } + + /** + * Applies the given mapping to the source iterator. + */ + constructor( + source: AsyncIterator, + mapping?: MapFunction>, + options?: SourcedIteratorOptions, + ); + + /** + * Applies the given list of mappings to the mapping root. + * + * This is an optimization for + * root.map(f1).map(f2).map(f3) + * where the combined mapping x => f3(f2(f1(x))) + * is applied to root rather than to the intermediate sources. + */ + constructor( + source: AsyncIterator, + mappings: MapFunction>[], + mappingRoot: AsyncIterator, + options?: SourcedIteratorOptions, + ); + + constructor( + source: AsyncIterator, + mappings: MapFunction> | + MapFunction>[] = [], + mappingRoot?: AsyncIterator | SourcedIteratorOptions, + options: SourcedIteratorOptions = {}, + ) { + super(); + // Resolve optional parameters + if (!isEventEmitter(mappingRoot)) { + if (mappingRoot) + options = mappingRoot; + mappingRoot = source; + } + this._source = source; + this._mappings = isFunction(mappings) ? [mappings] : mappings; + this._mappingRoot = mappingRoot as InternalSource; + this._destroySource = options.destroySource !== false; + + if (mappingRoot.done) { + this.close(); + } + else { + _validateSource(mappingRoot); + this._mappingRoot._destination = this; + this._mappingRoot.on('end', destinationClose); + this._mappingRoot.on('error', destinationEmitError); + this._mappingRoot.on('readable', destinationEmitReadable); + } + } + + read(): D | null { + let mapped : any = null; + while (mapped === null && (mapped = this._source.read()) !== null) { + for (let i = 0; i < this._mappings.length; i++) { + mapped = this._mappings[i](mapped, this); + if (mapped === null) + break; + } + } + return mapped; + } + + map(map: MapFunction, self?: any): AsyncIterator { + return new MappingIterator(this._source, [...this._mappings, bind(map, self)], this); + } + + public close() { + if (this._destroySource) + this._mappingRoot.destroy(); + super.close(); + } + + /* Cleans up the source iterator and ends. */ + protected _end(destroy: boolean) { + this._mappingRoot.removeListener('end', destinationClose); + this._mappingRoot.removeListener('error', destinationEmitError); + this._mappingRoot.removeListener('readable', destinationEmitReadable); + delete this._mappingRoot._destination; + if (this._destroySource) + this._mappingRoot.destroy(); + super._end(destroy); + } +} + /** - A iterator that maintains an internal buffer of items. + An iterator that maintains an internal buffer of items. This class serves as a base class for other iterators with a typically complex item generation process. @extends module:asynciterator.AsyncIterator @@ -1252,9 +1363,15 @@ export class TransformIterator extends BufferedIterator { } } +function destinationEmitReadable(this: InternalSource) { + this._destination!.emit('readable'); +} function destinationEmitError(this: InternalSource, error: Error) { this._destination!.emit('error', error); } +function destinationClose(this: InternalSource) { + this._destination!.close(); +} function destinationCloseWhenDone(this: InternalSource) { (this._destination as any)._closeWhenDone(); } @@ -1263,91 +1380,6 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } -export class MappingIterator extends AsyncIterator { - private _destroySource: boolean; - - get readable() { - return this.source.readable; - } - - set readable(readable) { - this.source.readable = readable; - } - - constructor( - protected source: AsyncIterator, - private transforms: ((item: any, iterator: AsyncIterator) => any)[], - private upstream: AsyncIterator = source, - options: { destroySource?: boolean } = {} - ) { - // Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing - // listeners to the original source - super(); - this._destroySource = options.destroySource !== false; - if (upstream.done) { - this.close(); - } - else { - _validateSource(upstream); - // @ts-ignore - upstream._destination = this; - upstream.on('end', onSourceEnd); - upstream.on('error', onSourceError); - upstream.on('readable', onSourceReadable); - } - } - - read(): D | null { - const { source, transforms } = this; - let item, i; - while ((item = source.read()) !== null) { - i = transforms.length; - // Applies each of the transforms in sequence, and terminates - // early if a transform returns null - // - // Do not use a for-of loop here, it slows down transformations - // by approximately a factor of 2. - while (i-- >= 1 && (item = transforms[i](item, this)) !== null) - ; - if (item !== null) - return item; - } - return null; - } - - map(map: (item: D, it: AsyncIterator) => K | null, self?: any): AsyncIterator { - return new MappingIterator(this.source, [bind(map, self), ...this.transforms], this); - } - - destroy(cause?: Error): void { - this.upstream.destroy(cause); - super.destroy(cause); - } - - public close() { - this.upstream.removeListener('end', onSourceEnd); - this.upstream.removeListener('error', onSourceError); - this.upstream.removeListener('readable', onSourceReadable); - if (this._destroySource) - this.upstream.destroy(); - scheduleTask(() => { - // @ts-ignore - delete this.upstream._destination; - delete this.source; - }); - super.close(); - } -} - -function onSourceError(this: InternalSource, error: Error) { - this._destination.emit('error', error); -} -function onSourceEnd(this: InternalSource) { - this._destination.close(); -} -function onSourceReadable(this: InternalSource) { - this._destination.emit('readable'); -} /** An iterator that generates items based on a source iterator @@ -2043,15 +2075,18 @@ function isSourceExpression(object: any): object is SourceExpression { return object && (isEventEmitter(object) || isPromise(object) || isFunction(object)); } +export interface SourcedIteratorOptions { + destroySource?: boolean; +} + export interface BufferedIteratorOptions { maxBufferSize?: number; autoStart?: boolean; } -export interface TransformIteratorOptions extends BufferedIteratorOptions { +export interface TransformIteratorOptions extends SourcedIteratorOptions, BufferedIteratorOptions { source?: SourceExpression; optional?: boolean; - destroySource?: boolean; } export interface TransformOptions extends TransformIteratorOptions { diff --git a/package.json b/package.json index ec6b170f..856c4031 100644 --- a/package.json +++ b/package.json @@ -21,8 +21,8 @@ "scripts": { "build": "npm run build:clean && npm run build:module && npm run build:commonjs && npm run build:types", "build:clean": "rm -rf dist", - "build:module": " tsc --module es2015 && mv dist/ts-out/*.js dist && npm run build:module:import", - "build:module:import": " sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak", + "build:module": "tsc && mv dist/ts-out/*.js dist && npm run build:module:import", + "build:module:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak", "build:commonjs": "tsc --module commonjs && ./.change-extension cjs dist/ts-out/*.js && mv dist/ts-out/*.cjs dist && npm run build:commonjs:import", "build:commonjs:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.cjs/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.cjs/' dist/asynciterator.cjs && rm dist/*.bak", "build:types": "tsc -d && rm dist/ts-out/*.js && mv dist/ts-out/*.d.ts dist", diff --git a/test.js b/test.js deleted file mode 100644 index 5bcb87aa..00000000 --- a/test.js +++ /dev/null @@ -1,25 +0,0 @@ -import { ArrayIterator } from './dist/asynciterator.js' - -let i = 0; -const arr = new Array(20_000).fill(true).map(() => i++); - -const now = Date.now(); -let times = 100; - -const loop = () => { - if (times === 0) { - console.log('elapsed', Date.now() - now); - return; - } - const iterator = new ArrayIterator(arr) - .map((item) => item) - .map((item) => item) - .filter((item) => item % 2 === 0) - ; - iterator.on('data', () => {}).on('end', () => { - times -= 1; - loop(); - }); -}; - -loop(); diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index fae1f56a..37a74d60 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,10 +5,6 @@ import { ENDED, DESTROYED, scheduleTask, - range, - fromArray, - wrap, - ArrayIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1311,325 +1307,4 @@ describe('AsyncIterator', () => { }); }); }); - - describe('A chain of maps and filters', () => { - for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { - // eslint-disable-next-line no-loop-func - describe(`with ${iteratorGen()}`, () => { - let iterator; - - beforeEach(() => { - iterator = iteratorGen(); - }); - - it('should handle no transforms arrayified', async () => { - (await iterator.toArray()).should.deep.equal([0, 1, 2]); - }); - - it('should apply maps that doubles correctly', async () => { - (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); - }); - - it('should apply maps that doubles correctly and then maybemaps', async () => { - (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); - }); - - it('should apply maps that maybemaps correctly', async () => { - (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); - }); - - it('should apply maps that maybemaps twice', async () => { - (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); - }); - - it('should apply maps that converts to string', async () => { - (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - - it('should apply filter correctly', async () => { - (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); - }); - - it('should apply filter then map correctly', async () => { - (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); - }); - - it('should apply map then filter correctly (1)', async () => { - (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); - }); - - it('should apply map then filter to false correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - - it('should apply map then filter to true correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); - }); - - it('should apply filter to false then map correctly', async () => { - (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); - }); - - it('should apply filter to true then map correctly', async () => { - (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); - }); - - it('should apply filter one then double', async () => { - (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); - }); - - it('should apply double then filter one', async () => { - (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); - }); - - it('should apply map then filter correctly', async () => { - (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); - }); - - it('should correctly apply 3 filters', async () => { - (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); - }); - - it('should correctly apply 3 maps', async () => { - (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); - }); - - it('should correctly apply a map, followed by a filter, followed by another map', async () => { - (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); - }); - - it('should correctly apply a filter-map-filter', async () => { - (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); - }); - - it('should destroy when closed before being read after map', () => { - iterator.map(x => x).close(); - iterator.destroyed.should.be.true; - }); - - it('should destroy when closed before being read after map then filter', () => { - it = iterator.map(x => x); - it.filter(x => true).close(); - iterator.destroyed.should.be.true; - it.destroyed.should.be.true; - }); - - describe('when called on an iterator with a `this` argument', () => { - const self = {}; - let map, result; - - before(() => { - let i = 0; - iterator = new ArrayIterator(['a', 'b', 'c']); - map = sinon.spy(item => item + (++i)); - result = iterator.map(map, self); - }); - - describe('the return value', () => { - const items = []; - - before(done => { - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should call the map function once for each item', () => { - map.should.have.been.calledThrice; - }); - - it('should call the map function with the passed argument as `this`', () => { - map.alwaysCalledOn(self).should.be.true; - }); - }); - }); - - describe('when called on an iterator with a `this` argument with nested map', () => { - const self = {}; - let map, result; - - before(() => { - let i = 0; - iterator = new ArrayIterator(['a', 'b', 'c']); - map = sinon.spy(item => item + (++i)); - result = iterator.map(x => x).map(map, self); - }); - - describe('the return value', () => { - const items = []; - - before(done => { - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should call the map function once for each item', () => { - map.should.have.been.calledThrice; - }); - - it('should call the map function with the passed argument as `this`', () => { - map.alwaysCalledOn(self).should.be.true; - }); - }); - }); - }); - } - }); - - describe('Skipping', () => { - describe('The .skip function', () => { - describe('the result when called with `new`', () => { - let instance; - - before(() => { - instance = new ArrayIterator([]).skip(10); - }); - - it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); - }); - - it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); - }); - }); - }); - - describe('A .skip', () => { - let iterator, source; - - before(() => { - source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = source.skip(4); - }); - - describe('when reading items', () => { - const items = []; - - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should return items skipping the specified amount', () => { - items.should.deep.equal([4, 5, 6]); - }); - }); - }); - - describe('A .skip', () => { - let iterator, source; - - before(() => { - source = range(0, 6); - iterator = source.skip(4); - }); - - describe('when reading items', () => { - const items = []; - - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should return items skipping the specified amount', () => { - items.should.deep.equal([4, 5, 6]); - }); - }); - }); - - describe('A .skip with a source that emits 0 items', () => { - it('should not return any items', done => { - const items = []; - const iterator = new ArrayIterator([]).skip(10); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - - describe('A .skip with a limit of 0 items', () => { - it('should emit all items', done => { - const items = []; - const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(0); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); - done(); - }); - }); - }); - - describe('A .skip with a limit of Infinity items', () => { - it('should skip all items', done => { - const items = []; - const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(Infinity); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - - describe('.take', () => { - describe('A .take', () => { - let iterator, source; - before(() => { - source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = source.take(4); - }); - - describe('when reading items', () => { - const items = []; - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should return items takeed to the specified take', () => { - items.should.deep.equal([0, 1, 2, 3]); - }); - }); - }); - - describe('A .take with a source that emits 0 items', () => { - it('should not return any items', done => { - const items = []; - const iterator = new ArrayIterator([]).take(10); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - - describe('A .take with a take of 0 items', () => { - it('should not emit any items', done => { - const items = []; - const iterator = new ArrayIterator([0, 1, 2]).take(0); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - - describe('A .take with a take of Infinity items', () => { - it('should emit all items', done => { - const items = []; - const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).take(Infinity); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); - done(); - }); - }); - }); - }); - }); }); diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js new file mode 100644 index 00000000..eabc2b4c --- /dev/null +++ b/test/MappingIterator-test.js @@ -0,0 +1,613 @@ +import { + AsyncIterator, + ArrayIterator, + IntegerIterator, + MappingIterator, + range, + fromArray, + wrap, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('MappingIterator', () => { + describe('The MappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new MappingIterator(new ArrayIterator([])); + }); + + it('should be a MappingIterator object', () => { + instance.should.be.an.instanceof(MappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A MappingIterator with an array source', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new MappingIterator(source); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return all items', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + }); + }); + }); + + describe('A MappingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new MappingIterator(new ArrayIterator([])); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A MappingIterator with a source that is already ended', () => { + it('should not return any items', done => { + const items = []; + const source = new ArrayIterator([]); + source.on('end', () => { + const iterator = new MappingIterator(source); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + }); + + describe('A TransformIterator with destroySource set to its default', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new MappingIterator(source); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should have destroyed the source', () => { + expect(source).to.have.property('destroyed', true); + }); + }); + }); + + describe('A TransformIterator with destroySource set to false', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([1, 2, 3]); + iterator = new MappingIterator(source, undefined, { destroySource: false }); + }); + + describe('after being closed', () => { + before(done => { + iterator.read(); + iterator.close(); + iterator.on('end', done); + }); + + it('should not have destroyed the source', () => { + expect(source).to.have.property('destroyed', false); + }); + }); + }); + + describe('A TransformIterator with a source that errors', () => { + let iterator, source, errorHandler; + + before(() => { + source = new AsyncIterator(); + iterator = new MappingIterator(source); + iterator.on('error', errorHandler = sinon.stub()); + }); + + describe('before an error occurs', () => { + it('should not have emitted any error', () => { + errorHandler.should.not.have.been.called; + }); + }); + + describe('after a first error occurs', () => { + let error1; + before(() => { + errorHandler.reset(); + source.emit('error', error1 = new Error('error1')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error1); + }); + }); + + describe('after a second error occurs', () => { + let error2; + + before(() => { + errorHandler.reset(); + source.emit('error', error2 = new Error('error2')); + }); + + it('should re-emit the error', () => { + errorHandler.should.have.been.calledOnce; + errorHandler.should.have.been.calledWith(error2); + }); + }); + + describe('after the source has ended and errors again', () => { + before(done => { + errorHandler.reset(); + source.close(); + iterator.on('end', () => { + function noop() { /* */ } + source.on('error', noop); // avoid triggering the default error handler + source.emit('error', new Error('error3')); + source.removeListener('error', noop); + done(); + }); + }); + + it('should not re-emit the error', () => { + errorHandler.should.not.have.been.called; + }); + + it('should not leave any error handlers attached', () => { + source.listenerCount('error').should.equal(0); + }); + }); + }); + + describe('A chain of maps and filters', () => { + for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { + // eslint-disable-next-line no-loop-func + describe(`with ${iteratorGen()}`, () => { + let iterator; + + beforeEach(() => { + iterator = iteratorGen(); + }); + + it('should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + + it('should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + + it('should apply maps that doubles correctly and then maybemaps', async () => { + (await iterator.map(x => x * 2).map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + + it('should apply maps that maybemaps correctly', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + + it('should apply maps that maybemaps twice', async () => { + (await iterator.map(x => x === 2 ? null : x * 3).map(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); + + it('should apply maps that converts to string', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + + it('should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + + it('should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + + it('should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + + it('should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + + it('should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + + it('should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + + it('should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + + it('should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + + it('should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + + it('should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + + it('should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + + it('should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + + it('should destroy when closed before being read after map', () => { + iterator.map(x => x).close(); + iterator.destroyed.should.be.true; + }); + + it('should destroy when closed before being read after map then filter', () => { + it = iterator.map(x => x); + it.filter(x => true).close(); + iterator.destroyed.should.be.true; + it.destroyed.should.be.true; + }); + + describe('when called on an iterator with a `this` argument', () => { + const self = {}; + let map, result; + + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(map, self); + }); + + describe('the return value', () => { + const items = []; + + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); + + describe('when called on an iterator with a `this` argument with nested map', () => { + const self = {}; + let map, result; + + before(() => { + let i = 0; + iterator = new ArrayIterator(['a', 'b', 'c']); + map = sinon.spy(item => item + (++i)); + result = iterator.map(x => x).map(map, self); + }); + + describe('the return value', () => { + const items = []; + + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should call the map function once for each item', () => { + map.should.have.been.calledThrice; + }); + + it('should call the map function with the passed argument as `this`', () => { + map.alwaysCalledOn(self).should.be.true; + }); + }); + }); + }); + } + }); + + describe('The AsyncIterator#skip function', () => { + it('should be a function', () => { + expect(AsyncIterator.prototype.skip).to.be.a('function'); + }); + + describe('when called on an iterator', () => { + let iterator, result; + before(() => { + iterator = new ArrayIterator(['a', 'b', 'c', 'd', 'e']); + result = iterator.skip(2); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should skip the given number of items', () => { + items.should.deep.equal(['c', 'd', 'e']); + }); + }); + }); + }); + + describe('The AsyncIterator#take function', () => { + it('should be a function', () => { + expect(AsyncIterator.prototype.take).to.be.a('function'); + }); + + describe('when called on an iterator', () => { + let iterator, result; + before(() => { + iterator = new ArrayIterator(['a', 'b', 'c', 'd', 'e']); + result = iterator.take(3); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should take the given number of items', () => { + items.should.deep.equal(['a', 'b', 'c']); + }); + }); + }); + }); + + describe('The AsyncIterator#range function', () => { + it('should be a function', () => { + expect(AsyncIterator.prototype.range).to.be.a('function'); + }); + + describe('when called on an iterator', () => { + let iterator, result; + before(() => { + iterator = new IntegerIterator(); + result = iterator.range(20, 29); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should contain the indicated range', () => { + items.should.have.length(10); + items[0].should.equal(20); + items[9].should.equal(29); + }); + }); + }); + + describe('when called on an iterator with an inverse range', () => { + let iterator, result; + before(() => { + iterator = new IntegerIterator(); + sinon.spy(iterator, 'read'); + }); + + describe('the return value', () => { + const items = []; + before(done => { + result = iterator.range(30, 20); + result.on('data', item => { items.push(item); }); + result.on('end', done); + }); + + it('should be empty', () => { + items.should.be.empty; + }); + }); + }); + }); + + describe('Skipping', () => { + describe('The .skip function', () => { + describe('the result', () => { + let instance; + + before(() => { + instance = new ArrayIterator([]).skip(10); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A .skip on an array', () => { + let iterator, source; + + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A .skip on a range', () => { + let iterator, source; + + before(() => { + source = range(0, 6); + iterator = source.skip(4); + }); + + describe('when reading items', () => { + const items = []; + + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A .skip with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new ArrayIterator([]).skip(10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .skip with a limit of 0 items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + + describe('A .skip with a limit of Infinity items', () => { + it('should skip all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).skip(Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('.take', () => { + describe('A .take', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = source.take(4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items to the specified take', () => { + items.should.deep.equal([0, 1, 2, 3]); + }); + }); + }); + + describe('A .take with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new ArrayIterator([]).take(10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .take with a take of 0 items', () => { + it('should not emit any items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2]).take(0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A .take with a take of Infinity items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]).take(Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + }); + }); +}); diff --git a/test/MappingIterator.js b/test/MappingIterator.js deleted file mode 100644 index 58d7f497..00000000 --- a/test/MappingIterator.js +++ /dev/null @@ -1,209 +0,0 @@ -import { - AsyncIterator, - ArrayIterator, - MappingIterator, -} from '../dist/asynciterator.js'; - -import { EventEmitter } from 'events'; - -class CustomTransformIterator extends MappingIterator { - read() { - return this.source.read(); - } -} - -describe('CustomTransformIterator', () => { - describe('The CustomTransformIterator function', () => { - describe('the result when called with `new`', () => { - let instance; - - before(() => { - instance = new CustomTransformIterator(new ArrayIterator([])); - }); - - it('should be a CustomTransformIterator object', () => { - instance.should.be.an.instanceof(MappingIterator); - }); - - it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); - }); - - it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); - }); - }); - }); - - describe('A CustomTransformIterator', () => { - let iterator, source; - - before(() => { - source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = new CustomTransformIterator(source); - }); - - describe('when reading items', () => { - const items = []; - - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should return all items', () => { - items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); - }); - }); - }); - - describe('A CustomTransformIterator', () => { - let iterator, source; - - before(() => { - source = new ArrayIterator([1]); - source._readable = false; - iterator = new CustomTransformIterator(source); - }); - - it('Should emit readable when readable is set to true', done => { - iterator.on('readable', done); - iterator.readable = true; - }); - }); - - describe('A CustomTransformIterator with a source that emits 0 items', () => { - it('should not return any items', done => { - const items = []; - const iterator = new CustomTransformIterator(new ArrayIterator([])); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - - describe('A CustomTransformIterator with a source that is already ended', () => { - it('should not return any items', done => { - const items = []; - const source = new ArrayIterator([]); - source.on('end', () => { - const iterator = new CustomTransformIterator(source); - iterator.on('data', item => { items.push(item); }); - iterator.on('end', () => { - items.should.deep.equal([]); - done(); - }); - }); - }); - }); - - - describe('A TransformIterator with destroySource set to its default', () => { - let iterator, source; - - before(() => { - source = new ArrayIterator([1, 2, 3]); - iterator = new CustomTransformIterator(source); - }); - - describe('after being closed', () => { - before(done => { - iterator.read(); - iterator.close(); - iterator.on('end', done); - }); - - it('should have destroyed the source', () => { - expect(source).to.have.property('destroyed', true); - }); - }); - }); - - describe('A TransformIterator with destroySource set to false', () => { - let iterator, source; - - before(() => { - source = new ArrayIterator([1, 2, 3]); - iterator = new CustomTransformIterator(source, undefined, undefined, { destroySource: false }); - }); - - describe('after being closed', () => { - before(done => { - iterator.read(); - iterator.close(); - iterator.on('end', done); - }); - - it('should not have destroyed the source', () => { - expect(source).to.have.property('destroyed', false); - }); - }); - }); - - describe('A TransformIterator with a source that errors', () => { - let iterator, source, errorHandler; - - before(() => { - source = new AsyncIterator(); - iterator = new CustomTransformIterator(source); - iterator.on('error', errorHandler = sinon.stub()); - }); - - describe('before an error occurs', () => { - it('should not have emitted any error', () => { - errorHandler.should.not.have.been.called; - }); - }); - - describe('after a first error occurs', () => { - let error1; - before(() => { - errorHandler.reset(); - source.emit('error', error1 = new Error('error1')); - }); - - it('should re-emit the error', () => { - errorHandler.should.have.been.calledOnce; - errorHandler.should.have.been.calledWith(error1); - }); - }); - - describe('after a second error occurs', () => { - let error2; - - before(() => { - errorHandler.reset(); - source.emit('error', error2 = new Error('error2')); - }); - - it('should re-emit the error', () => { - errorHandler.should.have.been.calledOnce; - errorHandler.should.have.been.calledWith(error2); - }); - }); - - describe('after the source has ended and errors again', () => { - before(done => { - errorHandler.reset(); - source.close(); - iterator.on('end', () => { - function noop() { /* */ } - source.on('error', noop); // avoid triggering the default error handler - source.emit('error', new Error('error3')); - source.removeListener('error', noop); - done(); - }); - }); - - it('should not re-emit the error', () => { - errorHandler.should.not.have.been.called; - }); - - it('should not leave any error handlers attached', () => { - source.listenerCount('error').should.equal(0); - }); - }); - }); -}); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index 8fb27611..039cae08 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -1299,107 +1299,6 @@ describe('SimpleTransformIterator', () => { }); }); - describe('The AsyncIterator#skip function', () => { - it('should be a function', () => { - expect(AsyncIterator.prototype.skip).to.be.a('function'); - }); - - describe('when called on an iterator', () => { - let iterator, result; - before(() => { - iterator = new ArrayIterator(['a', 'b', 'c', 'd', 'e']); - result = iterator.skip(2); - }); - - describe('the return value', () => { - const items = []; - before(done => { - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should skip the given number of items', () => { - items.should.deep.equal(['c', 'd', 'e']); - }); - }); - }); - }); - - describe('The AsyncIterator#take function', () => { - it('should be a function', () => { - expect(AsyncIterator.prototype.take).to.be.a('function'); - }); - - describe('when called on an iterator', () => { - let iterator, result; - before(() => { - iterator = new ArrayIterator(['a', 'b', 'c', 'd', 'e']); - result = iterator.take(3); - }); - - describe('the return value', () => { - const items = []; - before(done => { - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should take the given number of items', () => { - items.should.deep.equal(['a', 'b', 'c']); - }); - }); - }); - }); - - describe('The AsyncIterator#range function', () => { - it('should be a function', () => { - expect(AsyncIterator.prototype.range).to.be.a('function'); - }); - - describe('when called on an iterator', () => { - let iterator, result; - before(() => { - iterator = new IntegerIterator(); - result = iterator.range(20, 29); - }); - - describe('the return value', () => { - const items = []; - before(done => { - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should contain the indicated range', () => { - items.should.have.length(10); - items[0].should.equal(20); - items[9].should.equal(29); - }); - }); - }); - - describe('when called on an iterator with an inverse range', () => { - let iterator, result; - before(() => { - iterator = new IntegerIterator(); - sinon.spy(iterator, 'read'); - }); - - describe('the return value', () => { - const items = []; - before(done => { - result = iterator.range(30, 20); - result.on('data', item => { items.push(item); }); - result.on('end', done); - }); - - it('should be empty', () => { - items.should.be.empty; - }); - }); - }); - }); - describe('The AsyncIterator#transform function', () => { it('should be a function', () => { expect(AsyncIterator.prototype.transform).to.be.a('function');