-
Notifications
You must be signed in to change notification settings - Fork 8
Add MappingIterator for synchronous transformations #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void { | |
| taskScheduler = scheduler; | ||
| } | ||
|
|
||
| /** Binds a function to an object */ | ||
| function bind(fn: Function, self?: object) { | ||
| return self ? fn.bind(self) : fn; | ||
| } | ||
|
|
||
| /** | ||
| ID of the INIT state. | ||
| An iterator is initializing if it is preparing main item generation. | ||
|
|
@@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter { | |
| @param {object?} self The `this` pointer for the callback | ||
| */ | ||
| forEach(callback: (item: T) => void, self?: object) { | ||
| this.on('data', self ? callback.bind(self) : callback); | ||
| this.on('data', bind(callback, self)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -451,12 +456,13 @@ export class AsyncIterator<T> extends EventEmitter { | |
| /** | ||
| Maps items from this iterator using the given function. | ||
| After this operation, only read the returned iterator instead of the current one. | ||
| @param {Function} map A mapping function to call on this iterator's (remaining) items | ||
| @param {Function} map A mapping function to call on this iterator's (remaining) items. | ||
| A `null` value indicates that nothing should be returned for a particular item. | ||
| @param {object?} self The `this` pointer for the mapping function | ||
| @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator | ||
| */ | ||
| map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> { | ||
| return this.transform({ map: self ? map.bind(self) : map }); | ||
| map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> { | ||
| return new MappingIterator<T, D>(this, bind(map, self)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -469,7 +475,8 @@ export class AsyncIterator<T> extends EventEmitter { | |
| filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>; | ||
| filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>; | ||
| filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> { | ||
| return this.transform({ filter: self ? filter.bind(self) : filter }); | ||
| filter = bind(filter, self); | ||
| return this.map(item => filter(item) ? item : null); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -510,7 +517,7 @@ export class AsyncIterator<T> extends EventEmitter { | |
| @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items | ||
| */ | ||
| skip(offset: number): AsyncIterator<T> { | ||
| return this.transform({ offset }); | ||
| return this.map(item => offset-- > 0 ? null : item); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -520,7 +527,7 @@ export class AsyncIterator<T> extends EventEmitter { | |
| @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items | ||
| */ | ||
| take(limit: number): AsyncIterator<T> { | ||
| return this.transform({ limit }); | ||
| return this.map((item, it) => limit-- > 0 ? item : (it.close(), null)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -531,7 +538,7 @@ export class AsyncIterator<T> extends EventEmitter { | |
| @returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range | ||
| */ | ||
| range(start: number, end: number): AsyncIterator<T> { | ||
| return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) }); | ||
| return this.skip(start).take(Math.max(end - start + 1, 0)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -777,9 +784,121 @@ export class IntegerIterator extends AsyncIterator<number> { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * A synchronous mapping function from one element to another. | ||
| * The iterator performing the mapping is passed as a second argument. | ||
| */ | ||
| export type MapFunction<S, D = S, I extends AsyncIterator<D> = AsyncIterator<D>> = | ||
| (item: S, iterator: I) => D | null; | ||
|
|
||
| /** | ||
| An iterator that calls a synchronous mapping function | ||
| on every item from its source iterator. | ||
| @extends module:asynciterator.AsyncIterator | ||
| */ | ||
| export class MappingIterator<S, D = S> extends AsyncIterator<D> { | ||
| protected _source: AsyncIterator<S>; | ||
| private readonly _destroySource: boolean; | ||
| private readonly _mappings: MapFunction<any, any, MappingIterator<S, D>>[]; | ||
RubenVerborgh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private readonly _mappingRoot: InternalSource<any>; | ||
|
|
||
| // This is wrong: readable should be set by listening to source events | ||
| get readable() { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This breaks things, because certain |
||
| return (this._state < CLOSED) && this._source.readable; | ||
| } | ||
|
|
||
| /** | ||
| * Applies the given mapping to the source iterator. | ||
| */ | ||
| constructor( | ||
| source: AsyncIterator<S>, | ||
| mapping?: MapFunction<S, D, MappingIterator<S, D>>, | ||
| options?: SourcedIteratorOptions, | ||
| ); | ||
|
|
||
| /** | ||
| * Applies the given list of mappings to the mapping root. | ||
| * | ||
| * This is an optimization for | ||
| * root.map(f1).map(f2).map(f3) | ||
| * where the combined mapping x => f3(f2(f1(x))) | ||
| * is applied to root rather than to the intermediate sources. | ||
| */ | ||
| constructor( | ||
| source: AsyncIterator<S>, | ||
| mappings: MapFunction<any, any, MappingIterator<S, D>>[], | ||
| mappingRoot: AsyncIterator<any>, | ||
| options?: SourcedIteratorOptions, | ||
| ); | ||
|
|
||
| constructor( | ||
| source: AsyncIterator<S>, | ||
| mappings: MapFunction<S, D, MappingIterator<S, D>> | | ||
| MapFunction<any, any, MappingIterator<S, D>>[] = [], | ||
| mappingRoot?: AsyncIterator<any> | SourcedIteratorOptions, | ||
| options: SourcedIteratorOptions = {}, | ||
| ) { | ||
| super(); | ||
| // Resolve optional parameters | ||
| if (!isEventEmitter(mappingRoot)) { | ||
| if (mappingRoot) | ||
| options = mappingRoot; | ||
| mappingRoot = source; | ||
| } | ||
| this._source = source; | ||
| this._mappings = isFunction(mappings) ? [mappings] : mappings; | ||
| this._mappingRoot = mappingRoot as InternalSource<any>; | ||
| this._destroySource = options.destroySource !== false; | ||
|
|
||
| if (mappingRoot.done) { | ||
| this.close(); | ||
| } | ||
| else { | ||
| _validateSource(mappingRoot); | ||
| this._mappingRoot._destination = this; | ||
| this._mappingRoot.on('end', destinationClose); | ||
| this._mappingRoot.on('error', destinationEmitError); | ||
| this._mappingRoot.on('readable', destinationEmitReadable); | ||
| } | ||
| } | ||
|
|
||
| read(): D | null { | ||
| let mapped : any = null; | ||
| while (mapped === null && (mapped = this._source.read()) !== null) { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not read the source if we are closed. Need to guard for this. And test. |
||
| for (let i = 0; i < this._mappings.length; i++) { | ||
| mapped = this._mappings[i](mapped, this); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you are right but I am still context switching back to this issue so not fully confident yet. |
||
| if (mapped === null) | ||
| break; | ||
| } | ||
| } | ||
| return mapped; | ||
| } | ||
|
|
||
| map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> { | ||
| return new MappingIterator<S, K>(this._source, [...this._mappings, bind(map, self)], this); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something is off here.
|
||
| } | ||
|
|
||
| public close() { | ||
| if (this._destroySource) | ||
| this._mappingRoot.destroy(); | ||
| super.close(); | ||
| } | ||
|
|
||
| /* Cleans up the source iterator and ends. */ | ||
| protected _end(destroy: boolean) { | ||
| this._mappingRoot.removeListener('end', destinationClose); | ||
| this._mappingRoot.removeListener('error', destinationEmitError); | ||
| this._mappingRoot.removeListener('readable', destinationEmitReadable); | ||
| delete this._mappingRoot._destination; | ||
| if (this._destroySource) | ||
| this._mappingRoot.destroy(); | ||
| super._end(destroy); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| A iterator that maintains an internal buffer of items. | ||
| An iterator that maintains an internal buffer of items. | ||
| This class serves as a base class for other iterators | ||
| with a typically complex item generation process. | ||
| @extends module:asynciterator.AsyncIterator | ||
|
|
@@ -1055,6 +1174,14 @@ export class BufferedIterator<T> extends AsyncIterator<T> { | |
| } | ||
| } | ||
|
|
||
| function _validateSource<S>(source?: AsyncIterator<S>, allowDestination = false) { | ||
| if (!source || !isFunction(source.read) || !isFunction(source.on)) | ||
| throw new Error(`Invalid source: ${source}`); | ||
| if (!allowDestination && (source as any)._destination) | ||
| throw new Error('The source already has a destination'); | ||
| return source as InternalSource<S>; | ||
| } | ||
|
|
||
| /** | ||
| An iterator that generates items based on a source iterator. | ||
| This class serves as a base class for other iterators. | ||
|
|
@@ -1153,11 +1280,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> { | |
| protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) { | ||
| if (this._source || typeof this._createSource !== 'undefined') | ||
| throw new Error('The source cannot be changed after it has been set'); | ||
| if (!source || !isFunction(source.read) || !isFunction(source.on)) | ||
| throw new Error(`Invalid source: ${source}`); | ||
| if (!allowDestination && (source as any)._destination) | ||
| throw new Error('The source already has a destination'); | ||
| return source as InternalSource<S>; | ||
| return _validateSource(source, allowDestination); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1240,9 +1363,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> { | |
| } | ||
| } | ||
|
|
||
| function destinationEmitReadable<S>(this: InternalSource<S>) { | ||
| this._destination!.emit('readable'); | ||
| } | ||
| function destinationEmitError<S>(this: InternalSource<S>, error: Error) { | ||
| this._destination!.emit('error', error); | ||
| } | ||
| function destinationClose<S>(this: InternalSource<S>) { | ||
| this._destination!.close(); | ||
| } | ||
| function destinationCloseWhenDone<S>(this: InternalSource<S>) { | ||
| (this._destination as any)._closeWhenDone(); | ||
| } | ||
|
|
@@ -1946,15 +2075,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> { | |
| return object && (isEventEmitter(object) || isPromise(object) || isFunction(object)); | ||
| } | ||
|
|
||
| export interface SourcedIteratorOptions { | ||
| destroySource?: boolean; | ||
| } | ||
|
|
||
| export interface BufferedIteratorOptions { | ||
| maxBufferSize?: number; | ||
| autoStart?: boolean; | ||
| } | ||
|
|
||
| export interface TransformIteratorOptions<S> extends BufferedIteratorOptions { | ||
| export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions { | ||
| source?: SourceExpression<S>; | ||
| optional?: boolean; | ||
| destroySource?: boolean; | ||
| } | ||
|
|
||
| export interface TransformOptions<S, D> extends TransformIteratorOptions<S> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the correct one, but that leads to errors below.