From d85ff7259bb8d84d91d46fd6c65aa8762c9f9604 Mon Sep 17 00:00:00 2001 From: David Nagy Date: Fri, 18 Oct 2024 11:20:12 +0200 Subject: [PATCH] feat: add `incrementBatch` support (#17) --- src/redisClient.manual.test.ts | 31 ++++++++++++++- src/redisClient.ts | 61 ++++++++++++++++++++++++++---- src/redisHashKeyValueDB.ts | 22 ++++++----- src/redisKeyValueDB.ts | 21 +++++----- src/test/redis.hash.manual.test.ts | 6 +-- src/test/redis.manual.test.ts | 5 +-- yarn.lock | 28 ++++++++++---- 7 files changed, 131 insertions(+), 43 deletions(-) diff --git a/src/redisClient.manual.test.ts b/src/redisClient.manual.test.ts index 37b42bd..d66d3c8 100644 --- a/src/redisClient.manual.test.ts +++ b/src/redisClient.manual.test.ts @@ -16,6 +16,21 @@ afterAll(async () => { await client.disconnect() }) +test('incrBatch should increase multiple keys', async () => { + await client.set('test:one', 1) + await client.set('test:two', 2) + + const result = await client.incrBatch([ + ['test:one', 1], + ['test:two', 2], + ]) + + expect(result).toEqual([ + ['test:one', 2], + ['test:two', 4], + ]) +}) + describe('hashmap functions', () => { test('hset should save a map', async () => { await client.hset('test:key', { foo: 'bar' }) @@ -95,6 +110,20 @@ describe('hashmap functions', () => { expect(result).toBe(1) }) + test('hincrBatch should increase multiple keys', async () => { + await client.hset('test:key', { one: 1, two: 2 }) + + const result = await client.hincrBatch('test:key', [ + ['one', 1], + ['two', 2], + ]) + + expect(result).toEqual([ + ['one', 2], + ['two', 4], + ]) + }) + test('hscanCount should return the number of keys in the hash', async () => { await client.hset('test:key', { one: 1, two: 2, three: 3 }) @@ -120,7 +149,7 @@ describe('hashmap functions', () => { expect(result).toEqual({ one: '1' }) }) - test('hsetWithTTL should set the fields with expiry', async () => { + test.skip('hsetWithTTL should set the fields with expiry', async () => { const now = localTime.now().unix await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000) diff --git a/src/redisClient.ts b/src/redisClient.ts index b2680d8..9b4f157 100644 --- a/src/redisClient.ts +++ b/src/redisClient.ts @@ -1,9 +1,11 @@ import { + _stringMapEntries, AnyObject, CommonLogger, NullableBuffer, NullableString, Promisable, + StringMap, UnixTimestampNumber, } from '@naturalcycles/js-lib' import { ReadableTyped } from '@naturalcycles/nodejs-lib' @@ -157,6 +159,25 @@ export class RedisClient implements CommonClient { return await this.redis().hincrby(key, field, increment) } + async hincrBatch(key: string, incrementTuples: [string, number][]): Promise<[string, number][]> { + const results: StringMap = {} + + await this.withPipeline(async pipeline => { + for (const [field, increment] of incrementTuples) { + pipeline.hincrby(key, field, increment, (_err, newValue) => { + results[field] = newValue + }) + } + }) + + const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [ + string, + number, + ][] + + return validResults + } + async setWithTTL( key: string, value: string | number | Buffer, @@ -165,14 +186,19 @@ export class RedisClient implements CommonClient { await this.redis().set(key, value, 'EXAT', expireAt) } - async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise { - const valueKeys = Object.keys(value) - const numberOfKeys = valueKeys.length - const keyList = valueKeys.join(' ') - const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}` - const [command, ...args] = commandString.split(' ') - await this.redis().hset(key, value) - await this.redis().call(command!, args) + async hsetWithTTL( + _key: string, + _value: AnyObject, + _expireAt: UnixTimestampNumber, + ): Promise { + throw new Error('Not supported until Redis 7.4.0') + // const valueKeys = Object.keys(value) + // const numberOfKeys = valueKeys.length + // const keyList = valueKeys.join(' ') + // const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}` + // const [command, ...args] = commandString.split(' ') + // await this.redis().hset(key, value) + // await this.redis().call(command!, args) } async mset(obj: Record): Promise { @@ -187,6 +213,25 @@ export class RedisClient implements CommonClient { return await this.redis().incrby(key, by) } + async incrBatch(incrementTuples: [string, number][]): Promise<[string, number][]> { + const results: StringMap = {} + + await this.withPipeline(async pipeline => { + for (const [key, increment] of incrementTuples) { + pipeline.incrby(key, increment, (_err, newValue) => { + results[key] = newValue + }) + } + }) + + const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [ + string, + number, + ][] + + return validResults + } + async ttl(key: string): Promise { return await this.redis().ttl(key) } diff --git a/src/redisHashKeyValueDB.ts b/src/redisHashKeyValueDB.ts index 0218e7b..e19bfb2 100644 --- a/src/redisHashKeyValueDB.ts +++ b/src/redisHashKeyValueDB.ts @@ -3,6 +3,7 @@ import { CommonKeyValueDB, commonKeyValueDBFullSupport, CommonKeyValueDBSaveBatchOptions, + IncrementTuple, KeyValueDBTuple, } from '@naturalcycles/db-lib' import { _chunk, StringMap } from '@naturalcycles/js-lib' @@ -121,15 +122,18 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable { }) } - async increment(table: string, id: string, by = 1): Promise { - return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by) - } - - async incrementBatch( - _table: string, - _incrementMap: StringMap, - ): Promise> { - throw new Error('Not implemented') + async incrementBatch(table: string, increments: IncrementTuple[]): Promise { + const incrementTuplesWithInternalKeys = increments.map( + ([id, v]) => [this.idToKey(table, id), v] as [string, number], + ) + const resultsWithInternalKeys = await this.client.hincrBatch( + this.keyOfHashField, + incrementTuplesWithInternalKeys, + ) + const results = resultsWithInternalKeys.map( + ([k, v]) => [this.keyToId(table, k), v] as IncrementTuple, + ) + return results } async createTable(table: string, opt?: CommonDBCreateOptions): Promise { diff --git a/src/redisKeyValueDB.ts b/src/redisKeyValueDB.ts index fd29f1b..41a5365 100644 --- a/src/redisKeyValueDB.ts +++ b/src/redisKeyValueDB.ts @@ -3,9 +3,10 @@ import { CommonKeyValueDB, commonKeyValueDBFullSupport, CommonKeyValueDBSaveBatchOptions, + IncrementTuple, KeyValueDBTuple, } from '@naturalcycles/db-lib' -import { _isTruthy, _zip, StringMap } from '@naturalcycles/js-lib' +import { _isTruthy, _zip } from '@naturalcycles/js-lib' import { ReadableTyped } from '@naturalcycles/nodejs-lib' import { RedisClient } from './redisClient' @@ -123,15 +124,15 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable { }) } - async increment(table: string, id: string, by = 1): Promise { - return await this.client.incr(this.idToKey(table, id), by) - } - - async incrementBatch( - _table: string, - _incrementMap: StringMap, - ): Promise> { - throw new Error('Not implemented') + async incrementBatch(table: string, increments: IncrementTuple[]): Promise { + const incrementTuplesWithInternalKeys = increments.map( + ([id, v]) => [this.idToKey(table, id), v] as [string, number], + ) + const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys) + const results = resultsWithInternalKeys.map( + ([k, v]) => [this.keyToId(table, k), v] as IncrementTuple, + ) + return results } async createTable(table: string, opt?: CommonDBCreateOptions): Promise { diff --git a/src/test/redis.hash.manual.test.ts b/src/test/redis.hash.manual.test.ts index adbd066..297a255 100644 --- a/src/test/redis.hash.manual.test.ts +++ b/src/test/redis.hash.manual.test.ts @@ -1,4 +1,3 @@ -import { CommonKeyValueDao } from '@naturalcycles/db-lib' import { runCommonKeyValueDaoTest, runCommonKeyValueDBTest, @@ -12,7 +11,6 @@ import { RedisHashKeyValueDB } from '../redisHashKeyValueDB' const client = new RedisClient() const hashKey = 'hashField' const db = new RedisHashKeyValueDB({ client, hashKey }) -const dao = new CommonKeyValueDao({ db, table: TEST_TABLE }) afterAll(async () => { await client.disconnect() @@ -23,9 +21,9 @@ test('connect', async () => { }) describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db)) -describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao)) +describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db)) -test('saveBatch with EXAT', async () => { +test.skip('saveBatch with EXAT', async () => { const testIds = _range(1, 4).map(n => `id${n}`) const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)]) diff --git a/src/test/redis.manual.test.ts b/src/test/redis.manual.test.ts index c8713ef..baf116e 100644 --- a/src/test/redis.manual.test.ts +++ b/src/test/redis.manual.test.ts @@ -9,7 +9,7 @@ import { RedisKeyValueDB } from '../redisKeyValueDB' const client = new RedisClient() const db = new RedisKeyValueDB({ client }) -const dao = new CommonKeyValueDao({ +const dao = new CommonKeyValueDao({ db, table: TEST_TABLE, }) @@ -23,8 +23,7 @@ test('connect', async () => { }) describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db)) - -describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao)) +describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db)) test('saveBatch with EXAT', async () => { const testIds = _range(1, 4).map(n => `id${n}`) diff --git a/yarn.lock b/yarn.lock index 014442d..91bf11f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1007,9 +1007,9 @@ typescript "^5.0.2" "@naturalcycles/db-lib@^9.9.2": - version "9.22.0" - resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.22.0.tgz#d01153b033e22155ade705aa049fe089978c5798" - integrity sha512-989fWQqlfMrtoaKxzqWN2Eh7Y7MrJcqoq5wl7Uldm84eUe3OUY87H0BYgGr/1kO309l2EuzhEkkEzcuO9QyKJw== + version "9.23.1" + resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.23.1.tgz#0bdcb67032762755c75c75d07529dea4f1c20225" + integrity sha512-yQQwC8g21eQTstDSuiob1sKyJ6d6pgkP8tZ3NPqX/Sk91LXX0MZb3dqq7djh1o0nqV6etjBG1stM+ZszHBd/0w== dependencies: "@naturalcycles/js-lib" "^14.116.0" "@naturalcycles/nodejs-lib" "^13.1.1" @@ -1261,7 +1261,14 @@ dependencies: "@types/node" "*" -"@types/node@*", "@types/node@^22.0.0", "@types/node@^22.7.5": +"@types/node@*": + version "22.7.6" + resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.6.tgz#3ec3e2b071e136cd11093c19128405e1d1f92f33" + integrity sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw== + dependencies: + undici-types "~6.19.2" + +"@types/node@^22.0.0", "@types/node@^22.7.5": version "22.7.5" resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.5.tgz#cfde981727a7ab3611a481510b473ae54442b92b" integrity sha512-jML7s2NAzMWc//QSJ1a3prpk78cOPchGvXJsC3C6R6PSMoooztvRVQEz89gmBTBY1SPMaqo5teB4uNHPdetShQ== @@ -2391,9 +2398,9 @@ fast-levenshtein@^2.0.6: integrity sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw== fast-uri@^3.0.1: - version "3.0.2" - resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.2.tgz#d78b298cf70fd3b752fd951175a3da6a7b48f024" - integrity sha512-GR6f0hD7XXyNJa25Tb9BuIdN0tdr+0BMi6/CJPH3wJO1JjNG3n/VsSw38AwRdKZABm8lGbPfakLRkYzx2V9row== + version "3.0.3" + resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.3.tgz#892a1c91802d5d7860de728f18608a0573142241" + integrity sha512-aLrHthzCjH5He4Z2H9YZ+v6Ujb9ocRuW6ZzkJQOrTxleEijANq4v1TsaPaVG1PZcuurEzrLcWRyYBYXD5cEiaw== fastq@^1.6.0: version "1.17.1" @@ -4480,7 +4487,12 @@ ts-node@^10.0.0: v8-compile-cache-lib "^3.0.1" yn "3.1.1" -tslib@^2.0.0, tslib@^2.6.2, tslib@^2.6.3: +tslib@^2.0.0: + version "2.8.0" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.8.0.tgz#d124c86c3c05a40a91e6fdea4021bd31d377971b" + integrity sha512-jWVzBLplnCmoaTr13V9dYbiQ99wvZRd0vNWaDRg+aVYRcjDF3nDksxFDE/+fkXnKhpnUUkmx5pK/v8mCtLVqZA== + +tslib@^2.6.2, tslib@^2.6.3: version "2.7.0" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.7.0.tgz#d9b40c5c40ab59e8738f297df3087bf1a2690c01" integrity sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==