From ef4932952303219d7cac6375426fadf8848f9043 Mon Sep 17 00:00:00 2001
From: David Nagy
Date: Fri, 18 Oct 2024 13:25:26 +0200
Subject: [PATCH] feat: use hashfields as table namespaces

---
 src/redisClient.ts             |   6 +-
 src/redisKeyValueDB2.ts        | 119 +++++++++++++++++++++++++++++++++
 src/test/redis2.manual.test.ts |  72 ++++++++++++++++++++
 3 files changed, 194 insertions(+), 3 deletions(-)
 create mode 100644 src/redisKeyValueDB2.ts
 create mode 100644 src/test/redis2.manual.test.ts

diff --git a/src/redisClient.ts b/src/redisClient.ts
index 9b4f157..e4fd6b6 100644
--- a/src/redisClient.ts
+++ b/src/redisClient.ts
@@ -271,7 +271,7 @@ export class RedisClient implements CommonClient {
   Convenient type-safe wrapper.
   Returns BATCHES of keys in each iteration (as-is).
    */
-  scanStream(opt: ScanStreamOptions): ReadableTyped<string[]> {
+  scanStream(opt?: ScanStreamOptions): ReadableTyped<string[]> {
     return this.redis().scanStream(opt)
   }
 
@@ -294,11 +294,11 @@ export class RedisClient implements CommonClient {
     return count
   }
 
-  hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
+  hscanStream(key: string, opt?: ScanStreamOptions): ReadableTyped<string[]> {
     return this.redis().hscanStream(key, opt)
   }
 
-  async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
+  async hscanCount(key: string, opt?: ScanStreamOptions): Promise<number> {
     let count = 0
 
     const stream = this.redis().hscanStream(key, opt)
diff --git a/src/redisKeyValueDB2.ts b/src/redisKeyValueDB2.ts
new file mode 100644
index 0000000..b8cb228
--- /dev/null
+++ b/src/redisKeyValueDB2.ts
@@ -0,0 +1,119 @@
+import {
+  CommonDBCreateOptions,
+  CommonKeyValueDB,
+  commonKeyValueDBFullSupport,
+  CommonKeyValueDBSaveBatchOptions,
+  IncrementTuple,
+  KeyValueDBTuple,
+} from '@naturalcycles/db-lib'
+import { _chunk } from '@naturalcycles/js-lib'
+import { ReadableTyped } from '@naturalcycles/nodejs-lib'
+import { RedisClient } from './redisClient'
+
+export interface RedisKeyValueDBCfg {
+  client: RedisClient
+}
+
+export class RedisKeyValueDB2 implements CommonKeyValueDB, AsyncDisposable {
+  constructor(cfg: RedisKeyValueDBCfg) {
+    this.client = cfg.client
+  }
+
+  client: RedisClient
+
+  support = {
+    ...commonKeyValueDBFullSupport,
+  }
+
+  async ping(): Promise<void> {
+    await this.client.ping()
+  }
+
+  async [Symbol.asyncDispose](): Promise<void> {
+    await this.client.disconnect()
+  }
+
+  async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
+    if (!ids.length) return []
+    // we assume that the order of returned values is the same as order of input ids
+    const bufs = await this.client.hmgetBuffer(table, ids)
+    return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
+  }
+
+  async deleteByIds(table: string, ids: string[]): Promise<void> {
+    if (!ids.length) return
+    await this.client.hdel(table, ids)
+  }
+
+  async saveBatch(
+    table: string,
+    entries: KeyValueDBTuple[],
+    opt?: CommonKeyValueDBSaveBatchOptions,
+  ): Promise<void> {
+    if (!entries.length) return
+
+    const record = Object.fromEntries(entries)
+
+    if (opt?.expireAt) {
+      await this.client.hsetWithTTL(table, record, opt.expireAt)
+    } else {
+      await this.client.hset(table, record)
+    }
+  }
+
+  streamIds(table: string, limit?: number): ReadableTyped<string> {
+    const stream = this.client
+      .hscanStream(table)
+      .flatMap(keyValueList => {
+        const keys: string[] = []
+        keyValueList.forEach((keyOrValue, index) => {
+          if (index % 2 !== 0) return
+          keys.push(keyOrValue)
+        })
+        return keys
+      })
+      .take(limit || Infinity)
+
+    return stream
+  }
+
+  streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
+    return this.client
+      .hscanStream(table)
+      .flatMap(keyValueList => {
+        const values: string[] = []
+        keyValueList.forEach((keyOrValue, index) => {
+          if (index % 2 !== 1) return
+          values.push(keyOrValue)
+        })
+        return values.map(v => Buffer.from(v))
+      })
+      .take(limit || Infinity)
+  }
+
+  streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
+    return this.client
+      .hscanStream(table)
+      .flatMap(keyValueList => {
+        const entries = _chunk(keyValueList, 2) as [string, string][]
+        return entries.map(([k, v]) => {
+          return [k, Buffer.from(String(v))] satisfies KeyValueDBTuple
+        })
+      })
+      .take(limit || Infinity)
+  }
+
+  async count(table: string): Promise<number> {
+    return await this.client.hscanCount(table)
+  }
+
+  async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
+    return await this.client.hincrBatch(table, increments)
+  }
+
+  async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
+    if (!opt?.dropIfExists) return
+
+    await this.client.del([table])
+  }
+}
diff --git a/src/test/redis2.manual.test.ts b/src/test/redis2.manual.test.ts
new file mode 100644
index 0000000..aa3e4bf
--- /dev/null
+++ b/src/test/redis2.manual.test.ts
@@ -0,0 +1,72 @@
+import { CommonKeyValueDao, CommonKeyValueDaoMemoCache } from '@naturalcycles/db-lib'
+import { runCommonKeyValueDBTest, TEST_TABLE } from '@naturalcycles/db-lib/dist/testing'
+import { runCommonKeyValueDaoTest } from '@naturalcycles/db-lib/dist/testing/keyValueDaoTest'
+import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB'
+import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib'
+import { RedisClient } from '../redisClient'
+import { RedisKeyValueDB2 } from '../redisKeyValueDB2'
+
+const client = new RedisClient()
+const db = new RedisKeyValueDB2({ client })
+
+const dao = new CommonKeyValueDao<Buffer>({
+  db,
+  table: TEST_TABLE,
+})
+
+afterAll(async () => {
+  await client.disconnect()
+})
+
+test('connect', async () => {
+  await db.ping()
+})
+
+describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))
+describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))
+
+// Saving expiring hash fields is not supported until Redis 7.4.0
+test.skip('saveBatch with EXAT', async () => {
+  const testIds = _range(1, 4).map(n => `id${n}`)
+  const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])
+
+  await db.saveBatch(TEST_TABLE, testEntries, {
+    expireAt: localTime.now().plus(1, 'second').unix,
+  })
+  let loaded = await db.getByIds(TEST_TABLE, testIds)
+  expect(loaded.length).toBe(3)
+  await pDelay(2000)
+  loaded = await db.getByIds(TEST_TABLE, testIds)
+  expect(loaded.length).toBe(0)
+})
+
+class C {
+  @_AsyncMemo({
+    cacheFactory: () =>
+      new CommonKeyValueDaoMemoCache({
+        dao,
+        ttl: 1,
+      }),
+  })
+  async get(k: string): Promise<Buffer | null> {
+    console.log(`get ${k}`)
+    return Buffer.from(k)
+  }
+}
+
+const c = new C()
+
+test('CommonKeyValueDaoMemoCache serial', async () => {
+  for (const _ of _range(10)) {
+    console.log(await c.get('key'))
+    await pDelay(100)
+  }
+})
+
+test('CommonKeyValueDaoMemoCache async swarm', async () => {
+  await Promise.all(
+    _range(30).map(async () => {
+      console.log(await c.get('key'))
+    }),
+  )
+})