Skip to content

Commit

Permalink
feat: fs2.createReadStreamAsNDJSON
Browse files Browse the repository at this point in the history
fs2.createWriteStreamAsNDJSON
Both are also optimized (~20% faster read).
Remove less useful (aka "never used") ndjson helpers,
because fs2.* is more convenient now.

Remove `binary-split` dependency, vendor/adopt its code,
optimize for `\n` splitting (versus generic variable length splitting).
  • Loading branch information
kirillgroshkov committed Apr 27, 2024
1 parent fd53324 commit cfc70cc
Show file tree
Hide file tree
Showing 17 changed files with 273 additions and 324 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"ajv": "^8.6.2",
"ajv-formats": "^3.0.1",
"ajv-keywords": "^5.0.0",
"binary-split": "^1.0.5",
"chalk": "^4.0.0",
"debug": "^4.1.1",
"dotenv": "^16.0.0",
Expand Down
34 changes: 34 additions & 0 deletions scripts/ndjsonParseSpeed.script.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
yarn tsn ndjsonParseSpeed
*/

import {
_pipeline,
fs2,
requireEnvKeys,
runScript,
transformLogProgress,
transformMap,
} from '../src'

const { SNAPSHOTS_DIR, SNAPSHOT_ID } = requireEnvKeys('SNAPSHOTS_DIR', 'SNAPSHOT_ID')

runScript(async () => {
  // Input snapshot and its gzipped NDJSON output live side by side in SNAPSHOTS_DIR.
  const filePath = `${SNAPSHOTS_DIR}/${SNAPSHOT_ID}`
  const outputFilePath = `${filePath}_out.ndjson.gz`
  console.log({ filePath, outputFilePath })

  // Counter mutated by the mapper, purely to simulate per-row work during the benchmark.
  let keys = 0

  // Pass-through mapper: counts keys of each row, returns the row unchanged.
  const countKeys = async (row: any): Promise<any> => {
    keys += Object.keys(row || {}).length
    return row
  }

  await _pipeline([
    fs2.createReadStreamAsNDJSON(filePath).take(10_000),
    transformMap<any, any>(countKeys),
    transformLogProgress({ logEvery: 1000, extra: () => ({ keys }) }),
    fs2.createWriteStreamAsNDJSON(outputFilePath),
  ])
})
72 changes: 72 additions & 0 deletions src/fs/fs2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ import type { RmOptions } from 'node:fs'
import fs from 'node:fs'
import fsp from 'node:fs/promises'
import path from 'node:path'
import { createGzip, createUnzip } from 'node:zlib'
import { _jsonParse } from '@naturalcycles/js-lib'
import yaml, { DumpOptions } from 'js-yaml'
import { transformToNDJson } from '../stream/ndjson/transformToNDJson'
import { ReadableTyped, WritableTyped } from '../stream/stream.model'
import { transformSplitOnNewline } from '../stream/transform/transformSplit'
import { requireFileToExist } from '../util/env.util'

/**
* fs2 conveniently groups filesystem functions together.
Expand Down Expand Up @@ -305,6 +310,73 @@ class FS2 {
readdirAsync = fsp.readdir
createWriteStream = fs.createWriteStream
createReadStream = fs.createReadStream

/*
Returns a Readable of [already parsed] NDJSON objects.
Replaces a list of operations:
- requireFileToExist(inputPath)
- fs.createReadStream
- createUnzip (only if path ends with '.gz')
- transformSplitOnNewline
- transformJsonParse
To add a Limit or Offset: just add .take() or .drop(), example:
_pipeline([
fs2.createReadStreamAsNDJSON().take(100),
transformX(),
])
*/
createReadStreamAsNDJSON<ROW = any>(inputPath: string): ReadableTyped<ROW> {
requireFileToExist(inputPath)

let stream: ReadableTyped<ROW> = fs
.createReadStream(inputPath, {
highWaterMark: 64 * 1024, // no observed speedup
})
.on('error', err => stream.emit('error', err))

if (inputPath.endsWith('.gz')) {
stream = stream.pipe(
createUnzip({
chunkSize: 64 * 1024, // speedup from ~3200 to 3800 rps!
}),
)
}

return stream.pipe(transformSplitOnNewline()).map(line => JSON.parse(line))
// For some crazy reason .map is much faster than transformJsonParse!
// ~5000 vs ~4000 rps !!!
// .on('error', err => stream.emit('error', err))
// .pipe(transformJsonParse<ROW>())
}

/*
Returns a Writable.
Replaces a list of operations:
- transformToNDJson
- createGzip (only if path ends with '.gz')
- fs.createWriteStream

NOTE(review): the returned stream is the FIRST stage of the chain. A pipeline
ending in it resolves when NDJSON serialization finishes, which may be before
gzip/fs buffers are fully flushed to disk — confirm callers don't rely on the
file being closed at that point.
*/
createWriteStreamAsNDJSON(outputPath: string): WritableTyped<any> {
  // transform1 is handed back to the caller, so errors from every downstream
  // stage must be re-emitted on it — pipe() does not propagate errors.
  const transform1 = transformToNDJson()
  let transform = transform1

  if (outputPath.endsWith('.gz')) {
    transform = transform.pipe(
      createGzip({
        // chunkSize: 64 * 1024, // no observed speedup
      }),
    )
    transform.on('error', err => transform1.emit('error', err))
  }

  const fileStream = fs.createWriteStream(outputPath, {
    // highWaterMark: 64 * 1024, // no observed speedup
  })
  // Without this, disk-write failures (ENOSPC, EACCES, ...) were silently lost.
  fileStream.on('error', err => transform1.emit('error', err))
  transform.pipe(fileStream)

  return transform1
}
}

