Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e43d0ee
feat: faster transforming
jeswr Apr 4, 2022
538525e
chore: lint fix
jeswr Apr 4, 2022
d6681f2
chore: remove syncTransform method
jeswr Apr 4, 2022
a405079
fix: fix destroysource issues
jeswr Apr 4, 2022
7513984
chore: remove typo
jeswr Apr 4, 2022
df0d54e
Update asynciterator.ts
jeswr Apr 4, 2022
a8dbb6d
Update asynciterator.ts
jeswr Apr 4, 2022
458acf9
chore: re-enable no-labels
jeswr Apr 4, 2022
fe2ccc2
chore: use bind helper
jeswr Apr 4, 2022
129a9da
rename some tests
jeswr Apr 4, 2022
a562356
fix typo in tests and use lower case should
jeswr Apr 4, 2022
4a29d25
chore: use bind function util better
jeswr Apr 4, 2022
a580715
chore: add missing break in test
jeswr Apr 4, 2022
aff246c
chore: add missing break in test
jeswr Apr 4, 2022
e7a0044
chore: add missing break in test
jeswr Apr 4, 2022
5f4d3e9
chore: rename Transform -> ComposedFunction
jeswr Apr 4, 2022
13e30a4
chore: rename LimitingIterator -> HeadIterator
jeswr Apr 4, 2022
69c14a6
chore: remove SynchronousTransformIterator and rename SyncTransformIt…
jeswr Apr 5, 2022
29c3e50
chore: remove SynchronousTransformIterator and rename SyncTransformIt…
jeswr Apr 5, 2022
f1a0676
make readability reflect source
jeswr Apr 5, 2022
b8971c2
chore: clean up MappingIterator
jeswr Apr 5, 2022
8b901a5
chore: style fix
jeswr Apr 5, 2022
72408fc
chore: add readable test
jeswr Apr 5, 2022
79c5df9
chore: call close rather than onSourceEnd in constructor
jeswr Apr 5, 2022
d69ce40
chore: dont add listeners unecessarily
jeswr Apr 5, 2022
bc5d1cc
chore: push lint changes
jeswr Apr 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 128 additions & 8 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

/** Binds a function to an object */
function bind(fn: Function, self: any) {
return self ? fn.bind(self) : fn;
}

/**
ID of the INIT state.
An iterator is initializing if it is preparing main item generation.
Expand Down Expand Up @@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
@param {object?} self The `this` pointer for the callback
*/
forEach(callback: (item: T) => void, self?: object) {
this.on('data', self ? callback.bind(self) : callback);
this.on('data', bind(callback, self));
}

/**
Expand Down Expand Up @@ -451,12 +456,13 @@ export class AsyncIterator<T> extends EventEmitter {
/**
Maps items from this iterator using the given function.
After this operation, only read the returned iterator instead of the current one.
@param {Function} map A mapping function to call on this iterator's (remaining) items
@param {Function} map A mapping function to call on this iterator's (remaining) items.
A `null` value indicates that nothing should be returned for a particular item..
@param {object?} self The `this` pointer for the mapping function
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
map<D>(map: (item: T) => D | null, self?: any): AsyncIterator<D> {
return new MappingIterator<T, D>(this, { fn: bind(map, self) });
}

/**
Expand All @@ -469,7 +475,8 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
filter = bind(filter, self);
return this.map(item => filter(item) ? item : null);
}

/**
Expand Down Expand Up @@ -510,7 +517,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return this.map(item => offset-- > 0 ? null : item);
}

/**
Expand All @@ -520,7 +527,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
return new HeadIterator(this, limit);
}

/**
Expand All @@ -531,7 +538,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range
*/
range(start: number, end: number): AsyncIterator<T> {
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) });
return this.skip(start).take(Math.max(end - start + 1, 0));
}

/**
Expand Down Expand Up @@ -1251,6 +1258,119 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
(this._destination as any)._fillBuffer();
}

interface ComposedFunction {
fn: Function,
next?: ComposedFunction
}

export class MappingIterator<T, D = T> extends AsyncIterator<D> {
private _fn?: Function;
private _destroySource: boolean;
private onSourceError = (err: Error) => this.emit('error', err);
private onSourceReadable = () => this.emit('readable');
private onSourceEnd = () => this.close();

get readable() {
return this.source.readable;
}

set readable(readable) {
this.source.readable = readable;
}

constructor(
protected source: AsyncIterator<T>,
private transforms?: ComposedFunction,
private upstream: AsyncIterator<any> = source,
options: { destroySource?: boolean } = {}
) {
// Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing
// listeners to the original source
super();
this._destroySource = options.destroySource !== false;
if (upstream.done) {
this.close();
}
else {
upstream.on('end', this.onSourceEnd);
upstream.on('error', this.onSourceError);
upstream.on('readable', this.onSourceReadable);
}
}

get fn() {
if (!this._fn) {
const funcs: Function[] = [];
// eslint-disable-next-line prefer-destructuring
let transforms: ComposedFunction | undefined = this.transforms!;
do
funcs.push(transforms.fn);
// eslint-disable-next-line no-cond-assign
while (transforms = transforms.next);

const endIndex = funcs.length - 1;
this._fn = (item: any) => {
// Do not use a for-of loop here, it slows down transformations
// by approximately a factor of 2.
for (let index = endIndex; index >= 1; index -= 1) {
if ((item = funcs[index](item)) === null)
return null;
}
return funcs[0](item);
};
}
return this._fn;
}

read(): D | null {
const { source, fn } = this;
let item;
while ((item = source.read()) !== null) {
if ((item = fn(item)) !== null)
return item;
}
return null;
}

map<K>(map: (item: D) => K | null, self?: any): AsyncIterator<K> {
return new MappingIterator<T, K>(this.source, { fn: bind(map, self), next: this.transforms }, this);
}

destroy(cause?: Error): void {
this.upstream.destroy(cause);
super.destroy(cause);
}

public close() {
this.upstream.removeListener('end', this.onSourceEnd);
this.upstream.removeListener('error', this.onSourceError);
this.upstream.removeListener('readable', this.onSourceReadable);
if (this._destroySource)
this.upstream.destroy();
scheduleTask(() => {
delete this.source;
});
super.close();
}
}

export class HeadIterator<T> extends MappingIterator<T> {
protected count: number = 0;

constructor(source: AsyncIterator<T>, protected readonly limit: number) {
super(source);
}

read(): T | null {
const item = this.source.read();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be super.read() so all checks (are we not closed?) are still done.

if (item !== null && this.count < this.limit) {
this.count += 1;
return item;
}
this.close();
return null;
}
}

/**
An iterator that generates items based on a source iterator
Expand Down
Loading