From d95257ca607f300af0dbfd49da53967f9e23d794 Mon Sep 17 00:00:00 2001
From: David Nagy
Date: Fri, 18 Oct 2024 16:35:17 +0200
Subject: [PATCH] chore: expose `cfg`

---
 src/redisHashKeyValueDB.ts | 31 +++++++++++++------------------
 src/redisKeyValueDB.ts     | 34 +++++++++++++++-------------------
 2 files changed, 28 insertions(+), 37 deletions(-)

diff --git a/src/redisHashKeyValueDB.ts b/src/redisHashKeyValueDB.ts
index a01ede6..3cfdbdd 100644
--- a/src/redisHashKeyValueDB.ts
+++ b/src/redisHashKeyValueDB.ts
@@ -8,7 +8,6 @@ import {
 } from '@naturalcycles/db-lib'
 import { _chunk } from '@naturalcycles/js-lib'
 import { ReadableTyped } from '@naturalcycles/nodejs-lib'
-import { RedisClient } from './redisClient'
 import { RedisKeyValueDBCfg } from './redisKeyValueDB'

 /**
@@ -24,34 +23,30 @@ import { RedisKeyValueDBCfg } from './redisKeyValueDB'
  * this implementation can take over for RedisKeyValueDB.
  */
 export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
-  constructor(cfg: RedisKeyValueDBCfg) {
-    this.client = cfg.client
-  }
-
-  client: RedisClient
+  constructor(public cfg: RedisKeyValueDBCfg) {}

   support = {
     ...commonKeyValueDBFullSupport,
   }

   async ping(): Promise<void> {
-    await this.client.ping()
+    await this.cfg.client.ping()
   }

   async [Symbol.asyncDispose](): Promise<void> {
-    await this.client.disconnect()
+    await this.cfg.client.disconnect()
   }

   async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
     if (!ids.length) return []
     // we assume that the order of returned values is the same as order of input ids
-    const bufs = await this.client.hmgetBuffer(table, ids)
+    const bufs = await this.cfg.client.hmgetBuffer(table, ids)
     return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
   }

   async deleteByIds(table: string, ids: string[]): Promise<void> {
     if (!ids.length) return
-    await this.client.hdel(table, ids)
+    await this.cfg.client.hdel(table, ids)
   }

   async saveBatch(
@@ -64,14 +59,14 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
     const record = Object.fromEntries(entries)

     if (opt?.expireAt) {
-      await this.client.hsetWithTTL(table, record, opt.expireAt)
+      await this.cfg.client.hsetWithTTL(table, record, opt.expireAt)
     } else {
-      await this.client.hset(table, record)
+      await this.cfg.client.hset(table, record)
     }
   }

   streamIds(table: string, limit?: number): ReadableTyped<string> {
-    const stream = this.client
+    const stream = this.cfg.client
       .hscanStream(table)
       .flatMap(keyValueList => {
         const keys: string[] = []
@@ -87,7 +82,7 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }

   streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
-    return this.client
+    return this.cfg.client
       .hscanStream(table)
       .flatMap(keyValueList => {
         const values: string[] = []
@@ -101,7 +96,7 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }

   streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
-    return this.client
+    return this.cfg.client
       .hscanStream(table)
       .flatMap(keyValueList => {
         const entries = _chunk(keyValueList, 2) as [string, string][]
@@ -113,16 +108,16 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }

   async count(table: string): Promise<number> {
-    return await this.client.hscanCount(table)
+    return await this.cfg.client.hscanCount(table)
   }

   async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
-    return await this.client.hincrBatch(table, increments)
+    return await this.cfg.client.hincrBatch(table, increments)
   }

   async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
     if (!opt?.dropIfExists) return

-    await this.client.del([table])
+    await this.cfg.client.del([table])
   }
 }
diff --git a/src/redisKeyValueDB.ts b/src/redisKeyValueDB.ts
index 41a5365..908cbbc 100644
--- a/src/redisKeyValueDB.ts
+++ b/src/redisKeyValueDB.ts
@@ -15,34 +15,30 @@ export interface RedisKeyValueDBCfg {
 }

 export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
-  constructor(cfg: RedisKeyValueDBCfg) {
-    this.client = cfg.client
-  }
-
-  client: RedisClient
+  constructor(public cfg: RedisKeyValueDBCfg) {}

   support = {
     ...commonKeyValueDBFullSupport,
   }

   async ping(): Promise<void> {
-    await this.client.ping()
+    await this.cfg.client.ping()
   }

   async [Symbol.asyncDispose](): Promise<void> {
-    await this.client.disconnect()
+    await this.cfg.client.disconnect()
   }

   async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
     if (!ids.length) return []
     // we assume that the order of returned values is the same as order of input ids
-    const bufs = await this.client.mgetBuffer(this.idsToKeys(table, ids))
+    const bufs = await this.cfg.client.mgetBuffer(this.idsToKeys(table, ids))
     return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
   }

   async deleteByIds(table: string, ids: string[]): Promise<void> {
     if (!ids.length) return
-    await this.client.del(this.idsToKeys(table, ids))
+    await this.cfg.client.del(this.idsToKeys(table, ids))
   }

   async saveBatch(
@@ -55,7 +51,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
     if (opt?.expireAt) {
       // There's no supported mset with TTL: https://stackoverflow.com/questions/16423342/redis-multi-set-with-a-ttl
       // so we gonna use a pipeline instead
-      await this.client.withPipeline(pipeline => {
+      await this.cfg.client.withPipeline(pipeline => {
         for (const [k, v] of entries) {
           pipeline.set(this.idToKey(table, k), v, 'EXAT', opt.expireAt!)
         }
@@ -64,12 +60,12 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
       const obj: Record<string, Buffer> = Object.fromEntries(
         entries.map(([k, v]) => [this.idToKey(table, k), v]) as KeyValueDBTuple[],
       )
-      await this.client.msetBuffer(obj)
+      await this.cfg.client.msetBuffer(obj)
     }
   }

   streamIds(table: string, limit?: number): ReadableTyped<string> {
-    let stream = this.client
+    let stream = this.cfg.client
       .scanStream({
         match: `${table}:*`,
         // count: limit, // count is actually a "batchSize", not a limit
@@ -84,13 +80,13 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }

   streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
-    return this.client
+    return this.cfg.client
       .scanStream({
         match: `${table}:*`,
       })
       .flatMap(
         async keys => {
-          return (await this.client.mgetBuffer(keys)).filter(_isTruthy)
+          return (await this.cfg.client.mgetBuffer(keys)).filter(_isTruthy)
         },
         {
           concurrency: 16,
@@ -100,14 +96,14 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
   }

   streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
-    return this.client
+    return this.cfg.client
       .scanStream({
         match: `${table}:*`,
       })
       .flatMap(
         async keys => {
           // casting as Buffer[], because values are expected to exist for given keys
-          const bufs = (await this.client.mgetBuffer(keys)) as Buffer[]
+          const bufs = (await this.cfg.client.mgetBuffer(keys)) as Buffer[]
           return _zip(this.keysToIds(table, keys), bufs)
         },
         {
@@ -119,7 +115,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {

   async count(table: string): Promise<number> {
     // todo: implement more efficiently, e.g via LUA?
-    return await this.client.scanCount({
+    return await this.cfg.client.scanCount({
       match: `${table}:*`,
     })
   }
@@ -128,7 +124,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
     const incrementTuplesWithInternalKeys = increments.map(
       ([id, v]) => [this.idToKey(table, id), v] as [string, number],
     )
-    const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
+    const resultsWithInternalKeys = await this.cfg.client.incrBatch(incrementTuplesWithInternalKeys)
     const results = resultsWithInternalKeys.map(
       ([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
     )
@@ -138,7 +134,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {

   async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
     if (!opt?.dropIfExists) return
-    await this.client.dropTable(table)
+    await this.cfg.client.dropTable(table)
   }

   private idsToKeys(table: string, ids: string[]): string[] {
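
For illustration only, not part of the patch: a minimal consumer-side sketch of what exposing `cfg` enables. The import paths are the two files touched above; the `example` wrapper and the assumption that `RedisClient` can be constructed with default options are mine, not from the commit.

// Usage sketch (illustrative). Assumes RedisClient has a default-constructible config.
import { RedisClient } from './redisClient'
import { RedisKeyValueDB } from './redisKeyValueDB'

async function example(): Promise<void> {
  const db = new RedisKeyValueDB({ client: new RedisClient() })

  // Before this change the client was copied onto a separate `db.client` property;
  // now the whole cfg object, including the client, is reachable directly.
  await db.cfg.client.ping()

  await db[Symbol.asyncDispose]()
}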