diff --git a/README.md b/README.md index b625559..0a75052 100644 --- a/README.md +++ b/README.md @@ -90,17 +90,27 @@ Then in your async functions, you can do: ```js const items = [1, 2, 3, 4]; -const slowSquare = async (n) => { await sleep(5); return n * 2; }; -let newItems = await asyncmap(items, async (i) => { return await slowSquare(i); }); +const slowSquare = async (n) => { await sleep(5); return n * n; }; +let newItems = await asyncmap(items, slowSquare); console.log(newItems); // [1, 4, 9, 16]; const slowEven = async (n) => { await sleep(5); return n % 2 === 0; }; -newItems = await asyncfilter(items, async (i) => { return await slowEven(i); }); +newItems = await asyncfilter(items, slowEven); console.log(newItems); // [2, 4]; ``` -By default, `asyncmap` and `asyncfilter` run their operations in parallel; you -can pass `false` as a third argument to make sure it happens serially. +By default, `asyncmap` and `asyncfilter` run their operations in parallel, but you +can set the third argument to `false` to enforce sequential execution, or set a custom +concurrency pool limit using `{concurrency: }`: + +```js +const items = [1, 2, 3, 4]; +const slowSquare = async (n) => { await sleep(5); return n * n; }; +// this will run sequentially (~20ms) +const newItemsSeq = await asyncmap(items, slowSquare, false); +// this will handle 2 items at a time (~10ms) +const newItemsMaxTwo = await asyncmap(items, slowSquare, {concurrency: 2}); +``` ### waitForCondition diff --git a/lib/asyncbox.ts b/lib/asyncbox.ts index e7bb9ac..7f1aaa6 100644 --- a/lib/asyncbox.ts +++ b/lib/asyncbox.ts @@ -1,4 +1,5 @@ -import type {LongSleepOptions, WaitForConditionOptions} from './types.js'; +import {limitFunction} from 'p-limit'; +import type {LongSleepOptions, MapFilterOptions, WaitForConditionOptions} from './types.js'; const LONG_SLEEP_THRESHOLD = 5000; // anything over 5000ms will turn into a spin @@ -105,54 +106,66 @@ export async function retryInterval( } /** - * Similar to `Array.prototype.map`; runs in serial or parallel + * Similar to `Array.prototype.map`; runs in parallel, serial, or with custom concurrency pool * @param coll - The collection to map over * @param mapper - The function to apply to each element - * @param runInParallel - Whether to run operations in parallel (default: true) + * @param options - Options for controlling parallelism (default: true - fully parallel) */ export async function asyncmap( coll: T[], mapper: (value: T) => R | Promise, - runInParallel = true, + options: MapFilterOptions = true, ): Promise { - if (runInParallel) { - return Promise.all(coll.map(mapper)); + if (options === null) { + throw new Error('Options cannot be null'); } - - const newColl: R[] = []; - for (const item of coll) { - newColl.push(await mapper(item)); + // limitFunction requires the mapper to always return a promise + const mapperAsync = async (value: T): Promise => mapper(value); + if (options === false) { + return coll.reduce>( + async (acc, item) => [...(await acc), await mapperAsync(item)], + Promise.resolve([]), + ); } - return newColl; + const adjustedMapper = + options === true ? mapperAsync : limitFunction(mapperAsync, {concurrency: options.concurrency}); + return Promise.all(coll.map(adjustedMapper)); } /** - * Similar to `Array.prototype.filter` + * Similar to `Array.prototype.filter`; runs in parallel, serial, or with custom concurrency pool * @param coll - The collection to filter * @param filter - The function to test each element - * @param runInParallel - Whether to run operations in parallel (default: true) + * @param options - Options for controlling parallelism (default: true - fully parallel) */ export async function asyncfilter( coll: T[], filter: (value: T) => boolean | Promise, - runInParallel = true, + options: MapFilterOptions = true, ): Promise { - const newColl: T[] = []; - if (runInParallel) { - const bools = await Promise.all(coll.map(filter)); - for (let i = 0; i < coll.length; i++) { - if (bools[i]) { - newColl.push(coll[i]); - } - } - } else { - for (const item of coll) { - if (await filter(item)) { - newColl.push(item); + if (options === null) { + throw new Error('Options cannot be null'); + } + // limitFunction requires the filter to always return a promise + const filterAsync = async (value: T): Promise => filter(value); + if (options === false) { + return coll.reduce>(async (accP, item) => { + const acc = await accP; + if (await filterAsync(item)) { + acc.push(item); } - } + return acc; + }, Promise.resolve([])); } - return newColl; + const adjustedFilter = + options === true ? filterAsync : limitFunction(filterAsync, {concurrency: options.concurrency}); + const bools = await Promise.all(coll.map(adjustedFilter)); + return coll.reduce((acc, item, i) => { + if (bools[i]) { + acc.push(item); + } + return acc; + }, []); } /** diff --git a/lib/types.ts b/lib/types.ts index ccbac28..40ec4ca 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -31,6 +31,11 @@ export interface LongSleepOptions { progressCb?: ProgressCallback | null; } +/** + * Options for {@link asyncmap} and {@link asyncfilter} + */ +export type MapFilterOptions = boolean | {concurrency: number}; + /** * Options for {@link waitForCondition} */ diff --git a/package.json b/package.json index a649318..5aa42d7 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,9 @@ "printWidth": 100, "singleQuote": true }, + "dependencies": { + "p-limit": "^7.2.0" + }, "devDependencies": { "@appium/eslint-config-appium-ts": "^2.0.5", "@appium/tsconfig": "^1.0.0", @@ -56,8 +59,8 @@ "chai-as-promised": "^8.0.2", "conventional-changelog-conventionalcommits": "^9.0.0", "eslint": "^9.39.1", - "prettier": "^3.0.0", "mocha": "^11.7.5", + "prettier": "^3.0.0", "semantic-release": "^25.0.2", "ts-node": "^10.9.1", "tsx": "^4.21.0", diff --git a/test/asyncbox-specs.ts b/test/asyncbox-specs.ts index fbc2c0b..d336124 100644 --- a/test/asyncbox-specs.ts +++ b/test/asyncbox-specs.ts @@ -210,49 +210,81 @@ describe('asyncmap', function () { await sleep(10); return el * 2; }; - const coll = [1, 2, 3]; + const coll = [1, 2, 3, 4, 5]; + const newColl = [2, 4, 6, 8, 10]; it('should map elements one at a time', async function () { const start = Date.now(); - expect(await asyncmap(coll, mapper, false)).to.eql([2, 4, 6]); - expect(Date.now() - start).to.be.at.least(30); + expect(await asyncmap(coll, mapper, false)).to.eql(newColl); + expect(Date.now() - start).to.be.at.least(50); }); it('should map elements in parallel', async function () { const start = Date.now(); - expect(await asyncmap(coll, mapper)).to.eql([2, 4, 6]); + expect(await asyncmap(coll, mapper)).to.eql(newColl); expect(Date.now() - start).to.be.at.most(20); }); + it('should map elements with concurrency', async function () { + const start = Date.now(); + expect(await asyncmap(coll, mapper, {concurrency: 2})).to.eql(newColl); + expect(Date.now() - start).to.be.at.least(29); + expect(Date.now() - start).to.be.at.most(40); + }); it('should handle an empty array', async function () { expect(await asyncmap([], mapper, false)).to.eql([]); }); it('should handle an empty array in parallel', async function () { expect(await asyncmap([], mapper)).to.eql([]); }); + it('should work for a sync mapper function', async function () { + const syncmapper = (el: number): number => el * 2; + expect(await asyncmap(coll, syncmapper, false)).to.eql(newColl); + expect(await asyncmap(coll, syncmapper)).to.eql(newColl); + }); + it('should raise an error if options is null', async function () { + // @ts-expect-error - testing invalid inputs + await expect(asyncmap(coll, mapper, null)).to.be.rejectedWith( + 'Options cannot be null' + ); + }); }); describe('asyncfilter', function () { const filter = async function (el: number): Promise { - await sleep(5); + await sleep(10); return el % 2 === 0; }; const coll = [1, 2, 3, 4, 5]; + const newColl = [2, 4]; it('should filter elements one at a time', async function () { const start = Date.now(); - expect(await asyncfilter(coll, filter, false)).to.eql([2, 4]); - expect(Date.now() - start).to.be.at.least(19); + expect(await asyncfilter(coll, filter, false)).to.eql(newColl); + expect(Date.now() - start).to.be.at.least(50); }); it('should filter elements in parallel', async function () { const start = Date.now(); - expect(await asyncfilter(coll, filter)).to.eql([2, 4]); - expect(Date.now() - start).to.be.below(9); + expect(await asyncfilter(coll, filter)).to.eql(newColl); + expect(Date.now() - start).to.be.at.most(20); }); - it('should handle an empty array', async function () { + it('should filter elements with concurrency', async function () { const start = Date.now(); + expect(await asyncfilter(coll, filter, {concurrency: 2})).to.eql(newColl); + expect(Date.now() - start).to.be.at.least(29); + expect(Date.now() - start).to.be.at.most(40); + }); + it('should handle an empty array', async function () { expect(await asyncfilter([], filter, false)).to.eql([]); - expect(Date.now() - start).to.be.below(9); }); it('should handle an empty array in parallel', async function () { - const start = Date.now(); expect(await asyncfilter([], filter)).to.eql([]); - expect(Date.now() - start).to.be.below(9); + }); + it('should work for a sync filter function', async function () { + const syncfilter = (el: number): boolean => el % 2 === 0; + expect(await asyncfilter(coll, syncfilter, false)).to.eql(newColl); + expect(await asyncfilter(coll, syncfilter)).to.eql(newColl); + }); + it('should raise an error if options is null', async function () { + // @ts-expect-error - testing invalid inputs + await expect(asyncfilter(coll, filter, null)).to.be.rejectedWith( + 'Options cannot be null' + ); }); });