Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,27 @@ Then in your async functions, you can do:

```js
const items = [1, 2, 3, 4];
const slowSquare = async (n) => { await sleep(5); return n * 2; };
let newItems = await asyncmap(items, async (i) => { return await slowSquare(i); });
const slowSquare = async (n) => { await sleep(5); return n * n; };
let newItems = await asyncmap(items, slowSquare);
console.log(newItems); // [1, 4, 9, 16];

const slowEven = async (n) => { await sleep(5); return n % 2 === 0; };
newItems = await asyncfilter(items, async (i) => { return await slowEven(i); });
newItems = await asyncfilter(items, slowEven);
console.log(newItems); // [2, 4];
```

By default, `asyncmap` and `asyncfilter` run their operations in parallel; you
can pass `false` as a third argument to make sure it happens serially.
By default, `asyncmap` and `asyncfilter` run their operations in parallel, but you
can set the third argument to `false` to enforce sequential execution, or set a custom
concurrency pool limit using `{concurrency: <number>}`:

```js
const items = [1, 2, 3, 4];
const slowSquare = async (n) => { await sleep(5); return n * n; };
// this will run sequentially (~20ms)
const newItemsSeq = await asyncmap(items, slowSquare, false);
// this will handle 2 items at a time (~10ms)
const newItemsMaxTwo = await asyncmap(items, slowSquare, {concurrency: 2});
```

### waitForCondition

Expand Down
69 changes: 41 additions & 28 deletions lib/asyncbox.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {LongSleepOptions, WaitForConditionOptions} from './types.js';
import {limitFunction} from 'p-limit';
import type {LongSleepOptions, MapFilterOptions, WaitForConditionOptions} from './types.js';

const LONG_SLEEP_THRESHOLD = 5000; // anything over 5000ms will turn into a spin

Expand Down Expand Up @@ -105,54 +106,66 @@ export async function retryInterval<T = any>(
}

