Skip to content

Commit

Permalink
feat: recursive implementation of combineFiles (#1)
Browse files Browse the repository at this point in the history
Also, refactor the constructors of CloudStorage and support passing a logger

Co-authored-by: kirillgroshkov <[email protected]>
  • Loading branch information
flo-monin and kirillgroshkov authored Apr 8, 2024
1 parent 151cca1 commit 0069586
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 29 deletions.
135 changes: 114 additions & 21 deletions src/cloudStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { Readable, Writable } from 'node:stream'
import { File, Storage, StorageOptions } from '@google-cloud/storage'
import {
_assert,
_chunk,
_since,
_substringAfterLast,
CommonLogger,
localTime,
LocalTimeInput,
pMap,
Expand All @@ -18,6 +21,9 @@ export {
type StorageOptions,
}

// Deepest recursion level combineFiles is allowed to reach; going past it fails an assertion.
const MAX_RECURSION_DEPTH = 10
// Maximum number of files passed to a single `combine` call; longer lists are split into batches of this size.
const BATCH_SIZE = 32

/**
* This object is intentionally made to NOT extend StorageOptions,
* because StorageOptions is complicated and provides just too many ways
Expand All @@ -29,9 +35,14 @@ export {
*/
export interface CloudStorageCfg {
/**
* Logger that CloudStorage writes its debug output to.
* Default is console
*/
logger?: CommonLogger

/**
* It's optional, to allow automatic credentials in AppEngine, or GOOGLE_APPLICATION_CREDENTIALS.
*/
credentials?: GCPServiceAccount

/**
* Pass true for extra debugging
*/
debug?: boolean
}

/**
Expand All @@ -40,29 +51,51 @@ export interface CloudStorageCfg {
* API: https://googleapis.dev/nodejs/storage/latest/index.html
*/
export class CloudStorage implements CommonStorage {
/**
 * Not callable from outside: use one of the static `createFrom*` factories instead.
 *
 * @param storage pre-created Storage client that the instance operates on
 * @param cfg optional settings; `logger` falls back to `console` when it is not provided
 */
private constructor(
  public storage: Storage,
  cfg: CloudStorageCfg = {},
) {
  this.cfg = {
    ...cfg,
    // Applied after the spread, so that an explicit `logger: undefined` still gets the default
    logger: cfg.logger ?? console,
  }
}

static createFromGCPServiceAccount(cfg: CloudStorageCfg): CloudStorage {
cfg: CloudStorageCfg & {
logger: CommonLogger
}

/**
 * Creates a CloudStorage on top of a new Storage client.
 *
 * @param credentials service account to authenticate with. It's optional, to allow
 *   automatic credentials in AppEngine, or GOOGLE_APPLICATION_CREDENTIALS.
 * @param cfg optional CloudStorage settings, passed through to the instance
 */
static createFromGCPServiceAccount(
  credentials?: GCPServiceAccount,
  cfg?: CloudStorageCfg,
): CloudStorage {
  const storage = new Storage({
    credentials,
    // Explicitly passing it here to fix this error:
    // Error: Unable to detect a Project Id in the current environment.
    // To learn more about authentication and Google APIs, visit:
    // https://cloud.google.com/docs/authentication/getting-started
    // at /root/repo/node_modules/google-auth-library/build/src/auth/googleauth.js:95:31
    projectId: credentials?.project_id,
  })

  return new CloudStorage(storage, cfg)
}

static createFromStorageOptions(storageOptions?: StorageOptions): CloudStorage {
/**
 * Creates a CloudStorage on top of a new Storage client built from `storageOptions`.
 *
 * @param storageOptions options handed to the Storage constructor as-is
 * @param cfg optional CloudStorage settings, passed through to the instance
 */
static createFromStorageOptions(
  storageOptions?: StorageOptions,
  cfg?: CloudStorageCfg,
): CloudStorage {
  const storage = new Storage(storageOptions)
  return new CloudStorage(storage, cfg)
}

/**
* Creates a CloudStorage around an already existing Storage instance.
*
* Passing the pre-created Storage allows to instantiate it from both
* GCP Storage and FirebaseStorage.
*
* @param storage pre-created Storage instance to operate on
* @param cfg optional CloudStorage settings, forwarded to the constructor
*/
static createFromStorage(storage: Storage, cfg?: CloudStorageCfg): CloudStorage {
return new CloudStorage(storage, cfg)
}

async ping(bucketName?: string): Promise<void> {
Expand Down Expand Up @@ -235,21 +268,81 @@ export class CloudStorage implements CommonStorage {
})
}

async combine(
/**
 * Deletes every file listed in `filePaths` from the bucket `bucketName`.
 * Each entry is used as the path of one file.
 */
async deleteFiles(bucketName: string, filePaths: string[]): Promise<void> {
  const deleteOne = async (filePath: string): Promise<void> => {
    await this.storage.bucket(bucketName).file(filePath).delete()
  }

  await pMap(filePaths, deleteOne)
}

/**
 * Combines `filePaths` from `bucketName` into a single file at `toPath`
 * and deletes the source files afterwards.
 *
 * Up to BATCH_SIZE files are combined with a single `combine` call.
 * Longer lists are split into batches: every batch is combined into an intermediate
 * `temp_<depth>_<batchIndex>` file in the destination bucket, and the intermediate files
 * are then combined by a recursive call, until a single file remains.
 *
 * @param bucketName bucket that holds the source files
 * @param filePaths source files; intermediate files are combined in the same batch order
 * @param toPath path of the resulting file
 * @param toBucket destination bucket, falls back to `bucketName`
 * @param currentRecursionDepth not to be set publicly, only used internally
 * @throws when the recursion goes deeper than MAX_RECURSION_DEPTH
 */
async combineFiles(
  bucketName: string,
  filePaths: string[],
  toPath: string,
  toBucket?: string,
  currentRecursionDepth = 0, // not to be set publicly, only used internally
): Promise<void> {
  _assert(
    currentRecursionDepth <= MAX_RECURSION_DEPTH,
    `combineFiles reached max recursion depth of ${MAX_RECURSION_DEPTH}`,
  )
  const { logger, debug } = this.cfg
  const targetBucketName = toBucket || bucketName

  if (debug) {
    logger.log(
      `[${currentRecursionDepth}] Will compose ${filePaths.length} files, by batches of ${BATCH_SIZE}`,
    )
  }

  // Base case: everything fits into one combine call
  if (filePaths.length <= BATCH_SIZE) {
    await this.storage
      .bucket(bucketName)
      .combine(filePaths, this.storage.bucket(targetBucketName).file(toPath))

    if (debug) {
      logger.log(`[${currentRecursionDepth}] Composed into ${toPath}!`)
    }

    await this.deleteFiles(bucketName, filePaths)
    return
  }

  const batches = _chunk(filePaths, BATCH_SIZE)
  const intermediateFileName = (i: number): string => `temp_${currentRecursionDepth}_${i}`
  // Names are derived from the batch index up-front (not collected as batches finish),
  // because batches run concurrently and may complete in any order,
  // while the next level must receive them in the original batch order.
  const intermediateFiles = batches.map((_, i) => intermediateFileName(i))

  const started = Date.now()
  await pMap(batches, async (fileBatch, i) => {
    if (debug) {
      logger.log(`[${currentRecursionDepth}] Composing batch ${i + 1}...`)
    }
    await this.storage
      .bucket(bucketName)
      .combine(fileBatch, this.storage.bucket(targetBucketName).file(intermediateFileName(i)))
    // Source files of a batch are removed as soon as that batch is composed
    await this.deleteFiles(bucketName, fileBatch)
  })
  if (debug) {
    logger.log(
      `[${currentRecursionDepth}] Batch composed into ${intermediateFiles.length} files, in ${_since(started)}`,
    )
  }

  // Intermediate files live in the destination bucket, so it becomes the source bucket of the next level
  await this.combineFiles(
    targetBucketName,
    intermediateFiles,
    toPath,
    toBucket,
    currentRecursionDepth + 1,
  )
}

/**
 * Looks up the files of `bucketName` by `prefix` and combines them into `toPath`
 * by delegating to `combineFiles`.
 */
async combine(
  bucketName: string,
  prefix: string,
  toPath: string,
  toBucket?: string,
): Promise<void> {
  const listOptions = { prefix }
  const pathsUnderPrefix = await this.getFileNames(bucketName, listOptions)

  await this.combineFiles(bucketName, pathsUnderPrefix, toPath, toBucket)
}

/**
Expand Down
12 changes: 11 additions & 1 deletion src/commonStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ export interface CommonStorage {

deletePaths: (bucketName: string, prefixes: string[]) => Promise<void>

/**
* Should delete all files by their paths.
* Each entry of `filePaths` is the path of a single file
* (compare with `deletePaths`, which takes prefixes).
*/
deleteFiles: (bucketName: string, filePaths: string[]) => Promise<void>

/**
* Returns an array of strings which are file paths.
* Files that are not found by the path are not present in the map.
Expand Down Expand Up @@ -134,13 +139,18 @@ export interface CommonStorage {
*
* @experimental
*/
combine: (
combineFiles: (
bucketName: string,
filePaths: string[],
toPath: string,
toBucket?: string,
) => Promise<void>

/**
* Like `combineFiles`, but for a `prefix`:
* the files to combine are looked up by `prefix` instead of being passed explicitly.
*/
combine: (bucketName: string, prefix: string, toPath: string, toBucket?: string) => Promise<void>

/**
* Acquire a "signed url", which allows bearer to use it to download ('read') the file.
*
Expand Down
15 changes: 15 additions & 0 deletions src/inMemoryCommonStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ export class InMemoryCommonStorage implements CommonStorage {
})
}

/**
 * Removes the given file paths from the in-memory data of `bucketName`.
 * Does nothing when there is no data for that bucket.
 */
async deleteFiles(bucketName: string, filePaths: string[]): Promise<void> {
  const bucketData = this.data[bucketName]
  if (!bucketData) return

  for (const filePath of filePaths) {
    delete bucketData[filePath]
  }
}

async getFileNames(bucketName: string, opt: CommonStorageGetOptions = {}): Promise<string[]> {
const { prefix = '', fullPaths = true } = opt
return Object.keys(this.data[bucketName] || {})
Expand Down Expand Up @@ -156,6 +161,16 @@ export class InMemoryCommonStorage implements CommonStorage {
}

/**
 * Resolves `prefix` to file names via `getFileNames`,
 * then passes them on to `combineFiles`.
 */
async combine(
  bucketName: string,
  prefix: string,
  toPath: string,
  toBucket?: string,
): Promise<void> {
  const prefixedFiles = await this.getFileNames(bucketName, { prefix })

  await this.combineFiles(bucketName, prefixedFiles, toPath, toBucket)
}

async combineFiles(
bucketName: string,
filePaths: string[],
toPath: string,
Expand Down
4 changes: 1 addition & 3 deletions src/test/cloudStorage.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ const { bucketName, GCP_SERVICE_ACCOUNT: serviceAccountStr } = requireEnvKeys(
)
const serviceAccount: GCPServiceAccount = JSON.parse(serviceAccountStr)

const storage = CloudStorage.createFromGCPServiceAccount({
credentials: serviceAccount,
})
const storage = CloudStorage.createFromGCPServiceAccount(serviceAccount)

// const TEST_FOLDER = 'test/subdir'
//
Expand Down
4 changes: 1 addition & 3 deletions src/test/commonStorageKeyValueDB.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ const { bucketName, GCP_SERVICE_ACCOUNT: serviceAccountStr } = requireEnvKeys(
)
const serviceAccount: GCPServiceAccount = JSON.parse(serviceAccountStr)

const storage = CloudStorage.createFromGCPServiceAccount({
credentials: serviceAccount,
})
const storage = CloudStorage.createFromGCPServiceAccount(serviceAccount)

const db = new CommonStorageKeyValueDB({
storage,
Expand Down
2 changes: 1 addition & 1 deletion src/test/firebaseStorage.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const app = admin.initializeApp({
// storageBucket: FIREBASE_BUCKET,
})

const storage = new CloudStorage(app.storage() as any)
const storage = CloudStorage.createFromStorage(app.storage() as any)

describe(`runCommonStorageTest`, () => runCommonStorageTest(storage, FIREBASE_BUCKET))

Expand Down

0 comments on commit 0069586

Please sign in to comment.