From 0069586c30bc04aa1eff0efeb5718c2280a62987 Mon Sep 17 00:00:00 2001 From: Florent Monin <130468717+flo-monin@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:06:41 +0200 Subject: [PATCH] feat: recursive implementation of combineFiles (#1) Also, refactor the constructors of CloudStorage and support passing a logger Co-authored-by: kirillgroshkov --- src/cloudStorage.ts | 135 +++++++++++++++--- src/commonStorage.ts | 12 +- src/inMemoryCommonStorage.ts | 15 ++ src/test/cloudStorage.manual.test.ts | 4 +- .../commonStorageKeyValueDB.manual.test.ts | 4 +- src/test/firebaseStorage.manual.test.ts | 2 +- 6 files changed, 143 insertions(+), 29 deletions(-) diff --git a/src/cloudStorage.ts b/src/cloudStorage.ts index fd32cb8..bfe2477 100644 --- a/src/cloudStorage.ts +++ b/src/cloudStorage.ts @@ -2,7 +2,10 @@ import { Readable, Writable } from 'node:stream' import { File, Storage, StorageOptions } from '@google-cloud/storage' import { _assert, + _chunk, + _since, _substringAfterLast, + CommonLogger, localTime, LocalTimeInput, pMap, @@ -18,6 +21,9 @@ export { type StorageOptions, } +const MAX_RECURSION_DEPTH = 10 +const BATCH_SIZE = 32 + /** * This object is intentionally made to NOT extend StorageOptions, * because StorageOptions is complicated and provides just too many ways @@ -29,9 +35,14 @@ export { */ export interface CloudStorageCfg { /** - * It's optional, to allow automatic credentials in AppEngine, or GOOGLE_APPLICATION_CREDENTIALS. + * Default is console + */ + logger?: CommonLogger + + /** + * Pass true for extra debugging */ - credentials?: GCPServiceAccount + debug?: boolean } /** @@ -40,29 +51,51 @@ export interface CloudStorageCfg { * API: https://googleapis.dev/nodejs/storage/latest/index.html */ export class CloudStorage implements CommonStorage { - /** - * Passing the pre-created Storage allows to instantiate it from both - * GCP Storage and FirebaseStorage. - */ - constructor(public storage: Storage) {} + private constructor( + public storage: Storage, + cfg: CloudStorageCfg = {}, + ) { + this.cfg = { + logger: console, + ...cfg, + } + } - static createFromGCPServiceAccount(cfg: CloudStorageCfg): CloudStorage { + cfg: CloudStorageCfg & { + logger: CommonLogger + } + + static createFromGCPServiceAccount( + credentials?: GCPServiceAccount, + cfg?: CloudStorageCfg, + ): CloudStorage { const storage = new Storage({ - credentials: cfg.credentials, + credentials, // Explicitly passing it here to fix this error: // Error: Unable to detect a Project Id in the current environment. // To learn more about authentication and Google APIs, visit: // https://cloud.google.com/docs/authentication/getting-started // at /root/repo/node_modules/google-auth-library/build/src/auth/googleauth.js:95:31 - projectId: cfg.credentials?.project_id, + projectId: credentials?.project_id, }) - return new CloudStorage(storage) + return new CloudStorage(storage, cfg) } - static createFromStorageOptions(storageOptions?: StorageOptions): CloudStorage { + static createFromStorageOptions( + storageOptions?: StorageOptions, + cfg?: CloudStorageCfg, + ): CloudStorage { const storage = new Storage(storageOptions) - return new CloudStorage(storage) + return new CloudStorage(storage, cfg) + } + + /** + * Passing the pre-created Storage allows to instantiate it from both + * GCP Storage and FirebaseStorage. + */ + static createFromStorage(storage: Storage, cfg?: CloudStorageCfg): CloudStorage { + return new CloudStorage(storage, cfg) } async ping(bucketName?: string): Promise { @@ -235,21 +268,81 @@ export class CloudStorage implements CommonStorage { }) } - async combine( + async deleteFiles(bucketName: string, filePaths: string[]): Promise { + await pMap(filePaths, async filePath => { + await this.storage.bucket(bucketName).file(filePath).delete() + }) + } + + async combineFiles( bucketName: string, filePaths: string[], toPath: string, toBucket?: string, + currentRecursionDepth = 0, // not to be set publicly, only used internally ): Promise { - // todo: if (filePaths.length > 32) - use recursive algorithm - _assert(filePaths.length <= 32, 'combine supports up to 32 input files') + _assert( + currentRecursionDepth <= MAX_RECURSION_DEPTH, + `combineFiles reached max recursion depth of ${MAX_RECURSION_DEPTH}`, + ) + const { logger, debug } = this.cfg - await this.storage - .bucket(bucketName) - .combine(filePaths, this.storage.bucket(toBucket || bucketName).file(toPath)) + if (debug) { + logger.log( + `[${currentRecursionDepth}] Will compose ${filePaths.length} files, by batches of ${BATCH_SIZE}`, + ) + } + + const intermediateFiles: string[] = [] - // Delete original files - await this.deletePaths(bucketName, filePaths) + if (filePaths.length <= BATCH_SIZE) { + await this.storage + .bucket(bucketName) + .combine(filePaths, this.storage.bucket(toBucket || bucketName).file(toPath)) + + if (debug) { + logger.log(`[${currentRecursionDepth}] Composed into ${toPath}!`) + } + + await this.deleteFiles(bucketName, filePaths) + return + } + + const started = Date.now() + await pMap(_chunk(filePaths, BATCH_SIZE), async (fileBatch, i) => { + if (debug) { + logger.log(`[${currentRecursionDepth}] Composing batch ${i + 1}...`) + } + const intermediateFile = `temp_${currentRecursionDepth}_${i}` + await this.storage + .bucket(bucketName) + .combine(fileBatch, this.storage.bucket(toBucket || bucketName).file(intermediateFile)) + intermediateFiles.push(intermediateFile) + await this.deleteFiles(bucketName, fileBatch) + }) + if (debug) { + logger.log( + `[${currentRecursionDepth}] Batch composed into ${intermediateFiles.length} files, in ${_since(started)}`, + ) + } + + await this.combineFiles( + toBucket || bucketName, + intermediateFiles, + toPath, + toBucket, + currentRecursionDepth + 1, + ) + } + + async combine( + bucketName: string, + prefix: string, + toPath: string, + toBucket?: string, + ): Promise { + const filePaths = await this.getFileNames(bucketName, { prefix }) + await this.combineFiles(bucketName, filePaths, toPath, toBucket) } /** diff --git a/src/commonStorage.ts b/src/commonStorage.ts index 76b916c..b33467d 100644 --- a/src/commonStorage.ts +++ b/src/commonStorage.ts @@ -69,6 +69,11 @@ export interface CommonStorage { deletePaths: (bucketName: string, prefixes: string[]) => Promise + /** + * Should delete all files by their paths. + */ + deleteFiles: (bucketName: string, filePaths: string[]) => Promise + /** * Returns an array of strings which are file paths. * Files that are not found by the path are not present in the map. @@ -134,13 +139,18 @@ export interface CommonStorage { * * @experimental */ - combine: ( + combineFiles: ( bucketName: string, filePaths: string[], toPath: string, toBucket?: string, ) => Promise + /** + * Like `combineFiles`, but for a `prefix`. + */ + combine: (bucketName: string, prefix: string, toPath: string, toBucket?: string) => Promise + /** * Acquire a "signed url", which allows bearer to use it to download ('read') the file. * diff --git a/src/inMemoryCommonStorage.ts b/src/inMemoryCommonStorage.ts index 2a35145..9153160 100644 --- a/src/inMemoryCommonStorage.ts +++ b/src/inMemoryCommonStorage.ts @@ -54,6 +54,11 @@ export class InMemoryCommonStorage implements CommonStorage { }) } + async deleteFiles(bucketName: string, filePaths: string[]): Promise { + if (!this.data[bucketName]) return + filePaths.forEach(filePath => delete this.data[bucketName]![filePath]) + } + async getFileNames(bucketName: string, opt: CommonStorageGetOptions = {}): Promise { const { prefix = '', fullPaths = true } = opt return Object.keys(this.data[bucketName] || {}) @@ -156,6 +161,16 @@ export class InMemoryCommonStorage implements CommonStorage { } async combine( + bucketName: string, + prefix: string, + toPath: string, + toBucket?: string, + ): Promise { + const filePaths = await this.getFileNames(bucketName, { prefix }) + await this.combineFiles(bucketName, filePaths, toPath, toBucket) + } + + async combineFiles( bucketName: string, filePaths: string[], toPath: string, diff --git a/src/test/cloudStorage.manual.test.ts b/src/test/cloudStorage.manual.test.ts index 05d3437..b0f8c13 100644 --- a/src/test/cloudStorage.manual.test.ts +++ b/src/test/cloudStorage.manual.test.ts @@ -9,9 +9,7 @@ const { bucketName, GCP_SERVICE_ACCOUNT: serviceAccountStr } = requireEnvKeys( ) const serviceAccount: GCPServiceAccount = JSON.parse(serviceAccountStr) -const storage = CloudStorage.createFromGCPServiceAccount({ - credentials: serviceAccount, -}) +const storage = CloudStorage.createFromGCPServiceAccount(serviceAccount) // const TEST_FOLDER = 'test/subdir' // diff --git a/src/test/commonStorageKeyValueDB.manual.test.ts b/src/test/commonStorageKeyValueDB.manual.test.ts index d2b80cc..387fc01 100644 --- a/src/test/commonStorageKeyValueDB.manual.test.ts +++ b/src/test/commonStorageKeyValueDB.manual.test.ts @@ -10,9 +10,7 @@ const { bucketName, GCP_SERVICE_ACCOUNT: serviceAccountStr } = requireEnvKeys( ) const serviceAccount: GCPServiceAccount = JSON.parse(serviceAccountStr) -const storage = CloudStorage.createFromGCPServiceAccount({ - credentials: serviceAccount, -}) +const storage = CloudStorage.createFromGCPServiceAccount(serviceAccount) const db = new CommonStorageKeyValueDB({ storage, diff --git a/src/test/firebaseStorage.manual.test.ts b/src/test/firebaseStorage.manual.test.ts index 49b6ba5..b8ba1aa 100644 --- a/src/test/firebaseStorage.manual.test.ts +++ b/src/test/firebaseStorage.manual.test.ts @@ -15,7 +15,7 @@ const app = admin.initializeApp({ // storageBucket: FIREBASE_BUCKET, }) -const storage = new CloudStorage(app.storage() as any) +const storage = CloudStorage.createFromStorage(app.storage() as any) describe(`runCommonStorageTest`, () => runCommonStorageTest(storage, FIREBASE_BUCKET))