Skip to content

Commit

Permalink
feat: rename transformBuffer to transformChunk
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 15, 2024
1 parent e8fe43a commit 69e6445
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export * from './stream/readable/readableFromArray'
export * from './stream/readable/readableToArray'
export * from './stream/stream.model'
export * from './stream/progressLogger'
export * from './stream/transform/transformBuffer'
export * from './stream/transform/transformChunk'
export * from './stream/transform/transformFilter'
export * from './stream/transform/transformLimit'
export * from './stream/transform/transformLogProgress'
Expand Down
16 changes: 8 additions & 8 deletions src/stream/progressLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ export interface ProgressLoggerCfg<T = any> {

/**
* If specified - will multiply the counter by this number.
* Useful e.g when using `transformBuffer({ batchSize: 500 })`, so
* it'll accurately represent the number of processed entries (not batches).
* Useful e.g. when using `transformChunk({ chunkSize: 500 })`, so
* it'll accurately represent the number of processed entries (not chunks).
*
* Defaults to 1.
*/
batchSize?: number
chunkSize?: number

/**
* Experimental logging of item (chunk) sizes, when json-stringified.
Expand Down Expand Up @@ -160,7 +160,7 @@ export class ProgressLogger<T> implements Disposable {
logRPS: true,
logEvery: 1000,
logSizesBuffer: 100_000,
batchSize: 1,
chunkSize: 1,
logger: console,
logProgress: cfg.logProgress !== false && cfg.logEvery !== 0,
...cfg,
Expand All @@ -174,7 +174,7 @@ export class ProgressLogger<T> implements Disposable {
cfg!: ProgressLoggerCfg<T> & {
logEvery: number
logSizesBuffer: number
batchSize: number
chunkSize: number
metric: string
logger: CommonLogger
}
Expand Down Expand Up @@ -230,7 +230,7 @@ export class ProgressLogger<T> implements Disposable {
const {
metric,
extra,
batchSize,
chunkSize,
heapUsed: logHeapUsed,
heapTotal: logHeapTotal,
rss: logRss,
Expand All @@ -245,9 +245,9 @@ export class ProgressLogger<T> implements Disposable {
const mem = process.memoryUsage()

const now = Date.now()
const batchedProgress = this.progress * batchSize
const batchedProgress = this.progress * chunkSize
const lastRPS =
(this.processedLastSecond * batchSize) / ((now - this.lastSecondStarted) / 1000) || 0
(this.processedLastSecond * chunkSize) / ((now - this.lastSecondStarted) / 1000) || 0
const rpsTotal = Math.round(batchedProgress / ((now - this.started) / 1000)) || 0
this.lastSecondStarted = now
this.processedLastSecond = 0
Expand Down
40 changes: 0 additions & 40 deletions src/stream/transform/transformBuffer.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Readable } from 'node:stream'
import { _range } from '@naturalcycles/js-lib'
import { writablePushToArray, _pipeline } from '../..'
import { transformBuffer } from './transformBuffer'
import { transformChunk } from './transformChunk'

test('transformBuffer', async () => {
const data = _range(1, 6).map(n => ({ id: String(n) }))
Expand All @@ -10,7 +10,7 @@ test('transformBuffer', async () => {

await _pipeline([
Readable.from(data),
transformBuffer({ batchSize: 2 }),
transformChunk({ chunkSize: 2 }),
writablePushToArray(data2),
])

Expand Down
44 changes: 44 additions & 0 deletions src/stream/transform/transformChunk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Transform } from 'node:stream'
import { TransformOptions, TransformTyped } from '../stream.model'

export interface TransformChunkOptions extends TransformOptions {
  /**
   * Target number of items per emitted chunk.
   * Every emitted chunk contains exactly `chunkSize` items, except possibly
   * the last one, which holds the remaining items and may be smaller.
   */
  chunkSize: number
}

/**
 * Similar to RxJS bufferCount():
 * groups the items of the input object stream into arrays ("chunks") of
 * `opt.chunkSize` items each.
 * The final chunk carries whatever items remain and may be smaller than `chunkSize`.
 */
export function transformChunk<IN = any>(opt: TransformChunkOptions): TransformTyped<IN, IN[]> {
  const { chunkSize } = opt

  // Items collected since the last emitted chunk.
  let pending: IN[] = []

  return new Transform({
    objectMode: true,
    ...opt,
    transform(item, _encoding, cb) {
      pending.push(item)

      if (pending.length < chunkSize) {
        // Chunk not full yet — just acknowledge the write.
        cb()
        return
      }

      // Chunk is full: emit it downstream and start collecting the next one.
      const fullChunk = pending
      pending = []
      cb(null, fullChunk)
    },
    final(this: Transform, cb) {
      // Flush the incomplete trailing chunk, if any.
      if (pending.length) {
        this.push(pending)
        pending = []
      }

      cb()
    },
  })
}

0 comments on commit 69e6445

Please sign in to comment.