Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
if: ${{ matrix.node-version != '10.x' }}
- run: npx c8 --reporter=lcov mocha
- uses: coverallsapp/github-action@master
if: ${{ matrix.node-version != '10.x' }}
with:
github-token: ${{ secrets.github_token }}
flag-name: run-${{ matrix.node-version }}
Expand Down
164 changes: 148 additions & 16 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

/** Binds a function to an object */
function bind(fn: Function, self?: object) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
function bind(fn: Function, self?: object) {
function bind<T extends Function>(fn: T, self?: object): T {

is the correct one, but that leads to errors below.

return self ? fn.bind(self) : fn;
}

/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
Expand Down Expand Up @@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the callback
*/
forEach(callback: (item: T) => void, self?: object) {
this.on('data', self ? callback.bind(self) : callback);
this.on('data', bind(callback, self));
}

/**
Expand Down Expand Up @@ -451,12 +456,13 @@ export class AsyncIterator<T> extends EventEmitter {
/**
Maps items from this iterator using the given function.
After this operation, only read the returned iterator instead of the current one.
@param {Function} map A mapping function to call on this iterator's (remaining) items
@param {Function} map A mapping function to call on this iterator's (remaining) items.
A `null` value indicates that nothing should be returned for a particular item.
@param {object?} self The `this` pointer for the mapping function
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
return new MappingIterator<T, D>(this, bind(map, self));
}

/**
Expand All @@ -469,7 +475,8 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
filter = bind(filter, self);
return this.map(item => filter(item) ? item : null);
}

/**
Expand Down Expand Up @@ -510,7 +517,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return this.map(item => offset-- > 0 ? null : item);
}

/**
Expand All @@ -520,7 +527,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
return this.map((item, it) => limit-- > 0 ? item : (it.close(), null));
}

/**
Expand All @@ -531,7 +538,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range
*/
range(start: number, end: number): AsyncIterator<T> {
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) });
return this.skip(start).take(Math.max(end - start + 1, 0));
}

/**
Expand Down Expand Up @@ -777,9 +784,121 @@ export class IntegerIterator extends AsyncIterator<number> {
}
}

/**
* A synchronous mapping function from one element to another.
* The iterator performing the mapping is passed as a second argument.
*/
export type MapFunction<S, D = S, I extends AsyncIterator<D> = AsyncIterator<D>> =
(item: S, iterator: I) => D | null;

/**
An iterator that calls a synchronous mapping function
on every item from its source iterator.
@extends module:asynciterator.AsyncIterator
*/
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
protected _source: AsyncIterator<S>;
private readonly _destroySource: boolean;
private readonly _mappings: MapFunction<any, any, MappingIterator<S, D>>[];
private readonly _mappingRoot: InternalSource<any>;

// This is wrong: readable should be set by listening to source events
get readable() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks things, because certain readable events will not be fired. We need to actually set readable = true.

return (this._state < CLOSED) && this._source.readable;
}

/**
* Applies the given mapping to the source iterator.
*/
constructor(
source: AsyncIterator<S>,
mapping?: MapFunction<S, D, MappingIterator<S, D>>,
options?: SourcedIteratorOptions,
);

/**
* Applies the given list of mappings to the mapping root.
*
* This is an optimization for
* root.map(f1).map(f2).map(f3)
* where the combined mapping x => f3(f2(f1(x)))
* is applied to root rather than to the intermediate sources.
*/
constructor(
source: AsyncIterator<S>,
mappings: MapFunction<any, any, MappingIterator<S, D>>[],
mappingRoot: AsyncIterator<any>,
options?: SourcedIteratorOptions,
);

