diff --git a/src/redisClient.test.ts b/src/redisClient.test.ts new file mode 100644 index 0000000..3173c88 --- /dev/null +++ b/src/redisClient.test.ts @@ -0,0 +1,132 @@ +import { localTime } from '@naturalcycles/js-lib' +import { RedisClient } from './redisClient' + +let client: RedisClient + +beforeAll(() => { + client = new RedisClient() +}) + +beforeEach(async () => { + await client.dropTable('test') +}) + +afterAll(async () => { + await client.dropTable('test') + await client.disconnect() +}) + +describe('hashmap functions', () => { + test('hset should save a map', async () => { + await client.hset('test:key', { foo: 'bar' }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ foo: 'bar' }) + }) + + test('should store/fetch numbers as strings', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ one: '1' }) + }) + + test('hgetall should not fetch nested objects', async () => { + await client.hset('test:key', { nested: { one: 1 } }) + + const result = await client.hgetall('test:key') + + expect(result).toEqual({ nested: '[object Object]' }) + }) + + test('hget should fetch map property', async () => { + await client.hset('test:key', { foo: 'bar' }) + + const result = await client.hget('test:key', 'foo') + + expect(result).toBe('bar') + }) + + test('hget should fetch value as string', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hget('test:key', 'one') + + expect(result).toBe('1') + }) + + test('hmgetBuffer should get the values of the fields as strings', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hmget('test:key', ['one', 'three']) + + expect(result).toEqual(['1', '3']) + }) + + test('hmgetBuffer should get the values of the fields as buffers', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hmgetBuffer('test:key', ['one', 'three']) + + expect(result).toEqual([Buffer.from('1'), Buffer.from('3')]) + }) + + test('hincr should change the value and return with a numeric result', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hincr('test:key', 'one', -2) + + expect(result).toBe(-1) + }) + + test('hincr should increase the value by 1 by default', async () => { + await client.hset('test:key', { one: 1 }) + + const result = await client.hincr('test:key', 'one') + + expect(result).toBe(2) + }) + + test('hincr should set the value to 1 for a non-existing field', async () => { + const result = await client.hincr('test:key', 'one') + + expect(result).toBe(1) + }) + + test('hScanCount should return the number of keys in the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hScanCount('test:key', {}) + + expect(result).toBe(3) + }) + + test('hScanCount with a match pattern should return the number of matching keys in the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + const result = await client.hScanCount('test:key', { match: 't*' }) + + expect(result).toBe(2) + }) + + test('hdel should delete a fields from the hash', async () => { + await client.hset('test:key', { one: 1, two: 2, three: 3 }) + + await client.hdel('test:key', ['two', 'three']) + + const result = await client.hgetall('test:key') + expect(result).toEqual({ one: '1' }) + }) + + test('hsetWithTTL should set the fields with expiry', async () => { + const now = localTime.now().unix + + await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000) + await client.hsetWithTTL('test:key', { foo2: 'bar' }, now - 1) + + const result = await client.hgetall('test:key') + expect(result).toEqual({ foo1: 'bar' }) + }) +}) diff --git a/src/redisClient.ts b/src/redisClient.ts index 12dfa6e..ddf9788 100644 --- a/src/redisClient.ts +++ b/src/redisClient.ts @@ -1,4 +1,5 @@ import { + AnyObject, CommonLogger, NullableBuffer, NullableString, @@ -124,6 +125,38 @@ export class RedisClient implements CommonClient { await this.redis().set(key, value) } + async hgetall = Record>( + key: string, + ): Promise { + const result = await this.redis().hgetall(key) + if (Object.keys(result).length === 0) return null + return result as T + } + + async hget(key: string, field: string): Promise { + return await this.redis().hget(key, field) + } + + async hset(key: string, value: AnyObject): Promise { + await this.redis().hset(key, value) + } + + async hdel(key: string, fields: string[]): Promise { + await this.redis().hdel(key, ...fields) + } + + async hmget(key: string, fields: string[]): Promise { + return await this.redis().hmget(key, ...fields) + } + + async hmgetBuffer(key: string, fields: string[]): Promise { + return await this.redis().hmgetBuffer(key, ...fields) + } + + async hincr(key: string, field: string, increment: number = 1): Promise { + return await this.redis().hincrby(key, field, increment) + } + async setWithTTL( key: string, value: string | number | Buffer, @@ -132,6 +165,20 @@ export class RedisClient implements CommonClient { await this.redis().set(key, value, 'EXAT', expireAt) } + async hsetWithTTL( + key: string, + value: AnyObject, + expireAt: UnixTimestampNumber + ): Promise { + const valueKeys = Object.keys(value) + const numberOfKeys = valueKeys.length + const keyList = valueKeys.join(' ') + const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}` + const [command, ...args] = commandString.split(' ') + await this.redis().hset(key, value) + await this.redis().call(command!, args) + } + async mset(obj: Record): Promise { await this.redis().mset(obj) } @@ -205,6 +252,23 @@ export class RedisClient implements CommonClient { return count } + hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped { + return this.redis().hscanStream(key, opt) + } + + async hScanCount(key: string, opt: ScanStreamOptions): Promise { + let count = 0 + + const stream = await this.redis().hscanStream(key, opt) + + await stream.forEach((keyValueList: string[]) => { + console.log({ keyValueList }) + count += keyValueList.length / 2 + }) + + return count + } + async withPipeline(fn: (pipeline: ChainableCommander) => Promisable): Promise { const pipeline = this.redis().pipeline() await fn(pipeline) diff --git a/src/redisHashKeyValueDB.ts b/src/redisHashKeyValueDB.ts new file mode 100644 index 0000000..c963f8f --- /dev/null +++ b/src/redisHashKeyValueDB.ts @@ -0,0 +1,178 @@ +import { + CommonKeyValueDBSaveBatchOptions, + CommonDBCreateOptions, + CommonKeyValueDB, + KeyValueDBTuple, +} from '@naturalcycles/db-lib' +import { + _chunk, + _isTruthy, + _mapObject, + _stringMapEntries, + _zip, + StringMap, +} from '@naturalcycles/js-lib' +import { ReadableTyped } from '@naturalcycles/nodejs-lib' +import { RedisClient } from './redisClient' +import { RedisKeyValueDBCfg } from './redisKeyValueDB' + +export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg { + hashKey: string +} + +export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable { + client: RedisClient + keyOfHashField: string + + constructor(cfg: RedisHashKeyValueDBCfg) { + this.client = cfg.client + this.keyOfHashField = cfg.hashKey + } + + async ping(): Promise { + await this.client.ping() + } + + async [Symbol.asyncDispose](): Promise { + await this.client.disconnect() + } + + async getByIds(table: string, ids: string[]): Promise { + if (!ids.length) return [] + // we assume that the order of returned values is the same as order of input ids + const bufs = await this.client.hmgetBuffer( + this.keyOfHashField, + this.idsToKeys(table, ids) + ) + return bufs + .map((buf, i) => [ids[i], buf] as KeyValueDBTuple) + .filter(([_k, v]) => v !== null) + } + + async deleteByIds(table: string, ids: string[]): Promise { + if (!ids.length) return + await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids)) + } + + async saveBatch( + table: string, + entries: KeyValueDBTuple[], + opt?: CommonKeyValueDBSaveBatchOptions + ): Promise { + if (!entries.length) return + + const map: StringMap = _mapObject( + Object.fromEntries(entries), + (k, v) => [this.idToKey(table, String(k)), v] + ) + + if (opt?.expireAt) { + await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt) + } else { + await this.client.hset(this.keyOfHashField, map) + } + } + + streamIds(table: string, limit?: number): ReadableTyped { + let stream = this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap((keyValueList) => { + const keys: string[] = [] + keyValueList.forEach((keyOrValue, index) => { + if (index % 2 !== 0) return + keys.push(keyOrValue) + }) + return this.keysToIds(table, keys) + }) + + if (limit) { + stream = stream.take(limit) + } + + return stream + } + + streamValues(table: string, limit?: number): ReadableTyped { + return this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap( + async (keyValueList) => { + const values: string[] = [] + keyValueList.forEach((keyOrValue, index) => { + if (index % 2 !== 1) return + values.push(keyOrValue) + }) + return values.map((v) => Buffer.from(v)) + }, + { + concurrency: 16, + } + ) + .take(limit || Infinity) + } + + streamEntries( + table: string, + limit?: number | undefined + ): ReadableTyped { + return this.client + .hscanStream(this.keyOfHashField, { + match: `${table}:*`, + }) + .flatMap( + async (keyValueList) => { + const entries = _chunk(keyValueList, 2) + return entries.map(([k, v]) => { + return [ + this.keyToId(table, String(k)), + Buffer.from(String(v)), + ] satisfies KeyValueDBTuple + }) + }, + { + concurrency: 16, + } + ) + .take(limit || Infinity) + } + + async count(table: string): Promise { + return await this.client.hScanCount(this.keyOfHashField, { + match: `${table}:*`, + }) + } + + async increment(table: string, id: string, by: number = 1): Promise { + return await this.client.hincr( + this.keyOfHashField, + this.idToKey(table, id), + by + ) + } + + async createTable(table: string, opt?: CommonDBCreateOptions): Promise { + if (!opt?.dropIfExists) return + + await this.client.dropTable(table) + } + + private idsToKeys(table: string, ids: string[]): string[] { + return ids.map((id) => this.idToKey(table, id)) + } + + private idToKey(table: string, id: string): string { + return `${table}:${id}` + } + + private keysToIds(table: string, keys: string[]): string[] { + return keys.map((key) => this.keyToId(table, key)) + } + + private keyToId(table: string, key: string): string { + return key.slice(table.length + 1) + } +} diff --git a/src/test/redis.hash.manual.test.ts b/src/test/redis.hash.manual.test.ts new file mode 100644 index 0000000..b59c8fa --- /dev/null +++ b/src/test/redis.hash.manual.test.ts @@ -0,0 +1,39 @@ +import { + runCommonHashKeyValueDBTest, + TEST_TABLE, +} from '@naturalcycles/db-lib/dist/testing' +import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB' +import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib' +import { RedisClient } from '../redisClient' +import { RedisHashKeyValueDB } from '../redisHashKeyValueDB' + +const client = new RedisClient() +const hashKey = 'hashField' +const db = new RedisHashKeyValueDB({ client, hashKey }) + +afterAll(async () => { + await client.disconnect() +}) + +test('connect', async () => { + await db.ping() +}) + +describe('runCommonHashKeyValueDBTest', () => runCommonHashKeyValueDBTest(db)) + +test('saveBatch with EXAT', async () => { + const testIds = _range(1, 4).map((n) => `id${n}`) + const testEntries: KeyValueDBTuple[] = testIds.map((id) => [ + id, + Buffer.from(`${id}value`), + ]) + + await db.saveBatch(TEST_TABLE, testEntries, { + expireAt: localTime.now().plus(1, 'second').unix, + }) + let loaded = await db.getByIds(TEST_TABLE, testIds) + expect(loaded.length).toBe(3) + await pDelay(2000) + loaded = await db.getByIds(TEST_TABLE, testIds) + expect(loaded.length).toBe(0) +})