Skip to content

Commit

Permalink
feat: refactor pMap, concurrency now defaults to 16 (was: Infinity)
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 5, 2024
1 parent 662ec5d commit 194be8d
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 302 deletions.
2 changes: 1 addition & 1 deletion docs/promise.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Based on [p-map](https://github.com/sindresorhus/p-map)

Allows to asynchronously map an array of Promises, with options to:

- control `concurrency` (default: `Infinity`)
- control `concurrency` (default: `16`)
- control error behavior (`ErrorMode`):
- `THROW_IMMEDIATELY` (default)
- `THROW_AGGREGATED`: throw `AggregateError` in the end of execution, if at least 1 error happened
Expand Down
10 changes: 10 additions & 0 deletions src/promise/pMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,13 @@ async function fn(n: number): Promise<number> {

return n * 2
}

test('Infinity math', () => {
const a = Infinity
const b = Infinity
expect(a).toBe(b)
// eslint-disable-next-line jest/prefer-equality-matcher
expect(a === b).toBe(true)
// eslint-disable-next-line jest/prefer-equality-matcher
expect(a === Infinity).toBe(true)
})
167 changes: 100 additions & 67 deletions src/promise/pMap.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
/*
Taken from https://github.com/sindresorhus/p-map
Improvements:
- Exported as { pMap }, so IDE auto-completion works
- Included Typescript typings (no need for @types/p-map)
- Compatible with pProps (that had typings issues)
*/

import type { AbortableAsyncMapper, CommonLogger } from '..'
import { END, ErrorMode, SKIP } from '..'

export interface PMapOptions {
/**
* Number of concurrently pending promises returned by `mapper`.
*
* @default Infinity
* Defaults to 16.
*
* It previously (and originally) defaulted to Infinity, which was later changed,
* because it's somewhat dangerous to run "infinite number of parallel promises".
* You can still emulate the old behavior by passing `Infinity`.
*/
concurrency?: number

Expand All @@ -36,6 +31,14 @@ export interface PMapOptions {
}

/**
* Forked from https://github.com/sindresorhus/p-map
*
* Improvements:
* - Exported as { pMap }, so IDE auto-completion works
* - Included Typescript typings (no need for @types/p-map)
* - Compatible with pProps (that had typings issues)
* - Preserves async stack traces (in selected cases)
*
* Returns a `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled,
* or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned
* from `mapper` in `input` order.
Expand Down Expand Up @@ -66,73 +69,30 @@ export async function pMap<IN, OUT>(
mapper: AbortableAsyncMapper<IN, OUT>,
opt: PMapOptions = {},
): Promise<OUT[]> {
const { logger = console } = opt
const ret: (OUT | typeof SKIP)[] = []
// const iterator = iterable[Symbol.iterator]()
const items = [...iterable]
const itemsLength = items.length
if (itemsLength === 0) return [] // short circuit

const { concurrency = itemsLength, errorMode = ErrorMode.THROW_IMMEDIATELY } = opt

const errors: Error[] = []
let isSettled = false
let resolvingCount = 0
let currentIndex = 0
const { concurrency = 16, errorMode = ErrorMode.THROW_IMMEDIATELY, logger = console } = opt

// Special cases that are able to preserve async stack traces

// Special case: serial execution
if (concurrency === 1) {
// Special case for concurrency == 1

for (const item of items) {
try {
const r = await mapper(item, currentIndex++)
if (r === END) break
if (r !== SKIP) ret.push(r)
} catch (err) {
if (errorMode === ErrorMode.THROW_IMMEDIATELY) throw err
if (errorMode === ErrorMode.THROW_AGGREGATED) {
errors.push(err as Error)
} else {
// otherwise, suppress (but still log via logger)
logger?.error(err)
}
}
}

if (errors.length) {
throw new AggregateError(errors, `pMap resulted in ${errors.length} error(s)`)
}

return ret as OUT[]
} else if (!opt.concurrency || items.length <= opt.concurrency) {
// Special case for concurrency == infinity or iterable.length < concurrency

if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
return (await Promise.all(items.map((item, i) => mapper(item, i)))).filter(
r => r !== SKIP && r !== END,
) as OUT[]
}

;(await Promise.allSettled(items.map((item, i) => mapper(item, i)))).forEach(r => {
if (r.status === 'fulfilled') {
if (r.value !== SKIP && r.value !== END) ret.push(r.value)
} else if (errorMode === ErrorMode.THROW_AGGREGATED) {
errors.push(r.reason)
} else {
// otherwise, suppress (but still log via logger)
logger?.error(r.reason)
}
})

if (errors.length) {
throw new AggregateError(errors, `pMap resulted in ${errors.length} error(s)`)
}
return await pMap1(items, mapper, errorMode, logger)
}

return ret as OUT[]
// Special case: concurrency === Infinity or items.length <= concurrency
if (concurrency === Infinity || items.length <= concurrency) {
return await pMapAll(items, mapper, errorMode, logger)
}

// General case: execution with throttled concurrency
const ret: (OUT | typeof SKIP)[] = []
const errors: Error[] = []
let isSettled = false
let resolvingCount = 0
let currentIndex = 0

return await new Promise<OUT[]>((resolve, reject) => {
const next = (): void => {
if (isSettled) {
Expand Down Expand Up @@ -198,3 +158,76 @@ export async function pMap<IN, OUT>(
}
})
}

/**
pMap with serial (non-concurrent) execution.
*/
async function pMap1<IN, OUT>(
items: IN[],
mapper: AbortableAsyncMapper<IN, OUT>,
errorMode: ErrorMode,
logger: CommonLogger | null,
): Promise<OUT[]> {
let i = 0
const ret: OUT[] = []
const errors: Error[] = []

for (const item of items) {
try {
const r = await mapper(item, i++)
if (r === END) break
if (r !== SKIP) ret.push(r)
} catch (err) {
if (errorMode === ErrorMode.THROW_IMMEDIATELY) throw err
if (errorMode === ErrorMode.THROW_AGGREGATED) {
errors.push(err as Error)
} else {
// otherwise, suppress (but still log via logger)
logger?.error(err)
}
}
}

if (errors.length) {
throw new AggregateError(errors, `pMap resulted in ${errors.length} error(s)`)
}

return ret
}

/**
pMap with fully concurrent execution, like Promise.all
*/
async function pMapAll<IN, OUT>(
items: IN[],
mapper: AbortableAsyncMapper<IN, OUT>,
errorMode: ErrorMode,
logger: CommonLogger | null,
): Promise<OUT[]> {
if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
return (await Promise.all(items.map((item, i) => mapper(item, i)))).filter(
r => r !== SKIP && r !== END,
) as OUT[]
}

const ret: OUT[] = []
const errors: Error[] = []

for (const r of await Promise.allSettled(items.map((item, i) => mapper(item, i)))) {
if (r.status === 'fulfilled') {
if (r.value === END) break
if (r.value !== SKIP) ret.push(r.value)
} else if (errorMode === ErrorMode.THROW_AGGREGATED) {
errors.push(r.reason)
} else {
// otherwise, suppress (but still log via logger)
logger?.error(r.reason)
}
}

if (errors.length) {
throw new AggregateError(errors, `pMap resulted in ${errors.length} error(s)`)
}

return ret
}
Loading

0 comments on commit 194be8d

Please sign in to comment.