constructor(
source: AsyncIterator<S>,
mappings: MapFunction<S, D, MappingIterator<S, D>> |
MapFunction<any, any, MappingIterator<S, D>>[] = [],
mappingRoot?: AsyncIterator<any> | SourcedIteratorOptions,
options: SourcedIteratorOptions = {},
) {
super();
// Resolve optional parameters
if (!isEventEmitter(mappingRoot)) {
if (mappingRoot)
options = mappingRoot;
mappingRoot = source;
}
this._source = source;
this._mappings = isFunction(mappings) ? [mappings] : mappings;
this._mappingRoot = mappingRoot as InternalSource<any>;
this._destroySource = options.destroySource !== false;

if (mappingRoot.done) {
this.close();
}
else {
_validateSource(mappingRoot);
this._mappingRoot._destination = this;
this._mappingRoot.on('end', destinationClose);
this._mappingRoot.on('error', destinationEmitError);
this._mappingRoot.on('readable', destinationEmitReadable);
}
}

read(): D | null {
let mapped : any = null;
while (mapped === null && (mapped = this._source.read()) !== null) {
Copy link
Owner

@RubenVerborgh RubenVerborgh Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not read the source if we are closed. Need to guard for this. And test.

for (let i = 0; i < this._mappings.length; i++) {
mapped = this._mappings[i](mapped, this);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The this pointer is not correct here; that should be the MappingIterator at each level.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right but I am still context switching back to this issue so not fully confident yet.

if (mapped === null)
break;
}
}
return mapped;
}

map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new MappingIterator<S, K>(this._source, [...this._mappings, bind(map, self)], this);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is off here.

MappingIterator (overload 2) takes as constructor parameters the direct source (should be this), the mappings, and the mapping root (should be this._mappingRoot). Yet the tests do not catch this problem.

}

public close() {
if (this._destroySource)
this._mappingRoot.destroy();
super.close();
}

/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._mappingRoot.removeListener('end', destinationClose);
this._mappingRoot.removeListener('error', destinationEmitError);
this._mappingRoot.removeListener('readable', destinationEmitReadable);
delete this._mappingRoot._destination;
if (this._destroySource)
this._mappingRoot.destroy();
super._end(destroy);
}
}


/**
A iterator that maintains an internal buffer of items.
An iterator that maintains an internal buffer of items.
This class serves as a base class for other iterators
with a typically complex item generation process.
@extends module:asynciterator.AsyncIterator
Expand Down Expand Up @@ -1055,6 +1174,14 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
}
}

function _validateSource<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
}

/**
An iterator that generates items based on a source iterator.
This class serves as a base class for other iterators.
Expand Down Expand Up @@ -1153,11 +1280,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
if (this._source || typeof this._createSource !== 'undefined')
throw new Error('The source cannot be changed after it has been set');
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
return _validateSource(source, allowDestination);
}

/**
Expand Down Expand Up @@ -1240,9 +1363,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}
}

function destinationEmitReadable<S>(this: InternalSource<S>) {
this._destination!.emit('readable');
}
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
this._destination!.emit('error', error);
}
function destinationClose<S>(this: InternalSource<S>) {
this._destination!.close();
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
}
Expand Down Expand Up @@ -1946,15 +2075,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}

export interface SourcedIteratorOptions {
destroySource?: boolean;
}

export interface BufferedIteratorOptions {
maxBufferSize?: number;
autoStart?: boolean;
}

export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
source?: SourceExpression<S>;
optional?: boolean;
destroySource?: boolean;
}

export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"scripts": {
"build": "npm run build:clean && npm run build:module && npm run build:commonjs && npm run build:types",
"build:clean": "rm -rf dist",
"build:module": " tsc --module es2015 && mv dist/ts-out/*.js dist && npm run build:module:import",
"build:module:import": " sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
"build:module": "tsc && mv dist/ts-out/*.js dist && npm run build:module:import",
"build:module:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
"build:commonjs": "tsc --module commonjs && ./.change-extension cjs dist/ts-out/*.js && mv dist/ts-out/*.cjs dist && npm run build:commonjs:import",
"build:commonjs:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.cjs/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.cjs/' dist/asynciterator.cjs && rm dist/*.bak",
"build:types": "tsc -d && rm dist/ts-out/*.js && mv dist/ts-out/*.d.ts dist",
Expand Down
Loading