export const fs2 = new FS2()
Expand Down
6 changes: 0 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@ export * from './log/log.util'
export * from './slack/slack.service'
export * from './slack/slack.service.model'
export * from './stream/ndjson/ndjson.model'
export * from './stream/ndjson/ndJsonFileRead'
export * from './stream/ndjson/ndJsonFileWrite'
export * from './stream/ndjson/ndjsonMap'
export * from './stream/ndjson/ndjsonStreamForEach'
export * from './stream/ndjson/pipelineFromNDJsonFile'
export * from './stream/ndjson/pipelineToNDJsonFile'
export * from './stream/ndjson/streamToNDJsonFile'
export * from './stream/ndjson/transformJsonParse'
export * from './stream/ndjson/transformToNDJson'
export * from './stream/pipeline/pipeline'
Expand All @@ -56,7 +51,6 @@ export * from './stream/transform/transformMapSync'
export * from './stream/transform/transformSplit'
export * from './stream/transform/transformTap'
export * from './stream/transform/transformToArray'
export * from './stream/transform/transformToString'
export * from './stream/transform/transformTee'
export * from './stream/transform/worker/baseWorkerClass'
export * from './stream/transform/worker/transformMultiThreaded'
Expand Down
56 changes: 0 additions & 56 deletions src/stream/ndjson/ndJsonFile.test.ts

This file was deleted.

15 changes: 0 additions & 15 deletions src/stream/ndjson/ndJsonFileRead.ts

This file was deleted.

12 changes: 0 additions & 12 deletions src/stream/ndjson/ndJsonFileWrite.ts

This file was deleted.

24 changes: 5 additions & 19 deletions src/stream/ndjson/ndjsonMap.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip, createUnzip } from 'node:zlib'
import { AbortableAsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
import {
requireFileToExist,
transformJsonParse,
transformLimit,
transformLogProgress,
transformMap,
TransformMapOptions,
transformSplit,
transformToNDJson,
_pipeline,
TransformLogProgressOptions,
fs2,
} from '../..'

export interface NDJSONMapOptions<IN = any, OUT = IN>
Expand Down Expand Up @@ -46,24 +41,17 @@ export async function ndjsonMap<IN = any, OUT = any>(
): Promise<void> {
const { inputFilePath, outputFilePath, logEveryOutput = 100_000, limitInput, limitOutput } = opt

requireFileToExist(inputFilePath)

console.log({
inputFilePath,
outputFilePath,
})

const transformUnzip = inputFilePath.endsWith('.gz') ? [createUnzip()] : []
const transformZip = outputFilePath.endsWith('.gz') ? [createGzip()] : []

const readable = createReadStream(inputFilePath)
const readable = fs2
.createReadStreamAsNDJSON(inputFilePath)
.take(limitInput || Number.POSITIVE_INFINITY)

await _pipeline([
readable,
...transformUnzip,
transformSplit(), // splits by \n
transformJsonParse(),
transformLimit({ limit: limitInput, sourceReadable: readable }),
transformLogProgress({ metric: 'read', ...opt }),
transformMap(mapper, {
flattenArrayOutput: true,
Expand All @@ -72,8 +60,6 @@ export async function ndjsonMap<IN = any, OUT = any>(
}),
transformLimit({ limit: limitOutput, sourceReadable: readable }),
transformLogProgress({ metric: 'saved', logEvery: logEveryOutput }),
transformToNDJson(),
...transformZip,
createWriteStream(outputFilePath),
fs2.createWriteStreamAsNDJSON(outputFilePath),
])
}
15 changes: 2 additions & 13 deletions src/stream/ndjson/ndjsonStreamForEach.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import fs from 'node:fs'
import { createUnzip } from 'node:zlib'
import { AbortableAsyncMapper, ErrorMode } from '@naturalcycles/js-lib'
import {
requireFileToExist,
transformJsonParse,
transformLogProgress,
TransformLogProgressOptions,
transformMap,
TransformMapOptions,
transformSplit,
writableVoid,
_pipeline,
fs2,
} from '../..'

export interface NDJSONStreamForEachOptions<IN = any>
Expand All @@ -26,15 +22,8 @@ export async function ndjsonStreamForEach<T>(
mapper: AbortableAsyncMapper<T, void>,
opt: NDJSONStreamForEachOptions<T>,
): Promise<void> {
requireFileToExist(opt.inputFilePath)

const transformUnzip = opt.inputFilePath.endsWith('.gz') ? [createUnzip()] : []

await _pipeline([
fs.createReadStream(opt.inputFilePath),
...transformUnzip,
transformSplit(),
transformJsonParse(),
fs2.createReadStreamAsNDJSON(opt.inputFilePath),
transformMap<T, any>(mapper, {
errorMode: ErrorMode.THROW_AGGREGATED,
...opt,
Expand Down
62 changes: 0 additions & 62 deletions src/stream/ndjson/pipelineFromNDJsonFile.ts

This file was deleted.

Loading

0 comments on commit cfc70cc

Please sign in to comment.