/**
* Similar to `Array.prototype.map`; runs in serial or parallel
* Similar to `Array.prototype.map`; runs in parallel, serial, or with custom concurrency pool
* @param coll - The collection to map over
* @param mapper - The function to apply to each element
* @param runInParallel - Whether to run operations in parallel (default: true)
* @param options - Options for controlling parallelism (default: true - fully parallel)
*/
export async function asyncmap<T, R>(
coll: T[],
mapper: (value: T) => R | Promise<R>,
runInParallel = true,
options: MapFilterOptions = true,
): Promise<R[]> {
if (runInParallel) {
return Promise.all(coll.map(mapper));
if (options === null) {
throw new Error('Options cannot be null');
}

const newColl: R[] = [];
for (const item of coll) {
newColl.push(await mapper(item));
// limitFunction requires the mapper to always return a promise
const mapperAsync = async (value: T): Promise<R> => mapper(value);
if (options === false) {
return coll.reduce<Promise<R[]>>(
async (acc, item) => [...(await acc), await mapperAsync(item)],
Promise.resolve([]),
);
}
return newColl;
const adjustedMapper =
options === true ? mapperAsync : limitFunction(mapperAsync, {concurrency: options.concurrency});
return Promise.all(coll.map(adjustedMapper));
}

/**
* Similar to `Array.prototype.filter`
* Similar to `Array.prototype.filter`; runs in parallel, serial, or with custom concurrency pool
* @param coll - The collection to filter
* @param filter - The function to test each element
* @param runInParallel - Whether to run operations in parallel (default: true)
* @param options - Options for controlling parallelism (default: true - fully parallel)
*/
export async function asyncfilter<T>(
coll: T[],
filter: (value: T) => boolean | Promise<boolean>,
runInParallel = true,
options: MapFilterOptions = true,
): Promise<T[]> {
const newColl: T[] = [];
if (runInParallel) {
const bools = await Promise.all(coll.map(filter));
for (let i = 0; i < coll.length; i++) {
if (bools[i]) {
newColl.push(coll[i]);
}
}
} else {
for (const item of coll) {
if (await filter(item)) {
newColl.push(item);
if (options === null) {
throw new Error('Options cannot be null');
}
// limitFunction requires the filter to always return a promise
const filterAsync = async (value: T): Promise<boolean> => filter(value);
if (options === false) {
return coll.reduce<Promise<T[]>>(async (accP, item) => {
const acc = await accP;
if (await filterAsync(item)) {
acc.push(item);
}
}
return acc;
}, Promise.resolve([]));
}
return newColl;
const adjustedFilter =
options === true ? filterAsync : limitFunction(filterAsync, {concurrency: options.concurrency});
const bools = await Promise.all(coll.map(adjustedFilter));
return coll.reduce<T[]>((acc, item, i) => {
if (bools[i]) {
acc.push(item);
}
return acc;
}, []);
}

/**
Expand Down
5 changes: 5 additions & 0 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export interface LongSleepOptions {
progressCb?: ProgressCallback | null;
}

/**
* Options for {@link asyncmap} and {@link asyncfilter}
*/
export type MapFilterOptions = boolean | {concurrency: number};

/**
* Options for {@link waitForCondition}
*/
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
"printWidth": 100,
"singleQuote": true
},
"dependencies": {
"p-limit": "^7.2.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the main reason for the major version bump was to get rid of any third party dependencies, however now we still add one.

Would you mind if I try to actually implement concurrent mapping and filtering as part of this module without using third party modules?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't consider the removal of dependencies on its own to require a major version bump. I was merely investigating how to achieve that, and decided to modernize the package along the way, which resulted in the major bumps.
Regarding p-limit specifically, while I still maintain my stance that it's preferable to minimize third party dependencies here, there's 3 main reasons why I added it:

  • Replacing it with native functionality requires somewhat non-trivial code (unlike the previous uses of bluebird and lodash). This is also why I have not attempted to remove lodash from teen_process, since it uses _.isBuffer, whose replacement would require more than a one-liner.
  • It is a very small module (12KB unpacked, compared to 632KB for bluebird and 1.41MB for lodash), and is purpose-built for this use case.
  • It comes from a trusted developer, which gives confidence in the module's coverage of any edge cases.

Hand-rolling the concurrency is certainly still possible, but given the above, I'm not sure if it's worth the effort, even with AI assistance.

},
"devDependencies": {
"@appium/eslint-config-appium-ts": "^2.0.5",
"@appium/tsconfig": "^1.0.0",
Expand All @@ -56,8 +59,8 @@
"chai-as-promised": "^8.0.2",
"conventional-changelog-conventionalcommits": "^9.0.0",
"eslint": "^9.39.1",
"prettier": "^3.0.0",
"mocha": "^11.7.5",
"prettier": "^3.0.0",
"semantic-release": "^25.0.2",
"ts-node": "^10.9.1",
"tsx": "^4.21.0",
Expand Down
58 changes: 45 additions & 13 deletions test/asyncbox-specs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,49 +210,81 @@ describe('asyncmap', function () {
await sleep(10);
return el * 2;
};
const coll = [1, 2, 3];
const coll = [1, 2, 3, 4, 5];
const newColl = [2, 4, 6, 8, 10];
it('should map elements one at a time', async function () {
const start = Date.now();
expect(await asyncmap(coll, mapper, false)).to.eql([2, 4, 6]);
expect(Date.now() - start).to.be.at.least(30);
expect(await asyncmap(coll, mapper, false)).to.eql(newColl);
expect(Date.now() - start).to.be.at.least(50);
});
it('should map elements in parallel', async function () {
const start = Date.now();
expect(await asyncmap(coll, mapper)).to.eql([2, 4, 6]);
expect(await asyncmap(coll, mapper)).to.eql(newColl);
expect(Date.now() - start).to.be.at.most(20);
});
it('should map elements with concurrency', async function () {
const start = Date.now();
expect(await asyncmap(coll, mapper, {concurrency: 2})).to.eql(newColl);
expect(Date.now() - start).to.be.at.least(29);
expect(Date.now() - start).to.be.at.most(40);
});
it('should handle an empty array', async function () {
expect(await asyncmap([], mapper, false)).to.eql([]);
});
it('should handle an empty array in parallel', async function () {
expect(await asyncmap([], mapper)).to.eql([]);
});
it('should work for a sync mapper function', async function () {
const syncmapper = (el: number): number => el * 2;
expect(await asyncmap(coll, syncmapper, false)).to.eql(newColl);
expect(await asyncmap(coll, syncmapper)).to.eql(newColl);
});
it('should raise an error if options is null', async function () {
// @ts-expect-error - testing invalid inputs
await expect(asyncmap(coll, mapper, null)).to.be.rejectedWith(
'Options cannot be null'
);
});
});

describe('asyncfilter', function () {
const filter = async function (el: number): Promise<boolean> {
await sleep(5);
await sleep(10);
return el % 2 === 0;
};
const coll = [1, 2, 3, 4, 5];
const newColl = [2, 4];
it('should filter elements one at a time', async function () {
const start = Date.now();
expect(await asyncfilter(coll, filter, false)).to.eql([2, 4]);
expect(Date.now() - start).to.be.at.least(19);
expect(await asyncfilter(coll, filter, false)).to.eql(newColl);
expect(Date.now() - start).to.be.at.least(50);
});
it('should filter elements in parallel', async function () {
const start = Date.now();
expect(await asyncfilter(coll, filter)).to.eql([2, 4]);
expect(Date.now() - start).to.be.below(9);
expect(await asyncfilter(coll, filter)).to.eql(newColl);
expect(Date.now() - start).to.be.at.most(20);
});
it('should handle an empty array', async function () {
it('should filter elements with concurrency', async function () {
const start = Date.now();
expect(await asyncfilter(coll, filter, {concurrency: 2})).to.eql(newColl);
expect(Date.now() - start).to.be.at.least(29);
expect(Date.now() - start).to.be.at.most(40);
});
it('should handle an empty array', async function () {
expect(await asyncfilter([], filter, false)).to.eql([]);
expect(Date.now() - start).to.be.below(9);
});
it('should handle an empty array in parallel', async function () {
const start = Date.now();
expect(await asyncfilter([], filter)).to.eql([]);
expect(Date.now() - start).to.be.below(9);
});
it('should work for a sync filter function', async function () {
const syncfilter = (el: number): boolean => el % 2 === 0;
expect(await asyncfilter(coll, syncfilter, false)).to.eql(newColl);
expect(await asyncfilter(coll, syncfilter)).to.eql(newColl);
});
it('should raise an error if options is null', async function () {
// @ts-expect-error - testing invalid inputs
await expect(asyncfilter(coll, filter, null)).to.be.rejectedWith(
'Options cannot be null'
);
});
});