From 65225665d31ea6873a9102599cd993501c4d8938 Mon Sep 17 00:00:00 2001 From: David Nagy Date: Wed, 2 Oct 2024 12:26:11 +0200 Subject: [PATCH] feat: add `RedisHashKeyValueDB` (#16) --- src/index.ts | 1 + src/redisClient.manual.test.ts | 132 ++++++++++++++++++++++++++ src/redisClient.ts | 63 ++++++++++++- src/redisHashKeyValueDB.ts | 144 +++++++++++++++++++++++++++++ src/redisKeyValueDB.ts | 4 + src/test/redis.hash.manual.test.ts | 40 ++++++++ yarn.lock | 18 ++-- 7 files changed, 391 insertions(+), 11 deletions(-) create mode 100644 src/redisClient.manual.test.ts create mode 100644 src/redisHashKeyValueDB.ts create mode 100644 src/test/redis.hash.manual.test.ts diff --git a/src/index.ts b/src/index.ts index 377f9f2..8c758b2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export * from './redisClient' export * from './redisKeyValueDB' +export * from './redisHashKeyValueDB' diff --git a/src/redisClient.manual.test.ts b/src/redisClient.manual.test.ts new file mode 100644 index 0000000..37b42bd --- /dev/null +++ b/src/redisClient.manual.test.ts @@ -0,0 +1,132 @@ +import { localTime } from '@naturalcycles/js-lib' +import { RedisClient } from './redisClient' + +let client: RedisClient + +beforeAll(() => { + client = new RedisClient() +}) + +beforeEach(async () => { + await client.dropTable('test') +}) + +afterAll(async () => { + await client.dropTable('test') + await client.disconnect() +}) + +describe('hashmap functions', () => { + test('hset should save a map', async () => { + await client.hset('test:key', { foo: 'bar' }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ foo: 'bar' }) + }) + + test('should store/fetch numbers as strings', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ one: '1' }) + }) + + test('hgetall should not fetch nested objects', async () => { + await client.hset('test:key', { nested: { one: 1 } }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ nested: '[object Object]' }) + }) + + test('hget should fetch map property', async () => { + await client.hset('test:key', { foo: 'bar' }) + + const result = await client.hget('test:key', 'foo') + + expect(result).toBe('bar') + }) + + test('hget should fetch value as string', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hget('test:key', 'one') + + expect(result).toBe('1') + }) + + test('hmgetBuffer should get the values of the fields as strings', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hmget('test:key', ['one', 'three']) + + expect(result).toEqual(['1', '3']) + }) + + test('hmgetBuffer should get the values of the fields as buffers', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hmgetBuffer('test:key', ['one', 'three']) + + expect(result).toEqual([Buffer.from('1'), Buffer.from('3')]) + }) + + test('hincr should change the value and return with a numeric result', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hincr('test:key', 'one', -2) + + expect(result).toBe(-1) + }) + + test('hincr should increase the value by 1 by default', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hincr('test:key', 'one') + + expect(result).toBe(2) + }) + + test('hincr should set the value to 1 for a non-existing field', async () => { + const result = await client.hincr('test:key', 'one') + + expect(result).toBe(1) + }) + + test('hscanCount should return the number of keys in the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hscanCount('test:key', {}) + + expect(result).toBe(3) + }) + + test('hscanCount with a match pattern should return the number of matching keys in the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hscanCount('test:key', { match: 't*' }) + + expect(result).toBe(2) + }) + + test('hdel should delete a fields from the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + await client.hdel('test:key', ['two', 'three']) + + const result = await client.hgetall('test:key') + expect(result).toEqual({ one: '1' }) + }) + + test('hsetWithTTL should set the fields with expiry', async () => { + const now = localTime.now().unix + + await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000) + await client.hsetWithTTL('test:key', { foo2: 'bar' }, now - 1) + + const result = await client.hgetall('test:key') + expect(result).toEqual({ foo1: 'bar' }) + }) +}) diff --git a/src/redisClient.ts b/src/redisClient.ts index 12dfa6e..5130cd3 100644 --- a/src/redisClient.ts +++ b/src/redisClient.ts @@ -1,4 +1,5 @@ import { + AnyObject, CommonLogger, NullableBuffer, NullableString, @@ -124,6 +125,38 @@ export class RedisClient implements CommonClient { await this.redis().set(key, value) } + async hgetall = Record>( + key: string, + ): Promise { + const result = await this.redis().hgetall(key) + if (Object.keys(result).length === 0) return null + return result as T + } + + async hget(key: string, field: string): Promise { + return await this.redis().hget(key, field) + } + + async hset(key: string, value: AnyObject): Promise { + await this.redis().hset(key, value) + } + + async hdel(key: string, fields: string[]): Promise { + await this.redis().hdel(key, ...fields) + } + + async hmget(key: string, fields: string[]): Promise { + return await this.redis().hmget(key, ...fields) + } + + async hmgetBuffer(key: string, fields: string[]): Promise { + return await this.redis().hmgetBuffer(key, ...fields) + } + + async hincr(key: string, field: string, increment: number = 1): Promise { + return await this.redis().hincrby(key, field, increment) + } + async setWithTTL( key: string, value: string | number | Buffer, @@ -132,6 +165,16 @@ export class RedisClient implements CommonClient { await this.redis().set(key, value, 'EXAT', expireAt) } + async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise { + const valueKeys = Object.keys(value) + const numberOfKeys = valueKeys.length + const keyList = valueKeys.join(' ') + const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}` + const [command, ...args] = commandString.split(' ') + await this.redis().hset(key, value) + await this.redis().call(command!, args) + } + async mset(obj: Record): Promise { await this.redis().mset(obj) } @@ -140,8 +183,8 @@ export class RedisClient implements CommonClient { await this.redis().mset(obj) } - async incr(key: string): Promise { - return await this.redis().incr(key) + async incr(key: string, by: number = 1): Promise { + return await this.redis().incrby(key, by) } async ttl(key: string): Promise { @@ -205,6 +248,22 @@ export class RedisClient implements CommonClient { return count } + hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped { + return this.redis().hscanStream(key, opt) + } + + async hscanCount(key: string, opt: ScanStreamOptions): Promise { + let count = 0 + + const stream = this.redis().hscanStream(key, opt) + + await stream.forEach((keyValueList: string[]) => { + count += keyValueList.length / 2 + }) + + return count + } + async withPipeline(fn: (pipeline: ChainableCommander) => Promisable): Promise { const pipeline = this.redis().pipeline() await fn(pipeline) diff --git a/src/redisHashKeyValueDB.ts b/src/redisHashKeyValueDB.ts new file mode 100644 index 0000000..919be9c --- /dev/null +++ b/src/redisHashKeyValueDB.ts @@ -0,0 +1,144 @@ +import { + CommonKeyValueDBSaveBatchOptions, + CommonDBCreateOptions, + CommonKeyValueDB, + KeyValueDBTuple, +} from '@naturalcycles/db-lib' +import { _chunk, StringMap } from '@naturalcycles/js-lib' +import { ReadableTyped } from '@naturalcycles/nodejs-lib' +import { RedisClient } from './redisClient' +import { RedisKeyValueDBCfg } from './redisKeyValueDB' + +export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg { + hashKey: string +} + +export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable { + client: RedisClient + keyOfHashField: string + + constructor(cfg: RedisHashKeyValueDBCfg) { + this.client = cfg.client + this.keyOfHashField = cfg.hashKey + } + + async ping(): Promise { + await this.client.ping() + } + + async [Symbol.asyncDispose](): Promise { + await this.client.disconnect() + } + + async getByIds(table: string, ids: string[]): Promise { + if (!ids.length) return [] + // we assume that the order of returned values is the same as order of input ids + const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids)) + return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null) + } + + async deleteByIds(table: string, ids: string[]): Promise { + if (!ids.length) return + await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids)) + } + + async saveBatch( + table: string, + entries: KeyValueDBTuple[], + opt?: CommonKeyValueDBSaveBatchOptions, + ): Promise { + if (!entries.length) return + + const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v]) + const map: StringMap = Object.fromEntries(entriesWithKey) + + if (opt?.expireAt) { + await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt) + } else { + await this.client.hset(this.keyOfHashField, map) + } + } + + streamIds(table: string, limit?: number): ReadableTyped { + let stream = this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap(keyValueList => { + const keys: string[] = [] + keyValueList.forEach((keyOrValue, index) => { + if (index % 2 !== 0) return + keys.push(keyOrValue) + }) + return this.keysToIds(table, keys) + }) + + if (limit) { + stream = stream.take(limit) + } + + return stream + } + + streamValues(table: string, limit?: number): ReadableTyped { + return this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap(keyValueList => { + const values: string[] = [] + keyValueList.forEach((keyOrValue, index) => { + if (index % 2 !== 1) return + values.push(keyOrValue) + }) + return values.map(v => Buffer.from(v)) + }) + .take(limit || Infinity) + } + + streamEntries(table: string, limit?: number | undefined): ReadableTyped { + return this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap(keyValueList => { + const entries = _chunk(keyValueList, 2) + return entries.map(([k, v]) => { + return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple + }) + }) + .take(limit || Infinity) + } + + async count(table: string): Promise { + return await this.client.hscanCount(this.keyOfHashField, { + match: `${table}:*`, + }) + } + + async increment(table: string, id: string, by: number = 1): Promise { + return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by) + } + + async createTable(table: string, opt?: CommonDBCreateOptions): Promise { + if (!opt?.dropIfExists) return + + await this.client.dropTable(table) + } + + private idsToKeys(table: string, ids: string[]): string[] { + return ids.map(id => this.idToKey(table, id)) + } + + private idToKey(table: string, id: string): string { + return `${table}:${id}` + } + + private keysToIds(table: string, keys: string[]): string[] { + return keys.map(key => this.keyToId(table, key)) + } + + private keyToId(table: string, key: string): string { + return key.slice(table.length + 1) + } +} diff --git a/src/redisKeyValueDB.ts b/src/redisKeyValueDB.ts index edfb81d..66915f1 100644 --- a/src/redisKeyValueDB.ts +++ b/src/redisKeyValueDB.ts @@ -118,6 +118,10 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable { }) } + async increment(table: string, id: string, by: number = 1): Promise { + return await this.client.incr(this.idToKey(table, id), by) + } + async createTable(table: string, opt?: CommonDBCreateOptions): Promise { if (!opt?.dropIfExists) return diff --git a/src/test/redis.hash.manual.test.ts b/src/test/redis.hash.manual.test.ts new file mode 100644 index 0000000..7ddbd3d --- /dev/null +++ b/src/test/redis.hash.manual.test.ts @@ -0,0 +1,40 @@ +import { CommonKeyValueDao } from '@naturalcycles/db-lib' +import { + runCommonKeyValueDBTest, + runCommonKeyValueDaoTest, + TEST_TABLE, +} from '@naturalcycles/db-lib/dist/testing' +import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB' +import { _range, localTime, pDelay } from '@naturalcycles/js-lib' +import { RedisClient } from '../redisClient' +import { RedisHashKeyValueDB } from '../redisHashKeyValueDB' + +const client = new RedisClient() +const hashKey = 'hashField' +const db = new RedisHashKeyValueDB({ client, hashKey }) +const dao = new CommonKeyValueDao({ db, table: TEST_TABLE }) + +afterAll(async () => { + await client.disconnect() +}) + +test('connect', async () => { + await db.ping() +}) + +describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db)) +describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao)) + +test('saveBatch with EXAT', async () => { + const testIds = _range(1, 4).map(n => `id${n}`) + const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)]) + + await db.saveBatch(TEST_TABLE, testEntries, { + expireAt: localTime.now().plus(1, 'second').unix, + }) + let loaded = await db.getByIds(TEST_TABLE, testIds) + expect(loaded.length).toBe(3) + await pDelay(2000) + loaded = await db.getByIds(TEST_TABLE, testIds) + expect(loaded.length).toBe(0) +}) diff --git a/yarn.lock b/yarn.lock index 4bab51a..1a2fd2a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -807,9 +807,9 @@ typescript "^5.0.2" "@naturalcycles/db-lib@^9.9.2": - version "9.14.2" - resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.14.2.tgz#da0fb14236ab3750bbf0bad524ff585bde446467" - integrity sha512-sE5Em3GnSmb0dyaLCs9S9iR4a1oxLzQ+IPdpNdcxyQCSEOsI5EilSvordkaTWBJBeMQWS8FKukK2gcPmhpYkig== + version "9.19.0" + resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.19.0.tgz#bbf628c9b6e9ed50ed2671c8cd2a5df3f85ed91a" + integrity sha512-nL1+EdQFKkaX8uqZap3fdr0q/QlLaezlkQ0Vsklt8DupBPR8Hdvrp6S1Y/mAFO9LbfwV7FQrJeWEjtYSAqQgkw== dependencies: "@naturalcycles/js-lib" "^14.116.0" "@naturalcycles/nodejs-lib" "^13.1.1" @@ -850,9 +850,9 @@ yargs "^17.0.0" "@naturalcycles/js-lib@^14.0.0", "@naturalcycles/js-lib@^14.116.0", "@naturalcycles/js-lib@^14.217.0", "@naturalcycles/js-lib@^14.244.0": - version "14.244.0" - resolved "https://registry.yarnpkg.com/@naturalcycles/js-lib/-/js-lib-14.244.0.tgz#a8c38b99dee32dc25814c52eca98fd0fa87d8634" - integrity sha512-lQMPWGrZz1QxC8YzBQhQwwV3WBL8AX2/Z+ki9ZNZwByu/wZ5PrHIFgnndNxuDg9+KLx+oB2UnYwrV9yuuhHIBA== + version "14.253.0" + resolved "https://registry.yarnpkg.com/@naturalcycles/js-lib/-/js-lib-14.253.0.tgz#4e5eee9ad39f560446534b39532a4820300e8d6f" + integrity sha512-QLf3Kh0r3AZk/LUGtJvbBqS2Qc9MyfRnU4mUWYn+GTsJe7sDULtUiJe6FcwphbrFLI1lPvWQOUQAY0mKeXiYhQ== dependencies: tslib "^2.0.0" zod "^3.20.2" @@ -4798,9 +4798,9 @@ tsconfig-paths@^3.15.0: strip-bom "^3.0.0" tslib@^2.0.0, tslib@^2.6.2: - version "2.6.3" - resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.3.tgz#0438f810ad7a9edcde7a241c3d80db693c8cbfe0" - integrity sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ== + version "2.7.0" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.7.0.tgz#d9b40c5c40ab59e8738f297df3087bf1a2690c01" + integrity sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA== type-check@^0.4.0, type-check@~0.4.0: version "0.4.0"