Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 88 additions & 85 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,10 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator
*/
prepend(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ prepend: items });
return new UnionIterator(
[Array.isArray(items) ? new ArrayIterator(items, { autoStart: false }) : items, this],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #63 is merged, this could become simply wrap(items, { letIteratorThrough: true }) (thus supporting a wider range of types for items).

{ maxBufferSize: 1 }
);
}

/**
Expand All @@ -489,7 +492,10 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator
*/
append(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ append: items });
return new UnionIterator(
[this, Array.isArray(items) ? new ArrayIterator(items, { autoStart: false }) : items],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

{ maxBufferSize: 1 }
);
}

/**
Expand Down Expand Up @@ -1243,6 +1249,9 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
this._destination.emit('error', error);
}
function destinationSetReadable<S>(this: InternalSource<S>) {
this._destination.readable = true;
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
}
Expand Down Expand Up @@ -1510,12 +1519,13 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {

/**
An iterator that generates items by reading from multiple other iterators.
@extends module:asynciterator.BufferedIterator
@extends module:asynciterator.AsyncIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _pending? : { sources?: AsyncIterator<MaybePromise<AsyncIterator<T>>> };
private _currentSource = -1;
export class UnionIterator<T> extends AsyncIterator<T> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document whether or not it is in-order; crucial to prepend and append.

private _sources : AsyncIterator<AsyncIterator<T>>;
private buffer = new LinkedList<AsyncIterator<T>>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs a _ prefix.

private _sourceStarted: boolean;
private _maxBufferSize: number;

/**
Creates a new `UnionIterator`.
Expand All @@ -1526,101 +1536,94 @@ export class UnionIterator<T> extends BufferedIterator<T> {
AsyncIteratorOrArray<Promise<AsyncIterator<T>>> |
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>>,
options: BufferedIteratorOptions = {}) {
super(options);
const autoStart = options.autoStart !== false;
super();

// Sources have been passed as an iterator
if (isEventEmitter(sources)) {
sources.on('error', error => this.emit('error', error));
this._pending = { sources: sources as AsyncIterator<MaybePromise<AsyncIterator<T>>> };
if (autoStart)
this._loadSources();
}
// Sources have been passed as a non-empty array
else if (Array.isArray(sources) && sources.length > 0) {
for (const source of sources)
this._addSource(source as MaybePromise<InternalSource<T>>);
}
// Sources are an empty list
else if (autoStart) {
this.close();
// Remove this in the next major version
if (Array.isArray(sources)) {
for (const source of sources) {
if (!isPromise(source)) {
// @ts-ignore
source._destination = this;
source.on('error', destinationEmitError);
}
}
}
}

// Loads sources passed as an iterator
protected _loadSources() {
// Obtain sources iterator
const sources = this._pending!.sources!;
delete this._pending!.sources;
// @ts-ignore
this._sources = (Array.isArray(sources) ? fromArray(sources) : wrap(sources)).map<AsyncIterator<T>>((it: any) => isPromise(it) ? wrap(it as any) : (it as AsyncIterator<T>));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could also be simplified after #63


// Close immediately if done
if (sources.done) {
delete this._pending;
// @ts-ignore
this._addSource(this._sources);

this._sourceStarted = options.autoStart !== false;
this._maxBufferSize = options.maxBufferSize || Infinity;
if (this._sources.done && this._sourceStarted)
this.close();
}
// Otherwise, set up source reading
else {
sources.on('data', source => {
this._addSource(source as MaybePromise<InternalSource<T>>);
this._fillBufferAsync();
});
sources.on('end', () => {
delete this._pending;
this._fillBuffer();
});
}
else
this.readable = true;
}

// Adds the given source to the internal sources array
protected _addSource(source: MaybePromise<InternalSource<T>>) {
if (isPromise(source))
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
source._destination = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('end', destinationRemoveEmptySources);
}
protected _addSource(source: InternalSource<T>) {
source._destination = this;
source.on('error', destinationEmitError);
source.on('readable', destinationSetReadable);
source.on('end', destinationRemoveEmptySources);
}

protected _removeSource(source: InternalSource<T>) {
source.removeListener('error', destinationEmitError);
source.removeListener('readable', destinationSetReadable);
source.removeListener('end', destinationRemoveEmptySources);
// @ts-ignore
delete source._destination;
}

// Removes sources that will no longer emit items
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
this.buffer.mutateFilter((source: any) => {
if (source.done) {
this._removeSource(source);
return false;
}
return true;
});
this._fillBuffer();
if (this.buffer.empty && this._sources.done && this._sourceStarted)
this.close();
else
this.readable = true;
}

public close() {
this._removeSource(this._sources as any);
super.close();
}


// Reads items from the next sources
protected _read(count: number, done: () => void): void {
// Start source loading if needed
if (this._pending?.sources)
this._loadSources();

// Try to read `count` items
let lastCount = 0, item : T | null;
while (lastCount !== (lastCount = count)) {
// Try every source at least once
for (let i = 0; i < this._sources.length && count > 0; i++) {
// Pick the next source
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
count--;
this._push(item);
}
}
public read(): T | null {
if (!this._sourceStarted)
this._sourceStarted = true;

const { buffer, _sources } = this;
let item: T | null;
let iterator: AsyncIterator<T> | null;
let node = buffer._head;
while (node !== null) {
if (node.value.readable && (item = node.value.read()) !== null)
return item;
node = node.next;
}

// Close this iterator if all of its sources have been read
if (!this._pending && this._sources.length === 0)
this.close();
done();
while (this.buffer.length < this._maxBufferSize && (iterator = _sources.read()) !== null) {
this._addSource(iterator as any);
this.buffer.push(iterator);

if ((item = iterator.read()) !== null)
return item;
}
this._removeEmptySources();
this.readable = false;
return null;
}
}

Expand Down
38 changes: 37 additions & 1 deletion linkedlist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ interface LinkedNode<V> {

export default class LinkedList<V> {
private _length: number = 0;
private _head: LinkedNode<V> | null = null;
_head: LinkedNode<V> | null = null;
private _tail: LinkedNode<V> | null = null;

get length() { return this._length; }
Expand Down Expand Up @@ -34,8 +34,44 @@ export default class LinkedList<V> {
return value;
}

mutateFilter(filter: (item: V) => boolean) {
let last: LinkedNode<V> | null;
let next: LinkedNode<V> | null;
while (this._head !== null && !filter(this._head.value)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this could be simplified into one single loop if we were to have the list itself implement the LinkedNode interface, referencing the first node in the chain as next instead of _head. It might not be worth the effort, though.

this._head = this._head.next
this._length--;
}
if (this._head === null) {
this._tail = null;
return
}
last = this._head;
next = this._head.next;
while (next !== null) {
if (filter(next.value)) {
last = next;
next = next.next
} else {
next = next.next
last.next = next
this._length--;
}
}
this._tail = last;
}

clear() {
this._length = 0;
this._head = this._tail = null;
}

// This iterator does not keep yielding items as they are pushed into the list.
// It synchronously runs until the current end of the list and that's it.
*[Symbol.iterator]() {
let node = this._head;
while (node !== null) {
yield node.value;
node = node.next;
}
}
}
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"c8": "^7.2.0",
"chai": "^4.2.0",
"eslint": "^8.0.0",
"event-emitter-promisify": "^1.1.0",
"husky": "^4.2.5",
"jaguarjs-jsdoc": "^1.1.0",
"jsdoc": "^3.5.5",
Expand Down
31 changes: 31 additions & 0 deletions perf/UnionIterator-perf.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { UnionIterator, range } from '../dist/asynciterator.js';
import { promisifyEventEmitter } from 'event-emitter-promisify'

let it;

// Warmup

console.time('For loop with 5x10^9 elems');
for (let i = 0; i < 5_000_000_000; i++);
console.timeEnd('For loop with 5x10^9 elems');

console.time('UnionIterator 2 x 10^7 iterators');
for (let i = 0; i < 5; i++) {
it = new UnionIterator([range(0, 10_000_000), range(0, 10_000_000)]);
await promisifyEventEmitter(it.on('data', () => {}))
}
console.timeEnd('UnionIterator 2 x 10^7 iterators');

console.time('UnionIterator 1000 x 20_000 iterators');
for (let i = 0; i < 5; i++) {
it = new UnionIterator(range(0, 1000).map(() => range(0, 20_000)));
await promisifyEventEmitter(it.on('data', () => {}))
}
console.timeEnd('UnionIterator 1000 x 20_000 iterators');

console.time('UnionIterator 1000 x 20_000 iterators - maxBufferSize of 1');
for (let i = 0; i < 5; i++) {
it = new UnionIterator(range(0, 1000).map(() => range(0, 20_000)));
await promisifyEventEmitter(it.on('data', () => {}))
}
console.timeEnd('UnionIterator 1000 x 20_000 iterators - maxBufferSize of 1');
20 changes: 20 additions & 0 deletions test/AsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import {
ENDED,
DESTROYED,
scheduleTask,
range,
} from '../dist/asynciterator.js';

import { EventEmitter } from 'events';
import { expect } from 'chai';

describe('AsyncIterator', () => {
describe('The AsyncIterator module', () => {
Expand Down Expand Up @@ -1307,4 +1309,22 @@ describe('AsyncIterator', () => {
});
});
});

describe('Testing #append', () => {
it('Should append an array', async () => {
expect(await range(0, 1).append([2, 3, 4]).toArray()).to.deep.equal([0, 1, 2, 3, 4]);
});
it('Should append an iterator', async () => {
expect(await range(0, 1).append(range(2, 4)).toArray()).to.deep.equal([0, 1, 2, 3, 4]);
});
});

describe('Testing #prepend', () => {
it('Should prepend an array', async () => {
expect(await range(0, 1).prepend([2, 3, 4]).toArray()).to.deep.equal([2, 3, 4, 0, 1]);
});
it('Should prepend an iterator', async () => {
expect(await range(0, 1).prepend(range(2, 4)).toArray()).to.deep.equal([2, 3, 4, 0, 1]);
});
});
});
Loading