Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,9 @@ export function identity<S>(item: S): typeof item {
return item;
}

/** Key indicating the current consumer of a source. */
export const DESTINATION = Symbol('destination');


/**
An iterator that synchronously transforms every item from its source
Expand Down Expand Up @@ -821,7 +824,7 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
}
// Otherwise, wire up the source for reading
else {
this._source._destination = this;
this._source[DESTINATION] = this;
this._source.on('end', destinationClose);
this._source.on('error', destinationEmitError);
this._source.on('readable', destinationSetReadable);
Expand Down Expand Up @@ -854,7 +857,7 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
delete this._source[DESTINATION];
if (this._destroySource)
this._source.destroy();
super._end(destroy);
Expand All @@ -865,7 +868,7 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new TypeError(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
if (!allowDestination && (source as any)[DESTINATION])
throw new Error('The source already has a destination');
return source as InternalSource<S>;
}
Expand Down Expand Up @@ -1209,7 +1212,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {

// Validate and set source
const source = this._source = this._validateSource(value);
source._destination = this;
source[DESTINATION] = this;

// Close this iterator if the source has already ended
if (source.done) {
Expand Down Expand Up @@ -1324,7 +1327,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
source.removeListener('end', destinationCloseWhenDone);
source.removeListener('error', destinationEmitError);
source.removeListener('readable', destinationFillBuffer);
delete source._destination;
delete source[DESTINATION];
if (this._destroySource)
source.destroy();
}
Expand All @@ -1333,20 +1336,20 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}

function destinationSetReadable<S>(this: InternalSource<S>) {
this._destination!.readable = true;
this[DESTINATION]!.readable = true;
}
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
this._destination!.emit('error', error);
this[DESTINATION]!.emit('error', error);
}
function destinationClose<S>(this: InternalSource<S>) {
this._destination!.close();
this[DESTINATION]!.close();
}
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
(this._destination as any)._closeWhenDone();
(this[DESTINATION] as any)._closeWhenDone();
}
function destinationFillBuffer<S>(this: InternalSource<S>) {
if ((this._destination as any)._sourceStarted !== false)
(this._destination as any)._fillBuffer();
if ((this[DESTINATION] as any)._sourceStarted !== false)
(this[DESTINATION] as any)._fillBuffer();
}


Expand Down Expand Up @@ -1564,7 +1567,7 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
// Create the transformer and listen to its events
const transformer = (this._createTransformer(item) ||
new EmptyIterator()) as InternalSource<D>;
transformer._destination = this;
transformer[DESTINATION] = this;
transformer.on('end', destinationFillBuffer);
transformer.on('readable', destinationFillBuffer);
transformer.on('error', destinationEmitError);
Expand Down Expand Up @@ -1689,7 +1692,7 @@ export class UnionIterator<T> extends BufferedIterator<T> {
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
source._destination = this;
source[DESTINATION] = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('end', destinationRemoveEmptySources);
Expand Down Expand Up @@ -1753,7 +1756,7 @@ export class UnionIterator<T> extends BufferedIterator<T> {
}

function destinationRemoveEmptySources<T>(this: InternalSource<T>) {
(this._destination as any)._removeEmptySources();
(this[DESTINATION] as any)._removeEmptySources();
}


Expand Down Expand Up @@ -1795,8 +1798,8 @@ export class ClonedIterator<T> extends TransformIterator<T> {
// Validate and set the source
const source = this._source = this._validateSource(value);
// Create a history reader for the source if none already existed
const history = (source && (source as any)._destination) ||
(source._destination = new HistoryReader<T>(source) as any);
const history = (source && (source as any)[DESTINATION]) ||
(source[DESTINATION] = new HistoryReader<T>(source) as any);

// Close this clone if history is empty and the source has ended
if (history.endsAt(0)) {
Expand Down Expand Up @@ -1827,7 +1830,7 @@ export class ClonedIterator<T> extends TransformIterator<T> {
@param {boolean} allowDestination Whether the source can already have a destination
*/
protected _validateSource(source?: AsyncIterator<T>, allowDestination = false) {
const history = (source && (source as any)._destination);
const history = (source && (source as any)[DESTINATION]);
return super._validateSource(source, !history || history instanceof HistoryReader);
}

Expand Down Expand Up @@ -1881,7 +1884,7 @@ export class ClonedIterator<T> extends TransformIterator<T> {
let item = null;
if (!this.done && source) {
// Try to read an item at the current point in history
const history = source._destination as any as HistoryReader<T>;
const history = source[DESTINATION] as any as HistoryReader<T>;
if ((item = history.readAt(this._readPosition)) !== null)
this._readPosition++;
else
Expand All @@ -1897,7 +1900,7 @@ export class ClonedIterator<T> extends TransformIterator<T> {
protected _end(destroy: boolean) {
// Unregister from a possible history reader
const source = this.source as InternalSource<T>;
const history = source?._destination as any as HistoryReader<T>;
const history = source?.[DESTINATION] as any as HistoryReader<T>;
if (history)
history.unregister(this);

Expand Down Expand Up @@ -2043,7 +2046,7 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
}

// Set up event handling
source._destination = this;
source[DESTINATION] = this;
source.on('end', destinationClose);
source.on('error', destinationEmitError);
source.on('readable', destinationSetReadable);
Expand All @@ -2070,7 +2073,7 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
delete this._source[DESTINATION];
this._source = null;
}
}
Expand Down Expand Up @@ -2224,7 +2227,7 @@ type SourceExpression<T> =
(() => MaybePromise<AsyncIterator<T>>);

type InternalSource<T> =
AsyncIterator<T> & { _destination?: AsyncIterator<any> };
AsyncIterator<T> & { [DESTINATION]?: AsyncIterator<any> };

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
Expand Down
3 changes: 2 additions & 1 deletion test/ClonedIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
BufferedIterator,
EmptyIterator,
ArrayIterator,
DESTINATION,
} from '../dist/asynciterator.js';

import { EventEmitter } from 'events';
Expand Down Expand Up @@ -120,7 +121,7 @@ describe('ClonedIterator', () => {
describe('Cloning an iterator that already has a destination', () => {
it('should throw an exception', () => {
const source = new AsyncIterator(), destination = new TransformIterator(source);
source.should.have.property('_destination', destination);
source.should.have.property(DESTINATION, destination);
(() => source.clone()).should.throw('The source already has a destination');
});
});
Expand Down
7 changes: 4 additions & 3 deletions test/TransformIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
TransformIterator,
wrap,
scheduleTask,
DESTINATION,
} from '../dist/asynciterator.js';

import { EventEmitter } from 'events';
Expand Down Expand Up @@ -359,7 +360,7 @@ describe('TransformIterator', () => {
});

it('should remove itself as destination from the source', () => {
source.should.not.have.key('_destination');
source.should.not.have.key(DESTINATION);
});
});
});
Expand Down Expand Up @@ -466,7 +467,7 @@ describe('TransformIterator', () => {
});

it('should remove itself as destination from the source', () => {
source.should.not.have.key('_destination');
source.should.not.have.key(DESTINATION);
});
});
});
Expand Down Expand Up @@ -575,7 +576,7 @@ describe('TransformIterator', () => {
});

it('should remove itself as destination from the source', () => {
source.should.not.have.key('_destination');
source.should.not.have.key(DESTINATION);
});
});
});
Expand Down