diff --git a/src/index.ts b/src/index.ts
index 343517d..8dc276c 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -52,6 +52,7 @@ export * from './stream/transform/transformSplit'
 export * from './stream/transform/transformTap'
 export * from './stream/transform/transformToArray'
 export * from './stream/transform/transformTee'
+export * from './stream/transform/transformThrottle'
 export * from './stream/transform/worker/baseWorkerClass'
 export * from './stream/transform/worker/transformMultiThreaded'
 export * from './stream/transform/worker/transformMultiThreaded.model'
diff --git a/src/stream/transform/transformThrottle.test.ts b/src/stream/transform/transformThrottle.test.ts
new file mode 100644
index 0000000..7428a2c
--- /dev/null
+++ b/src/stream/transform/transformThrottle.test.ts
@@ -0,0 +1,25 @@
+import { Readable } from 'node:stream'
+import { _range, ObjectWithId } from '@naturalcycles/js-lib'
+import { _pipeline } from '../pipeline/pipeline'
+import { writableVoid } from '../writable/writableVoid'
+import { transformTap } from './transformTap'
+import { transformThrottle } from './transformThrottle'
+
+test('transformThrottle', async () => {
+  await _pipeline([
+    // super-fast producer
+    Readable.from(_range(1, 11).map(id => ({ id: String(id) }))),
+    // transformTap(obj => {
+    //   console.log('pre', obj)
+    // }),
+    transformThrottle<ObjectWithId>({
+      interval: 1,
+      throughput: 3,
+      // debug: true,
+    }),
+    transformTap(obj => {
+      console.log('post', obj)
+    }),
+    writableVoid(),
+  ])
+}, 20_000)
diff --git a/src/stream/transform/transformThrottle.ts b/src/stream/transform/transformThrottle.ts
new file mode 100644
index 0000000..4472780
--- /dev/null
+++ b/src/stream/transform/transformThrottle.ts
@@ -0,0 +1,127 @@
+import { Transform } from 'node:stream'
+import {
+  _ms,
+  DeferredPromise,
+  localTime,
+  NumberOfMilliseconds,
+  NumberOfSeconds,
+  pDefer,
+  PositiveInteger,
+} from '@naturalcycles/js-lib'
+import { TransformTyped } from '../stream.model'
+
+export interface TransformThrottleOptions {
+  /**
+   * Maximum number of items allowed to pass through per `interval`.
+   */
+  throughput: PositiveInteger
+
+  /**
+   * Length of the interval (in seconds) within which the number of passed items
+   * should not exceed `throughput`.
+   */
+  interval: NumberOfSeconds
+
+  /**
+   * Set to true to log (via console.log) when throttling starts, activates and resumes.
+   */
+  debug?: boolean
+}
+
+/**
+ * Allows to throttle the throughput of the stream.
+ * For example, when you have an API with rate limit of 5000 requests per minute,
+ * `transformThrottle` can help you utilize it most efficiently.
+ * You can define it as:
+ *
+ * _pipeline([
+ *   // ...
+ *   transformThrottle({
+ *     throughput: 5000,
+ *     interval: 60,
+ *   }),
+ *   // ...
+ * ])
+ *
+ * Items are passed through unchanged. Time is split into consecutive windows of
+ * `interval` seconds, starting when the first item arrives. Once `throughput` items
+ * have passed within the current window, further items wait until the next window starts.
+ *
+ * @experimental
+ */
+export function transformThrottle<T>(opt: TransformThrottleOptions): TransformTyped<T, T> {
+  const { throughput, interval, debug } = opt
+  const intervalMillis = interval * 1000
+
+  let count = 0 // number of items passed in the current window
+  let start: NumberOfMilliseconds // timestamp of the start of the current window
+  let paused: DeferredPromise | undefined // defined while the limit of the current window is reached
+  let timeout: NodeJS.Timeout | undefined // timer that starts the next window
+
+  return new Transform({
+    objectMode: true,
+    async transform(item: T, _, cb) {
+      if (!start) {
+        // First item: start the first window
+        start = Date.now()
+        timeout = setTimeout(onInterval, intervalMillis)
+        if (debug) {
+          console.log(`${localTime.now().toPretty()} transformThrottle started with`, {
+            throughput,
+            interval,
+            rps: Math.round(throughput / interval),
+          })
+        }
+      }
+
+      if (paused) {
+        // The limit was reached in the current window: hold the item until the next window
+        await paused
+      }
+
+      if (++count >= throughput) {
+        // This item is the last one allowed in the current window: the next items will wait
+        paused = pDefer()
+        if (debug) {
+          console.log(
+            `${localTime.now().toPretty()} transformThrottle activated: ${count} items passed in ${_ms(intervalMillis)}, will pause for ${_ms(intervalMillis - (Date.now() - start))}`,
+          )
+        }
+      }
+
+      cb(null, item) // pass the item through
+    },
+    final(cb) {
+      clearTimeout(timeout)
+      cb()
+    },
+    destroy(err, cb) {
+      // Also stop the timer when the stream is destroyed without `final` being called
+      // (e.g. on pipeline error), otherwise the timer would keep re-arming itself forever
+      clearTimeout(timeout)
+      cb(err)
+    },
+  })
+
+  /**
+   * Starts a new window: resets the counter, re-arms the timer
+   * and releases the pending pause (if any).
+   */
+  function onInterval(): void {
+    // Always start a new window, even when not paused. Otherwise the timer would stop
+    // after the first "quiet" interval, `count` would keep growing across intervals,
+    // and a later pause would never be released (the stream would hang).
+    count = 0
+    start = Date.now()
+    timeout = setTimeout(onInterval, intervalMillis)
+
+    if (!paused) return
+
+    if (debug) {
+      console.log(`${localTime.now().toPretty()} transformThrottle resumed`)
+    }
+
+    paused.resolve()
+    paused = undefined
+  }
+}