From 35c54b393114aeb64c33afc4de5055963b8b78a5 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 20:41:31 +0100 Subject: [PATCH 01/34] adds optimized iterators for mapping (.map()) and filtering (.filter()) --- asynciterator.ts | 44 ++++++++++++++++++++++++++-- test/SimpleTransformIterator-test.js | 12 ++++---- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 000fd10..bbc6931 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -456,7 +456,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - return this.transform({ map: self ? map.bind(self) : map }); + // return this.transform({ map: self ? map.bind(self) : map }); + return new MappingIterator(this, self ? map.bind(self) : map); } /** @@ -469,7 +470,8 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return this.transform({ filter: self ? filter.bind(self) : filter }); + return new FilteringIterator(this, self ? filter.bind(self) : filter); + // return this.transform({ filter: self ? filter.bind(self) : filter }); } /** @@ -1251,6 +1253,44 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } +export class MappingIterator extends AsyncIterator { + constructor(source: AsyncIterator, map: (item: S) => D) { + super(); + let item: S | null; + this.read = (): D | null => { + if ((item = source.read()) !== null) + return map.call(this, item); + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + +export class FilteringIterator extends AsyncIterator { + constructor(source: AsyncIterator, filter: (item: T) => boolean) { + super(); + let item: T | null; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (filter.call(this, item)) + return item; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + /** An iterator that generates items based on a source iterator diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index e033b8e..78815aa 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -7,6 +7,8 @@ import { ArrayIterator, IntegerIterator, scheduleTask, + MappingIterator, + FilteringIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1110,8 +1112,8 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + it('should be a MappingIterator', () => { + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1146,7 +1148,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1185,7 +1187,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { @@ -1219,7 +1221,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { From 98195c274ffa9ea8f96635070904e2cf5014e2e5 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 21:09:13 +0100 Subject: [PATCH 02/34] adds simpler and faster iterators for skipping and limiting --- asynciterator.ts | 54 ++++++++++++++++++++++++++-- test/SimpleTransformIterator-test.js | 6 ++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index bbc6931..5a8db07 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -512,7 +512,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.transform({ offset }); + // return this.transform({ offset }); + return new SkippingIterator(this, offset); } /** @@ -522,7 +523,8 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return this.transform({ limit }); + // return this.transform({ limit }); + return new LimitingIterator(this, limit); } /** @@ -1291,6 +1293,54 @@ export class FilteringIterator extends AsyncIterator { } } +export class SkippingIterator extends AsyncIterator { + constructor(source: AsyncIterator, skip: number) { + super(); + let item: T | null; + let skipped = 0; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (skipped < skip) + skipped += 1; + else + return item; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + +export class LimitingIterator extends AsyncIterator { + constructor(source: AsyncIterator, limit: number) { + super(); + let item: T | null; + let count = 0; + this.read = (): T | null => { + while ((item = source.read()) !== null) { + if (count < limit) { + count += 1; + return item; + } + this.close(); + return null; + } + return null; + }; + source.on('end', () => { + this.close(); + }); + source.on('readable', () => { + this.readable = true; + }); + } +} + /** An iterator that generates items based on a source iterator diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index 78815aa..71ef501 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -9,6 +9,8 @@ import { scheduleTask, MappingIterator, FilteringIterator, + SkippingIterator, + LimitingIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1349,7 +1351,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(SkippingIterator); }); it('should skip the given number of items', () => { @@ -1379,7 +1381,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(LimitingIterator); }); it('should take the given number of items', () => { From f1e4b14a1c1741a6044d85fe6316a37d098a6887 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 24 Mar 2022 21:31:30 +0100 Subject: [PATCH 03/34] fixes if vs. while in LimitingIterator's .read() --- asynciterator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5a8db07..96913f8 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1322,7 +1322,7 @@ export class LimitingIterator extends AsyncIterator { let item: T | null; let count = 0; this.read = (): T | null => { - while ((item = source.read()) !== null) { + if ((item = source.read()) !== null) { if (count < limit) { count += 1; return item; From 974f076fcf15802050fc7cb90dd68aeb3db0772c Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 18:08:10 +0100 Subject: [PATCH 04/34] refactors read() functions into class methods --- asynciterator.ts | 102 +++++++++++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 96913f8..0349194 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1256,14 +1256,13 @@ function destinationFillBuffer(this: InternalSource) { } export class MappingIterator extends AsyncIterator { + protected readonly _map: (item: S) => D; + protected readonly _source: AsyncIterator; + constructor(source: AsyncIterator, map: (item: S) => D) { super(); - let item: S | null; - this.read = (): D | null => { - if ((item = source.read()) !== null) - return map.call(this, item); - return null; - }; + this._source = source; + this._map = map; source.on('end', () => { this.close(); }); @@ -1271,19 +1270,23 @@ export class MappingIterator extends AsyncIterator { this.readable = true; }); } + + read(): D | null { + const item = this._source.read(); + if (item !== null) + return this._map(item); + return null; + } } export class FilteringIterator extends AsyncIterator { + protected readonly _filter: (item: T) => boolean; + protected readonly _source: AsyncIterator; + constructor(source: AsyncIterator, filter: (item: T) => boolean) { super(); - let item: T | null; - this.read = (): T | null => { - while ((item = source.read()) !== null) { - if (filter.call(this, item)) - return item; - } - return null; - }; + this._source = source; + this._filter = filter; source.on('end', () => { this.close(); }); @@ -1291,22 +1294,27 @@ export class FilteringIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + let item; + while ((item = this._source.read()) !== null) { + if (this._filter(item)) + return item; + } + return null; + } } export class SkippingIterator extends AsyncIterator { + protected readonly _source: AsyncIterator; + protected readonly _skip: number; + protected _skipped: number; + constructor(source: AsyncIterator, skip: number) { super(); - let item: T | null; - let skipped = 0; - this.read = (): T | null => { - while ((item = source.read()) !== null) { - if (skipped < skip) - skipped += 1; - else - return item; - } - return null; - }; + this._skip = skip; + this._skipped = 0; + this._source = source; source.on('end', () => { this.close(); }); @@ -1314,24 +1322,29 @@ export class SkippingIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + let item; + while ((item = this._source.read()) !== null) { + if (this._skipped < this._skip) + this._skipped += 1; + else + return item; + } + return null; + } } export class LimitingIterator extends AsyncIterator { + protected readonly _source: AsyncIterator; + protected readonly _limit: number; + protected _count: number; + constructor(source: AsyncIterator, limit: number) { super(); - let item: T | null; - let count = 0; - this.read = (): T | null => { - if ((item = source.read()) !== null) { - if (count < limit) { - count += 1; - return item; - } - this.close(); - return null; - } - return null; - }; + this._source = source; + this._limit = limit; + this._count = 0; source.on('end', () => { this.close(); }); @@ -1339,6 +1352,19 @@ export class LimitingIterator extends AsyncIterator { this.readable = true; }); } + + read(): T | null { + const item = this._source.read(); + if (item !== null) { + if (this._count < this._limit) { + this._count += 1; + return item; + } + this.close(); + return null; + } + return null; + } } From 728f9ba790dc5fd731e70f50450f7f944def416c Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 18:31:10 +0100 Subject: [PATCH 05/34] centralizes common code into shared base class --- asynciterator.ts | 72 +++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 0349194..e11a87e 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1255,20 +1255,40 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } -export class MappingIterator extends AsyncIterator { - protected readonly _map: (item: S) => D; +export class SynchronousTransformIterator extends AsyncIterator { protected readonly _source: AsyncIterator; - constructor(source: AsyncIterator, map: (item: S) => D) { + constructor(source: AsyncIterator) { + /* eslint-disable no-use-before-define */ super(); this._source = source; - this._map = map; - source.on('end', () => { + const cleanup = () => { + source.removeListener('end', onEnd); + source.removeListener('readable', onReadable); + }; + const onEnd = () => { + cleanup(); this.close(); - }); - source.on('readable', () => { + }; + const onReadable = () => { this.readable = true; - }); + }; + source.on('end', onEnd); + source.on('readable', onReadable); + } + + protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { + super._destroy(cause, callback); + this._source.destroy(cause); + } +} + +export class MappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => D; + + constructor(source: AsyncIterator, map: (item: S) => D) { + super(source); + this._map = map; } read(): D | null { @@ -1279,20 +1299,12 @@ export class MappingIterator extends AsyncIterator { } } -export class FilteringIterator extends AsyncIterator { +export class FilteringIterator extends SynchronousTransformIterator { protected readonly _filter: (item: T) => boolean; - protected readonly _source: AsyncIterator; constructor(source: AsyncIterator, filter: (item: T) => boolean) { - super(); - this._source = source; + super(source); this._filter = filter; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { @@ -1305,22 +1317,14 @@ export class FilteringIterator extends AsyncIterator { } } -export class SkippingIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; +export class SkippingIterator extends SynchronousTransformIterator { protected readonly _skip: number; protected _skipped: number; constructor(source: AsyncIterator, skip: number) { - super(); + super(source); this._skip = skip; this._skipped = 0; - this._source = source; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { @@ -1335,22 +1339,14 @@ export class SkippingIterator extends AsyncIterator { } } -export class LimitingIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; +export class LimitingIterator extends SynchronousTransformIterator { protected readonly _limit: number; protected _count: number; constructor(source: AsyncIterator, limit: number) { - super(); - this._source = source; + super(source); this._limit = limit; this._count = 0; - source.on('end', () => { - this.close(); - }); - source.on('readable', () => { - this.readable = true; - }); } read(): T | null { From c1d47354b922ac693bf55b9b7a71620146795e60 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 10:56:50 +1100 Subject: [PATCH 06/34] chore: remove unecessary commented out lines --- asynciterator.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e11a87e..f7c4613 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -456,7 +456,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - // return this.transform({ map: self ? map.bind(self) : map }); return new MappingIterator(this, self ? map.bind(self) : map); } @@ -471,7 +470,6 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { return new FilteringIterator(this, self ? filter.bind(self) : filter); - // return this.transform({ filter: self ? filter.bind(self) : filter }); } /** @@ -512,7 +510,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - // return this.transform({ offset }); return new SkippingIterator(this, offset); } @@ -523,7 +520,6 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - // return this.transform({ limit }); return new LimitingIterator(this, limit); } From 1ec39fbb12497ad46ea5409ba683deb4cbb2079c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 12:32:20 +1100 Subject: [PATCH 07/34] feat: optimize chained mapping and filtering --- asynciterator.ts | 138 +++++++++++++++++++++++++++++++++++++ test/AsyncIterator-test.js | 84 ++++++++++++++++++++++ 2 files changed, 222 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index f7c4613..9ee0446 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -459,6 +459,14 @@ export class AsyncIterator extends EventEmitter { return new MappingIterator(this, self ? map.bind(self) : map); } + /** + MultiMaps items according to a synchronous generator (hence no need for buffering) + @param {Function} multiMap The function to multiMap items with + */ + multiMap(multiMap: (item: T) => Generator): AsyncIterator { + return new MultiMappingIterator(this, multiMap); + } + /** Return items from this iterator that match the filter. After this operation, only read the returned iterator instead of the current one. @@ -1279,6 +1287,32 @@ export class SynchronousTransformIterator extends AsyncIterator { } } +export class MultiMappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => Generator; + private generator?: Generator; + + constructor(source: AsyncIterator, map: (item: S) => Generator) { + super(source); + this._map = map; + } + + read(): D | null { + let _item; + + // eslint-disable-next-line no-constant-condition + while (true) { + if (!this.generator) { + if ((_item = this._source.read()) === null) + return null; + this.generator = this._map(_item); + } + if (!(_item = this.generator.next()).done) + return _item.value; + this.generator = undefined; + } + } +} + export class MappingIterator extends SynchronousTransformIterator { protected readonly _map: (item: S) => D; @@ -1293,6 +1327,28 @@ export class MappingIterator extends SynchronousTransformIterator(map: (item: D) => T, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: { + filter: false, + function: this._map, + }, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: { + filter: false, + function: this._map, + }, + }); + } } export class FilteringIterator extends SynchronousTransformIterator { @@ -1311,6 +1367,28 @@ export class FilteringIterator extends SynchronousTransformIterator { } return null; } + + map(map: (item: T) => D, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: { + filter: true, + function: this._filter, + }, + }); + } + + filter(filter: (item: T) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: { + filter: true, + function: this._filter, + }, + }); + } } export class SkippingIterator extends SynchronousTransformIterator { @@ -1359,6 +1437,66 @@ export class LimitingIterator extends SynchronousTransformIterator { } } +interface Transform { + filter: boolean, + function: Function, + next?: Transform +} + +export class MultiMapFilterTransformIterator extends SynchronousTransformIterator { + private _transformation?: (item: S) => D | null; + + constructor(source: AsyncIterator, private transforms: Transform) { + super(source); + } + + protected transformation(_item: S): D | null { + if (!this._transformation) { + let _transforms: Transform | undefined = this.transforms; + + const { filter, function: func } = _transforms!; + + this._transformation = filter ? + ((item: any) => func(item) ? item : null) : + func as any; + + while ((_transforms = _transforms!.next) !== undefined) { + const { filter: _filter, function: _func } = _transforms; + const t = this._transformation!; + + this._transformation = _filter ? + (item: any) => _func(item) ? t(item) : null : + (item: any) => t(_func(item)); + } + } + return this._transformation!(_item); + } + + read(): D | null { + let item; + while ((item = this._source.read()) !== null) { + if ((item = this.transformation(item)) !== null) + return item; + } + return null; + } + + map(map: (item: D) => T, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: this.transforms, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: this.transforms, + }); + } +} /** An iterator that generates items based on a source iterator diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d6..d7527b1 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,7 @@ import { ENDED, DESTROYED, scheduleTask, + range, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1307,4 +1308,87 @@ describe('AsyncIterator', () => { }); }); }); + describe('Testing chains fo maps and filters', () => { + let iterator; + beforeEach(() => { + iterator = range(0, 2); + }); + it('Should handle no transforms', async () => { + iterator.read().should.equal(0); + iterator.read().should.equal(1); + iterator.read().should.equal(2); + }); + it('Should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + it('Should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + it('Should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + it('Should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + it('Should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + it('Should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + it('Should handle transforms', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['x0', 'y0', 'x1', 'y1', 'x2', 'y2']); + }); + it('Should handle transforms and maps', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }).map(x => `z${x}`); + (await iterator.toArray()).should.deep.equal(['zx0', 'zy0', 'zx1', 'zy1', 'zx2', 'zy2']); + }); + it('Should handle maps and transforms', async () => { + iterator = iterator.map(x => `z${x}`).multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['xz0', 'yz0', 'xz1', 'yz1', 'xz2', 'yz2']); + }); + }); }); From 6141346920244148d1a10d0601268a7058ec4644 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 18:13:49 +0200 Subject: [PATCH 08/34] fixes missing readable events --- asynciterator.ts | 43 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 9ee0446..e9959ff 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1260,25 +1260,52 @@ function destinationFillBuffer(this: InternalSource) { } export class SynchronousTransformIterator extends AsyncIterator { - protected readonly _source: AsyncIterator; + protected _source: AsyncIterator; constructor(source: AsyncIterator) { /* eslint-disable no-use-before-define */ super(); this._source = source; const cleanup = () => { - source.removeListener('end', onEnd); - source.removeListener('readable', onReadable); + source.removeListener('end', onSourceEnd); + source.removeListener('error', onSourceError); + source.removeListener('readable', onSourceReadable); + taskScheduler(() => { + // Delayed as there might be pending tasks using the source at the + // time that cleanup() is called. + delete this._source; + }); }; - const onEnd = () => { + const onSourceEnd = () => { cleanup(); this.close(); }; - const onReadable = () => { - this.readable = true; + const onSourceError = (err: Error) => { + scheduleTask(() => { + this.emit('error', err); + }); + }; + const onSourceReadable = () => { + if (this.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + scheduleTask(() => { + this.emit('readable'); + }); + } + else { + this.readable = true; + } }; - source.on('end', onEnd); - source.on('readable', onReadable); + source.on('end', onSourceEnd); + source.on('error', onSourceError); + source.on('readable', onSourceReadable); + if (source.readable) + onSourceReadable(); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { From d7ab7d9a0e970b05156dcac17505c837bf1386df Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 18:47:10 +0200 Subject: [PATCH 09/34] synchronously emits readable and error events in SychronousTransformIterator --- asynciterator.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e9959ff..b9a83cf 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1281,9 +1281,7 @@ export class SynchronousTransformIterator extends AsyncIterator { this.close(); }; const onSourceError = (err: Error) => { - scheduleTask(() => { - this.emit('error', err); - }); + this.emit('error', err); }; const onSourceReadable = () => { if (this.readable) { @@ -1293,9 +1291,7 @@ export class SynchronousTransformIterator extends AsyncIterator { // It looks like the debouncing that happens in `set readable()` // in `AsyncIterator` prevents the event from firing as `this` // is already readable. - scheduleTask(() => { - this.emit('readable'); - }); + this.emit('readable'); } else { this.readable = true; From 44fa99ccfc98376a2aa3e44741574a6105d2df3e Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Fri, 25 Mar 2022 15:00:49 +0100 Subject: [PATCH 10/34] faster wrapping of iterator-like sources --- asynciterator.ts | 38 ++++++++++++++-- test/TransformIterator-test.js | 3 +- test/WrappingIterator-test.js | 82 ++++++++++++++++++++++++++++++++++ test/wrap-test.js | 55 +++++++++++++++++++++++ 4 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 test/WrappingIterator-test.js create mode 100644 test/wrap-test.js diff --git a/asynciterator.ts b/asynciterator.ts index b9a83cf..f60e32c 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2117,17 +2117,49 @@ class HistoryReader { } } +export class WrappingIterator extends AsyncIterator { + protected _source?: InternalSource; + + constructor(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise) { + super(); + if (sourceOrPromise instanceof AsyncIterator) + return sourceOrPromise; + Promise.resolve(sourceOrPromise) + .then(source => { + // @ts-ignore - TODO: how to drop this cleanly? + if (!isFunction(source.read) || !isFunction(source.on)) + throw new Error(`Invalid source: ${source}`); + this._source = (source as InternalSource) + .on('end', () => { + this.close(); + }); + this.readable = true; + }) + .catch(error => { + this.emit('error', error); + }); + } + + read(): T | null { + if (this._source) + return this._source.read(); + return null; + } +} + /** Creates an iterator that wraps around a given iterator or readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function - @param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from + @param {module:asynciterator.AsyncIterator|Readable} [sourceOrPromise] The source this iterator generates items from @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(source: EventEmitter | Promise, options?: TransformIteratorOptions) { - return new TransformIterator(source as AsyncIterator | Promise>, options); +export function wrap(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise, options?: TransformIteratorOptions) { + if (options) + return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); + return new WrappingIterator(sourceOrPromise); } /** diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index e5d8116..0f4f7a5 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -4,6 +4,7 @@ import { EmptyIterator, ArrayIterator, TransformIterator, + WrappingIterator, wrap, scheduleTask, } from '../dist/asynciterator.js'; @@ -38,7 +39,7 @@ describe('TransformIterator', () => { before(() => { instance = wrap(); }); it('should be an TransformIterator object', () => { - instance.should.be.an.instanceof(TransformIterator); + instance.should.be.an.instanceof(WrappingIterator); }); it('should be an AsyncIterator object', () => { diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js new file mode 100644 index 0000000..5f86a10 --- /dev/null +++ b/test/WrappingIterator-test.js @@ -0,0 +1,82 @@ +import { AsyncIterator, ArrayIterator, WrappingIterator } from '../dist/asynciterator.js'; +import { EventEmitter } from 'events'; + +describe('WrappingIterator', () => { + describe('The WrappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new WrappingIterator({}); + }); + + it('should be a WrappingIterator object', () => { + instance.should.be.an.instanceof(WrappingIterator); + }); + + it('should be a AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + describe('with an invalid source', () => { + it('should emit an error', done => { + const source = {}; + const wrapped = new WrappingIterator(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + }); + describe('with an empty source', () => { + it('should end when the source ends', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source); + wrapped.on('end', () => { + done(); + }); + }); + }); + describe('with a non-empty source', () => { + it('should end when the source ends', done => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = new WrappingIterator(source); + wrapped.on('data', item => { item; }).on('end', () => { + done(); + }); + }); + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(source); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); + describe('with a promise of a non-empty source', () => { + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(Promise.resolve(source)); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); +}); diff --git a/test/wrap-test.js b/test/wrap-test.js new file mode 100644 index 0000000..c54e49b --- /dev/null +++ b/test/wrap-test.js @@ -0,0 +1,55 @@ + +import { + wrap, + ArrayIterator, + TransformIterator, + WrappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class IteratorLike extends EventEmitter { + constructor() { + super(); + this._count = 0; + } + + read() { + if (this._count >= 5) { + this.emit('end'); + return null; + } + return this._count++; + } +} + +describe('The wrap() function', () => { + it('should let an instance of AsyncIterator pass through without wrapping', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source); + wrapped.should.equal(source); + wrapped.should.be.instanceof(ArrayIterator); + }); + + it('should emit an error when an incompatible source is passed', done => { + const source = {}; + const wrapped = wrap(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + + it('should return a TransformIterator when an options object is passed', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const options = { map: num => num * 2 }; + const wrapped = wrap(source, options); + wrapped.should.be.instanceof(TransformIterator); + }); + + it('should return a WrappingIterator when no options object is passed', () => { + const source = new IteratorLike(); + const wrapped = wrap(source); + wrapped.should.be.instanceof(WrappingIterator); + }); +}); From e8f9108eb1a777ba0e11dc2162e60af2739113ed Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Fri, 25 Mar 2022 15:07:34 +0100 Subject: [PATCH 11/34] adds missing readable event handler in WrappingIterator --- asynciterator.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index f60e32c..f0af6d6 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2132,6 +2132,9 @@ export class WrappingIterator extends AsyncIterator { this._source = (source as InternalSource) .on('end', () => { this.close(); + }) + .on('readable', () => { + this.readable = true; }); this.readable = true; }) From 46c6877295c3bdd238ef94e1ae253d7b2ffb5ca1 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 11:34:07 +0100 Subject: [PATCH 12/34] puts iterator pass-through behind a dedicated option, fixes breaking end events on promisified sources --- asynciterator.ts | 67 ++++++++++++++++++++++++---------- test/TransformIterator-test.js | 19 ---------- test/WrappingIterator-test.js | 15 ++++++-- test/wrap-test.js | 20 ++++++++-- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index f0af6d6..73522be 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2117,30 +2117,54 @@ class HistoryReader { } } +export interface WrappingIteratorOptions { + letIteratorThrough?: boolean; +} + +export type PromiseLike = Pick, 'then' | 'catch'> + +/* eslint-disable arrow-body-style */ +export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { + return isFunction(item.then) && isFunction(item.catch); +}; + export class WrappingIterator extends AsyncIterator { protected _source?: InternalSource; - constructor(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise) { + constructor(sourceOrPromise: EventEmitter | Promise | PromiseLike, options: WrappingIteratorOptions = {}) { super(); - if (sourceOrPromise instanceof AsyncIterator) + if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) return sourceOrPromise; - Promise.resolve(sourceOrPromise) - .then(source => { - // @ts-ignore - TODO: how to drop this cleanly? - if (!isFunction(source.read) || !isFunction(source.on)) - throw new Error(`Invalid source: ${source}`); - this._source = (source as InternalSource) - .on('end', () => { - this.close(); - }) - .on('readable', () => { - this.readable = true; - }); - this.readable = true; + if (sourceOrPromise instanceof Promise || isPromiseLike(sourceOrPromise)) { + sourceOrPromise + .then(source => { + WrappingIterator._wrapSource(source, this); + }) + .catch(err => { + this.emit('error', err); + }); + } + else { + WrappingIterator._wrapSource(sourceOrPromise, this); + } + } + + protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { + // @ts-ignore - TODO: how to drop this cleanly? + if (!isFunction(source.read) || !isFunction(source.on)) { + taskScheduler(() => { + wrapped.emit('error', new Error(`Invalid source: ${source}`)); + }); + return; + } + wrapped._source = (source as InternalSource) + .on('end', () => { + wrapped.close(); }) - .catch(error => { - this.emit('error', error); + .on('readable', () => { + wrapped.readable = true; }); + wrapped.readable = true; } read(): T | null { @@ -2159,10 +2183,13 @@ export class WrappingIterator extends AsyncIterator { @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(sourceOrPromise: AsyncIterator | Promise> | EventEmitter | Promise, options?: TransformIteratorOptions) { - if (options) +export function wrap( + sourceOrPromise: EventEmitter | Promise, + options: TransformIteratorOptions & WrappingIteratorOptions = {}, +): AsyncIterator { + if ('maxBufferSize' in options || 'autoStart' in options || 'optional' in options || 'destroySource' in options) return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); - return new WrappingIterator(sourceOrPromise); + return new WrappingIterator(sourceOrPromise, options); } /** diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index 0f4f7a5..a6cdcab 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -4,8 +4,6 @@ import { EmptyIterator, ArrayIterator, TransformIterator, - WrappingIterator, - wrap, scheduleTask, } from '../dist/asynciterator.js'; @@ -33,23 +31,6 @@ describe('TransformIterator', () => { instance.should.be.an.instanceof(EventEmitter); }); }); - - describe('the result when called through `wrap`', () => { - let instance; - before(() => { instance = wrap(); }); - - it('should be an TransformIterator object', () => { - instance.should.be.an.instanceof(WrappingIterator); - }); - - it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); - }); - - it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); - }); - }); }); describe('A TransformIterator', () => { diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index 5f86a10..aef904c 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -7,7 +7,7 @@ describe('WrappingIterator', () => { let instance; before(() => { - instance = new WrappingIterator({}); + instance = new WrappingIterator(new ArrayIterator([])); }); it('should be a WrappingIterator object', () => { @@ -23,7 +23,7 @@ describe('WrappingIterator', () => { }); }); }); - describe('with an invalid source', () => { + describe('the result when called with new and with an invalid source', () => { it('should emit an error', done => { const source = {}; const wrapped = new WrappingIterator(source); @@ -33,14 +33,21 @@ describe('WrappingIterator', () => { }); }); }); - describe('with an empty source', () => { - it('should end when the source ends', done => { + describe('with an empty source iterator', () => { + it('should end when the source iterator ends and letIteratorThrough is not set', done => { const source = new ArrayIterator([]); const wrapped = new WrappingIterator(source); wrapped.on('end', () => { done(); }); }); + it('should end when the source iterator ends and letIteratorThrough is set to true', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source, { letIteratorThrough: true }); + wrapped.on('end', () => { + done(); + }); + }); }); describe('with a non-empty source', () => { it('should end when the source ends', done => { diff --git a/test/wrap-test.js b/test/wrap-test.js index c54e49b..127082b 100644 --- a/test/wrap-test.js +++ b/test/wrap-test.js @@ -24,9 +24,23 @@ class IteratorLike extends EventEmitter { } describe('The wrap() function', () => { - it('should let an instance of AsyncIterator pass through without wrapping', () => { + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is not set', () => { const source = new ArrayIterator([0, 1, 2, 3, 4]); const wrapped = wrap(source); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to false', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: false }); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to true', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: true }); wrapped.should.equal(source); wrapped.should.be.instanceof(ArrayIterator); }); @@ -40,9 +54,9 @@ describe('The wrap() function', () => { }); }); - it('should return a TransformIterator when an options object is passed', () => { + it('should return a TransformIterator when transform options are passed', () => { const source = new ArrayIterator([0, 1, 2, 3, 4]); - const options = { map: num => num * 2 }; + const options = { maxBufferSize: 42 }; const wrapped = wrap(source, options); wrapped.should.be.instanceof(TransformIterator); }); From 60a0d150382c1cc806279273e3e970e1a9deaab0 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sat, 26 Mar 2022 11:57:40 +0100 Subject: [PATCH 13/34] better typings for iterator-like and promise-like objects --- asynciterator.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 73522be..f0c58f3 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,21 +2121,27 @@ export interface WrappingIteratorOptions { letIteratorThrough?: boolean; } -export type PromiseLike = Pick, 'then' | 'catch'> +export type PromiseLike = Pick, 'then' | 'catch'>; /* eslint-disable arrow-body-style */ export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { return isFunction(item.then) && isFunction(item.catch); }; +export type IteratorLike = EventEmitter & { on: () => any; read: () => T | null }; + +export const isIteratorLike = (item: EventEmitter & { [key: string]: any }): item is IteratorLike => { + return isFunction(item.on) && isFunction(item.read); +}; + export class WrappingIterator extends AsyncIterator { - protected _source?: InternalSource; + protected _source?: IteratorLike; - constructor(sourceOrPromise: EventEmitter | Promise | PromiseLike, options: WrappingIteratorOptions = {}) { + constructor(sourceOrPromise: EventEmitter | PromiseLike, options: WrappingIteratorOptions = {}) { super(); if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) return sourceOrPromise; - if (sourceOrPromise instanceof Promise || isPromiseLike(sourceOrPromise)) { + if (isPromiseLike(sourceOrPromise)) { sourceOrPromise .then(source => { WrappingIterator._wrapSource(source, this); @@ -2150,14 +2156,13 @@ export class WrappingIterator extends AsyncIterator { } protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { - // @ts-ignore - TODO: how to drop this cleanly? - if (!isFunction(source.read) || !isFunction(source.on)) { + if (!isIteratorLike(source)) { taskScheduler(() => { wrapped.emit('error', new Error(`Invalid source: ${source}`)); }); return; } - wrapped._source = (source as InternalSource) + wrapped._source = source .on('end', () => { wrapped.close(); }) From 12885ba9ec1bc36932ca06740bd6d67e60452f7c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sat, 26 Mar 2022 14:25:27 +1100 Subject: [PATCH 14/34] Allow unionIterator to take promises. Closes https://github.com/RubenVerborgh/AsyncIterator/issues/42 --- asynciterator.ts | 20 ++++++--- test/UnionIterator-test.js | 92 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index f0c58f3..07560c7 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1783,7 +1783,7 @@ export class MultiTransformIterator extends TransformIterator { */ export class UnionIterator extends BufferedIterator { private _sources : InternalSource[] = []; - private _pending? : { sources?: AsyncIterator> }; + private _pending? : { sources?: AsyncIterator>> }; private _currentSource = -1; /** @@ -1791,7 +1791,9 @@ export class UnionIterator extends BufferedIterator { @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from @param {object} [options] Settings of the iterator */ - constructor(sources: AsyncIteratorOrArray>, + constructor(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>, options: BufferedIteratorOptions = {}) { super(options); const autoStart = options.autoStart !== false; @@ -1799,14 +1801,14 @@ export class UnionIterator extends BufferedIterator { // Sources have been passed as an iterator if (isEventEmitter(sources)) { sources.on('error', error => this.emit('error', error)); - this._pending = { sources }; + this._pending = { sources: sources as AsyncIterator>> }; if (autoStart) this._loadSources(); } // Sources have been passed as a non-empty array else if (Array.isArray(sources) && sources.length > 0) { for (const source of sources) - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); } // Sources are an empty list else if (autoStart) { @@ -1828,7 +1830,7 @@ export class UnionIterator extends BufferedIterator { // Otherwise, set up source reading else { sources.on('data', source => { - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); this._fillBufferAsync(); }); sources.on('end', () => { @@ -1839,7 +1841,9 @@ export class UnionIterator extends BufferedIterator { } // Adds the given source to the internal sources array - protected _addSource(source: InternalSource) { + protected _addSource(source: MaybePromise>) { + if (isPromise(source)) + source = wrap(source) as any as InternalSource; if (!source.done) { this._sources.push(source); source._destination = this; @@ -2224,7 +2228,9 @@ export function fromArray(items: Iterable) { Creates an iterator containing all items from the given iterators. @param {Array} items the items */ -export function union(sources: AsyncIteratorOrArray>) { +export function union(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>) { return new UnionIterator(sources); } diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 308f52d..c7714ec 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -260,6 +260,50 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and with autoStart and one source a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: true }); + }); + + describe('before reading', () => { + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('when constructed with an iterator and without autoStart', () => { let iterator, sourceIterator; before(() => { @@ -308,6 +352,54 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and without autoStart and one source as a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: false }); + }); + + describe('before reading', () => { + it('should not have read the sources', () => { + sourceIterator.read.should.not.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('a UnionIterator with two sources', () => { let iterator, sources; From 730bf645d82257efd27fb23c3b1ef2d0d8bd3621 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 26 Mar 2022 16:35:01 +0000 Subject: [PATCH 15/34] Release version 3.4.0 of the npm package. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 831df0f..d438e01 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "license": "MIT", "devDependencies": { "@babel/cli": "^7.10.1", diff --git a/package.json b/package.json index 03b490b..edcab1f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "description": "An asynchronous iterator library for advanced object pipelines.", "author": "Ruben Verborgh ", "type": "module", From 55f21c6ea4ff7e70375740230712e2238747defe Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 16:37:05 +1100 Subject: [PATCH 16/34] feat: wrapIterator --- asynciterator.ts | 31 +++++++++++++++++++++++++++++-- test/WrapIterator-test.js | 13 +++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 test/WrapIterator-test.js diff --git a/asynciterator.ts b/asynciterator.ts index 07560c7..51da262 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2153,8 +2153,7 @@ export class WrappingIterator extends AsyncIterator { .catch(err => { this.emit('error', err); }); - } - else { + } else { WrappingIterator._wrapSource(sourceOrPromise, this); } } @@ -2183,6 +2182,22 @@ export class WrappingIterator extends AsyncIterator { } } +class WrapIterator extends AsyncIterator { + constructor(private source: Iterator) { + super(); + this.readable = true; + } + + read(): T | null { + const item = this.source.next(); + if (item.done) { + this.close(); + return null; + } + return item.value; + } +} + /** Creates an iterator that wraps around a given iterator or readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. @@ -2201,6 +2216,18 @@ export function wrap( return new WrappingIterator(sourceOrPromise, options); } +/** + Creates an iterator that wraps around a given synchronous iterator. + Use this to convert an iterator-like object into a full-featured AsyncIterator. + After this operation, only read the returned iterator instead of the given one. + @function + @param {Iterator} [source] The source this iterator generates items from + @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator +*/ +export function wrapIterator(source: Iterator) { + return new WrapIterator(source); +} + /** Creates an empty iterator. */ diff --git a/test/WrapIterator-test.js b/test/WrapIterator-test.js new file mode 100644 index 0000000..a7adec1 --- /dev/null +++ b/test/WrapIterator-test.js @@ -0,0 +1,13 @@ +import { + wrapIterator, +} from '../dist/asynciterator.js'; + +describe('wrapIterator', () => { + it('Should wrap correctly', async () => { + (await wrapIterator((function * () { + yield 1; + yield 2; + yield 3; + })()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); From 2339f4b2be2fa83790c57335b9062827c9a36a57 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 27 Mar 2022 19:19:24 +0200 Subject: [PATCH 17/34] universal wrap() function --- asynciterator.ts | 118 +++++++++++------- ...rator-test.js => IteratorIterator-test.js} | 6 +- 2 files changed, 73 insertions(+), 51 deletions(-) rename test/{WrapIterator-test.js => IteratorIterator-test.js} (64%) diff --git a/asynciterator.ts b/asynciterator.ts index 51da262..7b0b11c 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,58 +2121,55 @@ class HistoryReader { } } -export interface WrappingIteratorOptions { - letIteratorThrough?: boolean; -} - -export type PromiseLike = Pick, 'then' | 'catch'>; +export type IteratorLike = EventEmitter & { + on: (event: string | symbol, listener: (...args: any[]) => void) => AsyncIterator; read: () => T | null }; /* eslint-disable arrow-body-style */ -export const isPromiseLike = (item: { [key: string]: any }): item is PromiseLike => { - return isFunction(item.then) && isFunction(item.catch); +export const isIteratorLike = (item: { [key: string]: any }): item is IteratorLike => { + return isFunction(item.on) && isFunction(item.read); }; -export type IteratorLike = EventEmitter & { on: () => any; read: () => T | null }; +export const isIterator = (item: { [key: string]: any }): item is Iterator => { + return isFunction(item.next); +}; -export const isIteratorLike = (item: EventEmitter & { [key: string]: any }): item is IteratorLike => { - return isFunction(item.on) && isFunction(item.read); +export const isIterable = (item: { [key: string]: any }): item is Iterable => { + return Symbol.iterator in item; }; export class WrappingIterator extends AsyncIterator { protected _source?: IteratorLike; - constructor(sourceOrPromise: EventEmitter | PromiseLike, options: WrappingIteratorOptions = {}) { + constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { super(); - if (options.letIteratorThrough === true && sourceOrPromise instanceof AsyncIterator) - return sourceOrPromise; - if (isPromiseLike(sourceOrPromise)) { + if (isPromise(sourceOrPromise)) { sourceOrPromise .then(source => { - WrappingIterator._wrapSource(source, this); + WrappingIterator._wrapSource(source, this, options); }) .catch(err => { this.emit('error', err); }); - } else { - WrappingIterator._wrapSource(sourceOrPromise, this); + } + else { + WrappingIterator._wrapSource(sourceOrPromise, this, options); } } - protected static _wrapSource(source: EventEmitter, wrapped: WrappingIterator) { - if (!isIteratorLike(source)) { - taskScheduler(() => { - wrapped.emit('error', new Error(`Invalid source: ${source}`)); - }); - return; + protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { + try { + iterator._source = (isIteratorLike(source) ? source : _wrap(source, options)) + .on('end', () => { + iterator.close(); + }) + .on('readable', () => { + iterator.readable = true; + }); + iterator.readable = true; + } + catch (err) { + scheduleTask(() => iterator.emit('error', err)); } - wrapped._source = source - .on('end', () => { - wrapped.close(); - }) - .on('readable', () => { - wrapped.readable = true; - }); - wrapped.readable = true; } read(): T | null { @@ -2182,7 +2179,7 @@ export class WrappingIterator extends AsyncIterator { } } -class WrapIterator extends AsyncIterator { +export class IteratorIterator extends AsyncIterator { constructor(private source: Iterator) { super(); this.readable = true; @@ -2198,8 +2195,29 @@ class WrapIterator extends AsyncIterator { } } +export interface WrapOptions { + letIteratorThrough?: boolean; +} + +export type WrapSource = T[] | EventEmitter | Iterator | Iterable; + +const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterator => { + if (options.letIteratorThrough && source instanceof AsyncIterator) + return source; + if (Array.isArray(source)) + return new ArrayIterator(source); + if (isIteratorLike(source)) + return new WrappingIterator(source); + if (isIterator(source)) + return new IteratorIterator(source); + if (isIterable(source)) + return new IteratorIterator(source[Symbol.iterator]()); + throw new Error(`Unsupported source ${source}`); +}; + /** - Creates an iterator that wraps around a given iterator or readable stream. + Creates an iterator that wraps around a given array, iterator, iterable or + readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function @@ -2208,24 +2226,28 @@ class WrapIterator extends AsyncIterator { @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ export function wrap( - sourceOrPromise: EventEmitter | Promise, - options: TransformIteratorOptions & WrappingIteratorOptions = {}, + sourceOrPromise: WrapSource | Promise>, + options: TransformIteratorOptions & WrapOptions = {}, ): AsyncIterator { + // For backward compatibility, passing TransformIteratorOptions results in + // an instance of TransformIterator. + // TODO: consider dropping this in the next major version if ('maxBufferSize' in options || 'autoStart' in options || 'optional' in options || 'destroySource' in options) return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); - return new WrappingIterator(sourceOrPromise, options); -} - -/** - Creates an iterator that wraps around a given synchronous iterator. - Use this to convert an iterator-like object into a full-featured AsyncIterator. - After this operation, only read the returned iterator instead of the given one. - @function - @param {Iterator} [source] The source this iterator generates items from - @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator -*/ -export function wrapIterator(source: Iterator) { - return new WrapIterator(source); + // If the source is promisified, we *need* to use a WrappingIterator as this + // function is a synchronous one. + if (isPromise(sourceOrPromise)) + return new WrappingIterator(sourceOrPromise, options); + // The _wrap function synchronously return an iterator or throws on + // unsupported sources. However, for backward-compatiblity we need + // to relay errors as events of an AsyncIterator instance. + // TODO: consider dropping this in the next major version + try { + return _wrap(sourceOrPromise as WrapSource, options); + } + catch (err) { + return new WrappingIterator(sourceOrPromise); + } } /** diff --git a/test/WrapIterator-test.js b/test/IteratorIterator-test.js similarity index 64% rename from test/WrapIterator-test.js rename to test/IteratorIterator-test.js index a7adec1..868fc67 100644 --- a/test/WrapIterator-test.js +++ b/test/IteratorIterator-test.js @@ -1,10 +1,10 @@ import { - wrapIterator, + IteratorIterator, } from '../dist/asynciterator.js'; -describe('wrapIterator', () => { +describe('IteratorIterator', () => { it('Should wrap correctly', async () => { - (await wrapIterator((function * () { + (await new IteratorIterator((function * () { yield 1; yield 2; yield 3; From 3abf6b76b70df659b6a1e4de339d158bca183c6b Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 07:10:47 +0200 Subject: [PATCH 18/34] adds dedicated from* functions for ES2015 Iterator and Iterable, IteratorLike, adds option to prioritize ES2015 Iterable and Iterator while wrapping --- asynciterator.ts | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 7b0b11c..5316d99 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2196,6 +2196,7 @@ export class IteratorIterator extends AsyncIterator { } export interface WrapOptions { + prioritizeIterable?: boolean; letIteratorThrough?: boolean; } @@ -2204,14 +2205,22 @@ export type WrapSource = T[] | EventEmitter | Iterator | Iterable; const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterator => { if (options.letIteratorThrough && source instanceof AsyncIterator) return source; + if (options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } if (Array.isArray(source)) - return new ArrayIterator(source); + return fromArray(source); if (isIteratorLike(source)) - return new WrappingIterator(source); - if (isIterator(source)) - return new IteratorIterator(source); - if (isIterable(source)) - return new IteratorIterator(source[Symbol.iterator]()); + return fromIteratorLike(source); + if (!options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } throw new Error(`Unsupported source ${source}`); }; @@ -2269,10 +2278,35 @@ export function single(item: T) { Creates an iterator for the given array. @param {Array} items the items */ -export function fromArray(items: Iterable) { +export function fromArray(items: Iterable): AsyncIterator { return new ArrayIterator(items); } +/** + Creates an iterator for the given ES2015 Iterable. + @param {Iterable} iterable the iterable + */ +export function fromIterable(iterable: Iterable): AsyncIterator { + return new IteratorIterator(iterable[Symbol.iterator]()); +} + +/** + Creates an iterator for the given ES2015 Iterator. + @param {Iterable} iterator the iterator + */ +export function fromIterator(iterator: Iterator): AsyncIterator { + return new IteratorIterator(iterator); +} + +/** + * Creates an iterator for the given iterator-like object + * (AsyncIterator, stream.Readable, ...). + * @param {IteratorLike} iterator + */ +export function fromIteratorLike(iterator: IteratorLike): AsyncIterator { + return new WrappingIterator(iterator); +} + /** Creates an iterator containing all items from the given iterators. @param {Array} items the items From 10308913ee54c9ef6fec220dd5b7dcac80cd2653 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Mon, 28 Mar 2022 07:27:58 +0200 Subject: [PATCH 19/34] renames IteratorLike to AsyncIteratorLike to highlight difference from ES2015 Iterator --- asynciterator.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5316d99..e938963 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2121,11 +2121,11 @@ class HistoryReader { } } -export type IteratorLike = EventEmitter & { +export type AsyncIteratorLike = EventEmitter & { on: (event: string | symbol, listener: (...args: any[]) => void) => AsyncIterator; read: () => T | null }; /* eslint-disable arrow-body-style */ -export const isIteratorLike = (item: { [key: string]: any }): item is IteratorLike => { +export const isAsyncIteratorLike = (item: { [key: string]: any }): item is AsyncIteratorLike => { return isFunction(item.on) && isFunction(item.read); }; @@ -2138,7 +2138,7 @@ export const isIterable = (item: { [key: string]: any }): item is Iterable }; export class WrappingIterator extends AsyncIterator { - protected _source?: IteratorLike; + protected _source?: AsyncIteratorLike; constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { super(); @@ -2158,7 +2158,7 @@ export class WrappingIterator extends AsyncIterator { protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { try { - iterator._source = (isIteratorLike(source) ? source : _wrap(source, options)) + iterator._source = (isAsyncIteratorLike(source) ? source : _wrap(source, options)) .on('end', () => { iterator.close(); }) @@ -2213,8 +2213,8 @@ const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterat } if (Array.isArray(source)) return fromArray(source); - if (isIteratorLike(source)) - return fromIteratorLike(source); + if (isAsyncIteratorLike(source)) + return fromAsyncIteratorLike(source); if (!options.prioritizeIterable) { if (isIterator(source)) return fromIterator(source); @@ -2301,9 +2301,9 @@ export function fromIterator(iterator: Iterator): AsyncIterator { /** * Creates an iterator for the given iterator-like object * (AsyncIterator, stream.Readable, ...). - * @param {IteratorLike} iterator + * @param {AsyncIteratorLike} iterator */ -export function fromIteratorLike(iterator: IteratorLike): AsyncIterator { +export function fromAsyncIteratorLike(iterator: AsyncIteratorLike): AsyncIterator { return new WrappingIterator(iterator); } From 4bfadd6ca91d543d5e36010de829525c8e195829 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 15:25:32 +1100 Subject: [PATCH 20/34] chore: SyncUnionIterator --- asynciterator.ts | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/asynciterator.ts b/asynciterator.ts index e938963..bd2e6de 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1777,6 +1777,99 @@ export class MultiTransformIterator extends TransformIterator { } } +export class SyncUnionIterator extends AsyncIterator { + private _sources : InternalSource[] = []; + private _pending? : { sources?: AsyncIterator>> }; + private _currentSource = 0; + + /** + Creates a new `SyncUnionIterator`. + @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from + @param {object} [options] Settings of the iterator + */ + constructor(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>) { + super(); + + // Sources have been passed as an iterator + if (isEventEmitter(sources)) { + sources.on('error', error => this.emit('error', error)); + this._pending = { sources: sources as AsyncIterator>> }; + this._loadSources(); + } + // Sources have been passed as a non-empty array + else if (Array.isArray(sources) && sources.length > 0) { + for (const source of sources) + this._addSource(source as MaybePromise>); + } + } + + // Loads sources passed as an iterator + protected _loadSources() { + // Obtain sources iterator + const sources = this._pending!.sources!; + delete this._pending!.sources; + + // Close immediately if done + if (sources.done) { + delete this._pending; + this.close(); + } + // Otherwise, set up source reading + else { + sources.on('data', source => { + this._addSource(source as MaybePromise>); + }); + sources.on('end', () => { + delete this._pending; + if (this._sources.length === 0) + this.close(); + }); + } + } + + // Adds the given source to the internal sources array + public _addSource(source: MaybePromise>) { + if (isPromise(source)) + source = wrap(source) as any as InternalSource; + if (!source.done) { + this._sources.push(source); + source._destination = this; + source.on('error', destinationEmitError); + source.on('readable', () => { this.readable = true }); + source.on('end', destinationRemoveEmptySources); + } + } + + // Removes sources that will no longer emit items + protected _removeEmptySources() { + this._sources = this._sources.filter((source, index) => { + // Adjust the index of the current source if needed + if (source.done && index <= this._currentSource) + this._currentSource--; + return !source.done; + }); + if (!this._pending && this._sources.length === 0) + this.close(); + } + + // Reads items from the next sources + public read(): T | null { + for (let i = 0; i < this._sources.length; i++) { + // this._currentSource = (this._currentSource + 1) % this._sources.length; + const item = this._sources[this._currentSource].read(); + // Attempt to read an item from that source + if (item !== null) + return item + this._currentSource = (this._currentSource + 1) % this._sources.length; + // this._currentSource += 1 + } + + return null; + } +} + /** An iterator that generates items by reading from multiple other iterators. @extends module:asynciterator.BufferedIterator From c7574461c56446962084b5d4a90032c1778312c0 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 27 Mar 2022 15:30:22 +1100 Subject: [PATCH 21/34] chore: lint --- asynciterator.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index bd2e6de..a81cee7 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1837,7 +1837,9 @@ export class SyncUnionIterator extends AsyncIterator { this._sources.push(source); source._destination = this; source.on('error', destinationEmitError); - source.on('readable', () => { this.readable = true }); + source.on('readable', () => { + this.readable = true; + }); source.on('end', destinationRemoveEmptySources); } } @@ -1851,7 +1853,7 @@ export class SyncUnionIterator extends AsyncIterator { return !source.done; }); if (!this._pending && this._sources.length === 0) - this.close(); + this.close(); } // Reads items from the next sources @@ -1861,7 +1863,7 @@ export class SyncUnionIterator extends AsyncIterator { const item = this._sources[this._currentSource].read(); // Attempt to read an item from that source if (item !== null) - return item + return item; this._currentSource = (this._currentSource + 1) % this._sources.length; // this._currentSource += 1 } From 3350b2c8a94d9a5958a12d0d57d72b06fa695363 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Wed, 30 Mar 2022 09:31:16 +0200 Subject: [PATCH 22/34] changes union() to return an instance of SyncUnionIterator --- asynciterator.ts | 2 +- test/UnionIterator-test.js | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index a81cee7..664afbe 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2409,7 +2409,7 @@ export function fromAsyncIteratorLike(iterator: AsyncIteratorLike): AsyncI export function union(sources: AsyncIteratorOrArray> | AsyncIteratorOrArray>> | AsyncIteratorOrArray>>) { - return new UnionIterator(sources); + return new SyncUnionIterator(sources); } /** diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index c7714ec..e8950fd 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -4,6 +4,7 @@ import { ArrayIterator, BufferedIterator, EmptyIterator, + SyncUnionIterator, union, range, scheduleTask, @@ -35,7 +36,7 @@ describe('UnionIterator', () => { before(() => { instance = union(); }); it('should be an UnionIterator object', () => { - instance.should.be.an.instanceof(UnionIterator); + instance.should.be.an.instanceof(SyncUnionIterator); }); it('should be an AsyncIterator object', () => { From 14d2f7cc2abdb6f8323a8098feb6e2a8f8a3d1dc Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Wed, 30 Mar 2022 13:04:17 +0200 Subject: [PATCH 23/34] improves handling of BufferedIterator with autoStart set to false in wrapping and synchronous transform --- asynciterator.ts | 58 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 664afbe..e191775 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -793,6 +793,11 @@ export class IntegerIterator extends AsyncIterator { @extends module:asynciterator.AsyncIterator */ export class BufferedIterator extends AsyncIterator { + static ensureInit(iterator: BufferedIterator) { + if (iterator._state === INIT) + iterator._init(true); + } + private _buffer: LinkedList = new LinkedList(); private _maxBufferSize = 4; protected _reading = true; @@ -1302,6 +1307,8 @@ export class SynchronousTransformIterator extends AsyncIterator { source.on('readable', onSourceReadable); if (source.readable) onSourceReadable(); + else if (source instanceof BufferedIterator) + BufferedIterator.ensureInit(source); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { @@ -2253,14 +2260,51 @@ export class WrappingIterator extends AsyncIterator { protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { try { - iterator._source = (isAsyncIteratorLike(source) ? source : _wrap(source, options)) - .on('end', () => { - iterator.close(); - }) - .on('readable', () => { - iterator.readable = true; + const wrappedSource = isAsyncIteratorLike(source) ? source : _wrap(source, options); + + const cleanup = () => { + wrappedSource.removeListener('end', onSourceEnd); + wrappedSource.removeListener('error', onSourceError); + wrappedSource.removeListener('readable', onSourceReadable); + scheduleTask(() => { + delete iterator._source; }); - iterator.readable = true; + }; + + const onSourceReadable = () => { + if (iterator.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + iterator.emit('readable'); + } + else { + iterator.readable = true; + } + }; + + const onSourceEnd = () => { + iterator.close(); + cleanup(); + }; + + const onSourceError = (err: Error) => { + iterator.emit('error', err); + }; + + wrappedSource.on('end', onSourceEnd); + wrappedSource.on('error', onSourceError); + wrappedSource.on('readable', onSourceReadable); + + iterator._source = wrappedSource; + + if (wrappedSource instanceof AsyncIterator && wrappedSource.readable) + onSourceReadable(); + else if (wrappedSource instanceof BufferedIterator) + BufferedIterator.ensureInit(wrappedSource); } catch (err) { scheduleTask(() => iterator.emit('error', err)); From ce929c73fe141f9e3d4b970b4b0c6e9b27b9d006 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Wed, 30 Mar 2022 13:12:13 +0200 Subject: [PATCH 24/34] makes union() return UnionIterator instances as we wait for SyncUnionIterator to mature --- asynciterator.ts | 2 +- test/UnionIterator-test.js | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e191775..3be8330 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2453,7 +2453,7 @@ export function fromAsyncIteratorLike(iterator: AsyncIteratorLike): AsyncI export function union(sources: AsyncIteratorOrArray> | AsyncIteratorOrArray>> | AsyncIteratorOrArray>>) { - return new SyncUnionIterator(sources); + return new UnionIterator(sources); } /** diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index e8950fd..c7714ec 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -4,7 +4,6 @@ import { ArrayIterator, BufferedIterator, EmptyIterator, - SyncUnionIterator, union, range, scheduleTask, @@ -36,7 +35,7 @@ describe('UnionIterator', () => { before(() => { instance = union(); }); it('should be an UnionIterator object', () => { - instance.should.be.an.instanceof(SyncUnionIterator); + instance.should.be.an.instanceof(UnionIterator); }); it('should be an AsyncIterator object', () => { From e98e248ff4b5092a341935e3ec63034a6bd248e7 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Sun, 3 Apr 2022 17:51:08 +1000 Subject: [PATCH 25/34] introduce maybe map and do some refactoring --- asynciterator.ts | 152 +++++++++++++++++++++++----------- test/AsyncIterator-test.js | 9 ++ test/IteratorIterator-test.js | 26 ++++-- test/WrappingIterator-test.js | 23 ++++- 4 files changed, 156 insertions(+), 54 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 3be8330..1a4661b 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -459,6 +459,14 @@ export class AsyncIterator extends EventEmitter { return new MappingIterator(this, self ? map.bind(self) : map); } + /** + Possibly maps items, return null if no item should be emitted. + @param {Function} maybeMap The function to multiMap items with + */ + maybeMap(map: (item: T) => D | null, self?: any): AsyncIterator { + return new MaybeMappingIterator(this, self ? map.bind(self) : map); + } + /** MultiMaps items according to a synchronous generator (hence no need for buffering) @param {Function} multiMap The function to multiMap items with @@ -1264,9 +1272,13 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } -export class SynchronousTransformIterator extends AsyncIterator { +export abstract class SynchronousTransformIterator extends AsyncIterator { protected _source: AsyncIterator; + get fastInfo(): Transform | false { + return false; + } + constructor(source: AsyncIterator) { /* eslint-disable no-use-before-define */ super(); @@ -1315,6 +1327,40 @@ export class SynchronousTransformIterator extends AsyncIterator { super._destroy(cause, callback); this._source.destroy(cause); } + + map(map: (item: D) => T, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.map(map, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.filter(filter, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next, + }); + } + + maybeMap(map: (item: D) => T | null, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.maybeMap(map, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + nullable: true, + function: self ? map.bind(self) : map, + next, + }); + } } export class MultiMappingIterator extends SynchronousTransformIterator { @@ -1343,6 +1389,32 @@ export class MultiMappingIterator extends SynchronousTransformIterator } } +export class MaybeMappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => D | null; + + constructor(source: AsyncIterator, map: (item: S) => D | null) { + super(source); + this._map = map; + } + + get fastInfo(): Transform { + return { + filter: false, + nullable: true, + function: this._map, + }; + } + + read(): D | null { + let item; + while ((item = this._source.read()) !== null) { + if ((item = this._map(item)) !== null) + return item; + } + return null; + } +} + export class MappingIterator extends SynchronousTransformIterator { protected readonly _map: (item: S) => D; @@ -1351,34 +1423,19 @@ export class MappingIterator extends SynchronousTransformIterator(map: (item: D) => T, self?: any): AsyncIterator { - return new MultiMapFilterTransformIterator(this._source, { - filter: false, - function: self ? map.bind(self) : map, - next: { - filter: false, - function: this._map, - }, - }); - } - - filter(filter: (item: D) => boolean, self?: any): AsyncIterator { - return new MultiMapFilterTransformIterator(this._source, { - filter: true, - function: self ? filter.bind(self) : filter, - next: { - filter: false, - function: this._map, - }, - }); - } } export class FilteringIterator extends SynchronousTransformIterator { @@ -1389,6 +1446,13 @@ export class FilteringIterator extends SynchronousTransformIterator { this._filter = filter; } + getInfo(): Transform { + return { + filter: true, + function: this._filter, + }; + } + read(): T | null { let item; while ((item = this._source.read()) !== null) { @@ -1397,28 +1461,6 @@ export class FilteringIterator extends SynchronousTransformIterator { } return null; } - - map(map: (item: T) => D, self?: any): AsyncIterator { - return new MultiMapFilterTransformIterator(this._source, { - filter: false, - function: self ? map.bind(self) : map, - next: { - filter: true, - function: this._filter, - }, - }); - } - - filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return new MultiMapFilterTransformIterator(this._source, { - filter: true, - function: self ? filter.bind(self) : filter, - next: { - filter: true, - function: this._filter, - }, - }); - } } export class SkippingIterator extends SynchronousTransformIterator { @@ -1469,6 +1511,7 @@ export class LimitingIterator extends SynchronousTransformIterator { interface Transform { filter: boolean, + nullable?: true, function: Function, next?: Transform } @@ -1491,12 +1534,16 @@ export class MultiMapFilterTransformIterator extends SynchronousTransf func as any; while ((_transforms = _transforms!.next) !== undefined) { - const { filter: _filter, function: _func } = _transforms; + const { filter: _filter, function: _func, nullable: _nullable } = _transforms; const t = this._transformation!; + // eslint-disable-next-line no-nested-ternary this._transformation = _filter ? - (item: any) => _func(item) ? t(item) : null : - (item: any) => t(_func(item)); + (item: any) => _func(item) ? t(item) : null : ( + _nullable === true ? + (item: any) => ((item = _func(item)) === null ? null : t(item)) : + (item: any) => t(_func(item)) + ); } } return this._transformation!(_item); @@ -1526,6 +1573,15 @@ export class MultiMapFilterTransformIterator extends SynchronousTransf next: this.transforms, }); } + + maybeMap(map: (item: D) => T | null, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + nullable: true, + function: self ? map.bind(self) : map, + next: this.transforms, + }); + } } /** diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index d7527b1..5f0dfef 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -1324,6 +1324,15 @@ describe('AsyncIterator', () => { it('Should apply maps that doubles correctly', async () => { (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).maybeMap(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.maybeMap(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.maybeMap(x => x === 2 ? null : x * 3).maybeMap(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); it('Should apply maps that doubles correctly', async () => { (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); }); diff --git a/test/IteratorIterator-test.js b/test/IteratorIterator-test.js index 868fc67..169ddeb 100644 --- a/test/IteratorIterator-test.js +++ b/test/IteratorIterator-test.js @@ -1,13 +1,29 @@ import { + fromIterator, + fromIterable, IteratorIterator, } from '../dist/asynciterator.js'; +function *testYield() { + yield 1; + yield 2; + yield 3; +} + describe('IteratorIterator', () => { it('Should wrap correctly', async () => { - (await new IteratorIterator((function * () { - yield 1; - yield 2; - yield 3; - })()).toArray()).should.deep.equal([1, 2, 3]); + (await new IteratorIterator(testYield()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); + +describe('fromIterator', () => { + it('Should wrap correctly', async () => { + (await fromIterator(testYield()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); + +describe('fromIterable', () => { + it('Should wrap correctly', async () => { + (await fromIterable({ [Symbol.iterator]: testYield }).toArray()).should.deep.equal([1, 2, 3]); }); }); diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index aef904c..d8fc2b1 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -1,4 +1,4 @@ -import { AsyncIterator, ArrayIterator, WrappingIterator } from '../dist/asynciterator.js'; +import { AsyncIterator, ArrayIterator, WrappingIterator, wrap, fromArray } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; describe('WrappingIterator', () => { @@ -86,4 +86,25 @@ describe('WrappingIterator', () => { }); }); }); + describe('source with read, on and iterable methods', () => { + let obj; + + beforeEach(() => { + obj = fromArray([1]); + obj[Symbol.iterator] = function * () { + yield 'x'; + yield 'y'; + }; + }); + + it('should prioritize the read method', async () => { + (await wrap(obj).toArray()).should.deep.equal([1]); + }); + it('should use the iterator when correctly set-up', async () => { + (await wrap(obj, { prioritizeIterable: true }).toArray()).should.deep.equal(['x', 'y']); + }); + it('wrapping should produce a new object', async () => { + wrap(obj).should.not.equal(obj); + }); + }); }); From 7fa04c47646cdcc4651acbcb33aeea8e4326f7c2 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 19:53:56 +0200 Subject: [PATCH 26/34] drops SyncUnionIterator as per plan defined in https://github.com/RubenVerborgh/AsyncIterator/issues/44#issuecomment-1086820890 --- asynciterator.ts | 95 ------------------------------------------------ 1 file changed, 95 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 1a4661b..c177245 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1840,101 +1840,6 @@ export class MultiTransformIterator extends TransformIterator { } } -export class SyncUnionIterator extends AsyncIterator { - private _sources : InternalSource[] = []; - private _pending? : { sources?: AsyncIterator>> }; - private _currentSource = 0; - - /** - Creates a new `SyncUnionIterator`. - @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from - @param {object} [options] Settings of the iterator - */ - constructor(sources: AsyncIteratorOrArray> | - AsyncIteratorOrArray>> | - AsyncIteratorOrArray>>) { - super(); - - // Sources have been passed as an iterator - if (isEventEmitter(sources)) { - sources.on('error', error => this.emit('error', error)); - this._pending = { sources: sources as AsyncIterator>> }; - this._loadSources(); - } - // Sources have been passed as a non-empty array - else if (Array.isArray(sources) && sources.length > 0) { - for (const source of sources) - this._addSource(source as MaybePromise>); - } - } - - // Loads sources passed as an iterator - protected _loadSources() { - // Obtain sources iterator - const sources = this._pending!.sources!; - delete this._pending!.sources; - - // Close immediately if done - if (sources.done) { - delete this._pending; - this.close(); - } - // Otherwise, set up source reading - else { - sources.on('data', source => { - this._addSource(source as MaybePromise>); - }); - sources.on('end', () => { - delete this._pending; - if (this._sources.length === 0) - this.close(); - }); - } - } - - // Adds the given source to the internal sources array - public _addSource(source: MaybePromise>) { - if (isPromise(source)) - source = wrap(source) as any as InternalSource; - if (!source.done) { - this._sources.push(source); - source._destination = this; - source.on('error', destinationEmitError); - source.on('readable', () => { - this.readable = true; - }); - source.on('end', destinationRemoveEmptySources); - } - } - - // Removes sources that will no longer emit items - protected _removeEmptySources() { - this._sources = this._sources.filter((source, index) => { - // Adjust the index of the current source if needed - if (source.done && index <= this._currentSource) - this._currentSource--; - return !source.done; - }); - if (!this._pending && this._sources.length === 0) - this.close(); - } - - // Reads items from the next sources - public read(): T | null { - for (let i = 0; i < this._sources.length; i++) { - // this._currentSource = (this._currentSource + 1) % this._sources.length; - const item = this._sources[this._currentSource].read(); - // Attempt to read an item from that source - if (item !== null) - return item; - this._currentSource = (this._currentSource + 1) % this._sources.length; - // this._currentSource += 1 - } - - return null; - } -} - /** An iterator that generates items by reading from multiple other iterators. @extends module:asynciterator.BufferedIterator From 7a0901ad4c3779d9e78c2a91171999ad63896731 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:08:55 +0200 Subject: [PATCH 27/34] lazy start of BufferedIterator sources in WrappingIterator and SynchronousTransformIterator --- asynciterator.ts | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index c177245..a312d00 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1274,15 +1274,17 @@ function destinationFillBuffer(this: InternalSource) { export abstract class SynchronousTransformIterator extends AsyncIterator { protected _source: AsyncIterator; + protected _sourceStarted: boolean; get fastInfo(): Transform | false { return false; } - constructor(source: AsyncIterator) { + protected constructor(source: AsyncIterator) { /* eslint-disable no-use-before-define */ super(); this._source = source; + this._sourceStarted = false; const cleanup = () => { source.removeListener('end', onSourceEnd); source.removeListener('error', onSourceError); @@ -1319,8 +1321,6 @@ export abstract class SynchronousTransformIterator extends AsyncIterat source.on('readable', onSourceReadable); if (source.readable) onSourceReadable(); - else if (source instanceof BufferedIterator) - BufferedIterator.ensureInit(source); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { @@ -1328,6 +1328,15 @@ export abstract class SynchronousTransformIterator extends AsyncIterat this._source.destroy(cause); } + protected _readSource(): S | null { + if (!this._sourceStarted) { + this._sourceStarted = true; + if (this._source instanceof BufferedIterator) + BufferedIterator.ensureInit(this._source); + } + return this._source.read(); + } + map(map: (item: D) => T, self?: any): AsyncIterator { const next = this.fastInfo; if (!next) @@ -1378,7 +1387,7 @@ export class MultiMappingIterator extends SynchronousTransformIterator // eslint-disable-next-line no-constant-condition while (true) { if (!this.generator) { - if ((_item = this._source.read()) === null) + if ((_item = this._readSource()) === null) return null; this.generator = this._map(_item); } @@ -1407,7 +1416,7 @@ export class MaybeMappingIterator extends SynchronousTransformIterator read(): D | null { let item; - while ((item = this._source.read()) !== null) { + while ((item = this._readSource()) !== null) { if ((item = this._map(item)) !== null) return item; } @@ -1431,7 +1440,7 @@ export class MappingIterator extends SynchronousTransformIterator extends SynchronousTransformIterator { read(): T | null { let item; - while ((item = this._source.read()) !== null) { + while ((item = this._readSource()) !== null) { if (this._filter(item)) return item; } @@ -1475,7 +1484,7 @@ export class SkippingIterator extends SynchronousTransformIterator { read(): T | null { let item; - while ((item = this._source.read()) !== null) { + while ((item = this._readSource()) !== null) { if (this._skipped < this._skip) this._skipped += 1; else @@ -1496,7 +1505,7 @@ export class LimitingIterator extends SynchronousTransformIterator { } read(): T | null { - const item = this._source.read(); + const item = this._readSource(); if (item !== null) { if (this._count < this._limit) { this._count += 1; @@ -1551,7 +1560,7 @@ export class MultiMapFilterTransformIterator extends SynchronousTransf read(): D | null { let item; - while ((item = this._source.read()) !== null) { + while ((item = this._readSource()) !== null) { if ((item = this.transformation(item)) !== null) return item; } @@ -2202,9 +2211,11 @@ export const isIterable = (item: { [key: string]: any }): item is Iterable export class WrappingIterator extends AsyncIterator { protected _source?: AsyncIteratorLike; + protected _sourceStarted: boolean; constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { super(); + this._sourceStarted = false; if (isPromise(sourceOrPromise)) { sourceOrPromise .then(source => { @@ -2264,8 +2275,6 @@ export class WrappingIterator extends AsyncIterator { if (wrappedSource instanceof AsyncIterator && wrappedSource.readable) onSourceReadable(); - else if (wrappedSource instanceof BufferedIterator) - BufferedIterator.ensureInit(wrappedSource); } catch (err) { scheduleTask(() => iterator.emit('error', err)); @@ -2273,8 +2282,11 @@ export class WrappingIterator extends AsyncIterator { } read(): T | null { - if (this._source) + if (this._source) { + if (!this._sourceStarted && this._source instanceof BufferedIterator) + BufferedIterator.ensureInit(this._source); return this._source.read(); + } return null; } } From b354197b9e7a3f1314ccf5d1decc2998351526cb Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:21:14 +0200 Subject: [PATCH 28/34] immediately closes SynchronousTransformIterator(s) when the source is already closed --- asynciterator.ts | 2 ++ test/MappingIterator-test.js | 0 2 files changed, 2 insertions(+) create mode 100644 test/MappingIterator-test.js diff --git a/asynciterator.ts b/asynciterator.ts index a312d00..841a495 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1321,6 +1321,8 @@ export abstract class SynchronousTransformIterator extends AsyncIterat source.on('readable', onSourceReadable); if (source.readable) onSourceReadable(); + else if (source.closed) + setTaskScheduler(() => this.close()); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js new file mode 100644 index 0000000..e69de29 From 99a65b07eeca4f57c9045a376b7537e214894604 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:21:31 +0200 Subject: [PATCH 29/34] adds basic tests for MappingIterator --- test/MappingIterator-test.js | 70 ++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js index e69de29..ea89da9 100644 --- a/test/MappingIterator-test.js +++ b/test/MappingIterator-test.js @@ -0,0 +1,70 @@ +import { + AsyncIterator, + ArrayIterator, + MappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('MappingIterator', () => { + describe('The MappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new MappingIterator(new ArrayIterator([]), item => item); + }); + + it('should be a MappingIterator object', () => { + instance.should.be.an.instanceof(MappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A MappingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new MappingIterator(source, item => item * 2); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items mapped according to the mapping function', () => { + items.should.deep.equal([0, 2, 4, 6, 8, 10, 12]); + }); + }); + }); + + describe('A MappingIterator with a source that emits 0 items', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([]); + iterator = new MappingIterator(source, item => item); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should not return any items', () => { + items.should.deep.equal([]); + }); + }); + }); +}); From e956e79b0dc785ce899747699c4d26b76d32cb69 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:54:38 +0200 Subject: [PATCH 30/34] evaluates whether the source is done before readable in SynchronousTransformIterator --- asynciterator.ts | 6 +-- test/FilteringIterator-test.js | 70 ++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 test/FilteringIterator-test.js diff --git a/asynciterator.ts b/asynciterator.ts index 841a495..6db2bf7 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1319,10 +1319,10 @@ export abstract class SynchronousTransformIterator extends AsyncIterat source.on('end', onSourceEnd); source.on('error', onSourceError); source.on('readable', onSourceReadable); - if (source.readable) + if (source.done) + onSourceEnd(); + else if (source.readable) onSourceReadable(); - else if (source.closed) - setTaskScheduler(() => this.close()); } protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { diff --git a/test/FilteringIterator-test.js b/test/FilteringIterator-test.js new file mode 100644 index 0000000..ea89da9 --- /dev/null +++ b/test/FilteringIterator-test.js @@ -0,0 +1,70 @@ +import { + AsyncIterator, + ArrayIterator, + MappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('MappingIterator', () => { + describe('The MappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new MappingIterator(new ArrayIterator([]), item => item); + }); + + it('should be a MappingIterator object', () => { + instance.should.be.an.instanceof(MappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A MappingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new MappingIterator(source, item => item * 2); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items mapped according to the mapping function', () => { + items.should.deep.equal([0, 2, 4, 6, 8, 10, 12]); + }); + }); + }); + + describe('A MappingIterator with a source that emits 0 items', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([]); + iterator = new MappingIterator(source, item => item); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should not return any items', () => { + items.should.deep.equal([]); + }); + }); + }); +}); From 1d1cfc47ce8d2bf196106501e4a4edb854298162 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:55:23 +0200 Subject: [PATCH 31/34] adds basic tests for FilteringIterator --- test/FilteringIterator-test.js | 38 ++++++++++++++-------------------- test/MappingIterator-test.js | 18 +++++----------- 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/test/FilteringIterator-test.js b/test/FilteringIterator-test.js index ea89da9..f9d85ef 100644 --- a/test/FilteringIterator-test.js +++ b/test/FilteringIterator-test.js @@ -1,21 +1,21 @@ import { AsyncIterator, ArrayIterator, - MappingIterator, + FilteringIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; -describe('MappingIterator', () => { - describe('The MappingIterator function', () => { +describe('FilteringIterator', () => { + describe('The FilteringIterator function', () => { describe('the result when called with `new`', () => { let instance; before(() => { - instance = new MappingIterator(new ArrayIterator([]), item => item); + instance = new FilteringIterator(new ArrayIterator([]), item => true); }); - it('should be a MappingIterator object', () => { - instance.should.be.an.instanceof(MappingIterator); + it('should be a FilteringIterator object', () => { + instance.should.be.an.instanceof(FilteringIterator); }); it('should be an AsyncIterator object', () => { @@ -28,11 +28,11 @@ describe('MappingIterator', () => { }); }); - describe('A MappingIterator', () => { + describe('A FilteringIterator', () => { let iterator, source; before(() => { source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); - iterator = new MappingIterator(source, item => item * 2); + iterator = new FilteringIterator(source, item => item % 2 === 0); }); describe('when reading items', () => { @@ -43,27 +43,19 @@ describe('MappingIterator', () => { }); it('should return items mapped according to the mapping function', () => { - items.should.deep.equal([0, 2, 4, 6, 8, 10, 12]); + items.should.deep.equal([0, 2, 4, 6]); }); }); }); - describe('A MappingIterator with a source that emits 0 items', () => { - let iterator, source; - before(() => { - source = new ArrayIterator([]); - iterator = new MappingIterator(source, item => item); - }); - - describe('when reading items', () => { + describe('A FilteringIterator with a source that emits 0 items', () => { + it('should not return any items', done => { const items = []; - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should not return any items', () => { + const iterator = new FilteringIterator(new ArrayIterator([]), () => true); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { items.should.deep.equal([]); + done(); }); }); }); diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js index ea89da9..5f5ae2d 100644 --- a/test/MappingIterator-test.js +++ b/test/MappingIterator-test.js @@ -49,21 +49,13 @@ describe('MappingIterator', () => { }); describe('A MappingIterator with a source that emits 0 items', () => { - let iterator, source; - before(() => { - source = new ArrayIterator([]); - iterator = new MappingIterator(source, item => item); - }); - - describe('when reading items', () => { + it('should not return any items', done => { const items = []; - before(done => { - iterator.on('data', item => { items.push(item); }); - iterator.on('end', done); - }); - - it('should not return any items', () => { + const iterator = new MappingIterator(new ArrayIterator([]), item => item); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { items.should.deep.equal([]); + done(); }); }); }); From f18e28c747e1c37cc2c81e6cbbf185d0fa650946 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 20:59:12 +0200 Subject: [PATCH 32/34] adds basic tests for LimitingIterator --- test/LimitingIterator-test.js | 86 +++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 test/LimitingIterator-test.js diff --git a/test/LimitingIterator-test.js b/test/LimitingIterator-test.js new file mode 100644 index 0000000..2f8601b --- /dev/null +++ b/test/LimitingIterator-test.js @@ -0,0 +1,86 @@ +import { + AsyncIterator, + ArrayIterator, + LimitingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('LimitingIterator', () => { + describe('The LimitingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new LimitingIterator(new ArrayIterator([]), 10); + }); + + it('should be a LimitingIterator object', () => { + instance.should.be.an.instanceof(LimitingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A LimitingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new LimitingIterator(source, 4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items limited to the specified limit', () => { + items.should.deep.equal([0, 1, 2, 3]); + }); + }); + }); + + describe('A LimitingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([]), 10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of 0 items', () => { + it('should not emit any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2]), 0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of Infinity items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); +}); From e08f560528772944d1ef58bf1b56e74800b7bddf Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 21:01:40 +0200 Subject: [PATCH 33/34] adds basic tests for SkippingIterator --- test/SkippingIterator-test.js | 86 +++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 test/SkippingIterator-test.js diff --git a/test/SkippingIterator-test.js b/test/SkippingIterator-test.js new file mode 100644 index 0000000..c1f912a --- /dev/null +++ b/test/SkippingIterator-test.js @@ -0,0 +1,86 @@ +import { + AsyncIterator, + ArrayIterator, + SkippingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('SkippingIterator', () => { + describe('The SkippingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new SkippingIterator(new ArrayIterator([]), 10); + }); + + it('should be a SkippingIterator object', () => { + instance.should.be.an.instanceof(SkippingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SkippingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new SkippingIterator(source, 4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A SkippingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([]), 10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of 0 items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), 0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of Infinity items', () => { + it('should skip all items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); +}); From 37891775354f66471c0296839e7a39e975c40508 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Sun, 3 Apr 2022 21:26:02 +0200 Subject: [PATCH 34/34] adds basic tests for SynchronousTransformIterator, increases coverage for WrappingIterator --- test/SynchronousTransformIterator-test.js | 83 +++++++++++++++++++++++ test/WrappingIterator-test.js | 35 ++++++++++ 2 files changed, 118 insertions(+) create mode 100644 test/SynchronousTransformIterator-test.js diff --git a/test/SynchronousTransformIterator-test.js b/test/SynchronousTransformIterator-test.js new file mode 100644 index 0000000..4c6d347 --- /dev/null +++ b/test/SynchronousTransformIterator-test.js @@ -0,0 +1,83 @@ +import { + AsyncIterator, + ArrayIterator, + SynchronousTransformIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class _SynchronousTransformIterator extends SynchronousTransformIterator { + read() { + return this._readSource(); + } +} + +describe('SynchronousTransformIterator', () => { + describe('The SynchronousTransformIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new _SynchronousTransformIterator(new ArrayIterator([])); + }); + + it('should be a SynchronousTransformIterator object', () => { + instance.should.be.an.instanceof(SynchronousTransformIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SynchronousTransformIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new _SynchronousTransformIterator(source); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return all items', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new _SynchronousTransformIterator(new ArrayIterator([])); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that is already ended', () => { + it('should not return any items', done => { + const items = []; + const source = new ArrayIterator([]); + source.on('end', () => { + const iterator = new _SynchronousTransformIterator(source); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + }); +}); diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index d8fc2b1..d187567 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -107,4 +107,39 @@ describe('WrappingIterator', () => { wrap(obj).should.not.equal(obj); }); }); + describe('source that emits an error', () => { + it('relay the error', done => { + const err = new Error('some error'); + const source = new ArrayIterator([0, 1, 2, 3]); + const iterator = new WrappingIterator(source); + iterator.on('error', iteratorErr => { + expect(iteratorErr).to.equal(err); + done(); + }); + source.emit('error', err); + }); + }); + describe('promise of a source that rejects', () => { + it('emit the error', done => { + const err = new Error('some error'); + const iterator = new WrappingIterator(Promise.reject(err)); + iterator.on('error', iteratorErr => { + expect(iteratorErr).to.equal(err); + done(); + }); + }); + }); + describe('promise of a source', () => { + it('read null until the promise resolves', done => { + const promise = new Promise(resolve => { + setTimeout(() => { + resolve(new ArrayIterator([0, 1, 2, 3])); + }, 10); + }); + const iterator = new WrappingIterator(promise); + expect(iterator.readable).to.equal(false); + expect(iterator.read()).to.equal(null); + done(); + }); + }); });