Skip to content

Commit

Permalink
feat: add incrementBatch support (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid authored Oct 18, 2024
1 parent 54add95 commit d85ff72
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 43 deletions.
31 changes: 30 additions & 1 deletion src/redisClient.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ afterAll(async () => {
await client.disconnect()
})

test('incrBatch should increase multiple keys', async () => {
await client.set('test:one', 1)
await client.set('test:two', 2)

const result = await client.incrBatch([
['test:one', 1],
['test:two', 2],
])

expect(result).toEqual([
['test:one', 2],
['test:two', 4],
])
})

describe('hashmap functions', () => {
test('hset should save a map', async () => {
await client.hset('test:key', { foo: 'bar' })
Expand Down Expand Up @@ -95,6 +110,20 @@ describe('hashmap functions', () => {
expect(result).toBe(1)
})

test('hincrBatch should increase multiple keys', async () => {
await client.hset('test:key', { one: 1, two: 2 })

const result = await client.hincrBatch('test:key', [
['one', 1],
['two', 2],
])

expect(result).toEqual([
['one', 2],
['two', 4],
])
})

test('hscanCount should return the number of keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

Expand All @@ -120,7 +149,7 @@ describe('hashmap functions', () => {
expect(result).toEqual({ one: '1' })
})

test('hsetWithTTL should set the fields with expiry', async () => {
test.skip('hsetWithTTL should set the fields with expiry', async () => {
const now = localTime.now().unix

await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000)
Expand Down
61 changes: 53 additions & 8 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
_stringMapEntries,
AnyObject,
CommonLogger,
NullableBuffer,
NullableString,
Promisable,
StringMap,
UnixTimestampNumber,
} from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
Expand Down Expand Up @@ -157,6 +159,25 @@ export class RedisClient implements CommonClient {
return await this.redis().hincrby(key, field, increment)
}

async hincrBatch(key: string, incrementTuples: [string, number][]): Promise<[string, number][]> {
const results: StringMap<number | undefined> = {}

await this.withPipeline(async pipeline => {
for (const [field, increment] of incrementTuples) {
pipeline.hincrby(key, field, increment, (_err, newValue) => {
results[field] = newValue
})
}
})

const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [
string,
number,
][]

return validResults
}

async setWithTTL(
key: string,
value: string | number | Buffer,
Expand All @@ -165,14 +186,19 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value, 'EXAT', expireAt)
}

async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise<void> {
const valueKeys = Object.keys(value)
const numberOfKeys = valueKeys.length
const keyList = valueKeys.join(' ')
const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
const [command, ...args] = commandString.split(' ')
await this.redis().hset(key, value)
await this.redis().call(command!, args)
async hsetWithTTL(
_key: string,
_value: AnyObject,
_expireAt: UnixTimestampNumber,
): Promise<void> {
throw new Error('Not supported until Redis 7.4.0')
// const valueKeys = Object.keys(value)
// const numberOfKeys = valueKeys.length
// const keyList = valueKeys.join(' ')
// const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
// const [command, ...args] = commandString.split(' ')
// await this.redis().hset(key, value)
// await this.redis().call(command!, args)
}

async mset(obj: Record<string, string | number>): Promise<void> {
Expand All @@ -187,6 +213,25 @@ export class RedisClient implements CommonClient {
return await this.redis().incrby(key, by)
}

async incrBatch(incrementTuples: [string, number][]): Promise<[string, number][]> {
const results: StringMap<number | undefined> = {}

await this.withPipeline(async pipeline => {
for (const [key, increment] of incrementTuples) {
pipeline.incrby(key, increment, (_err, newValue) => {
results[key] = newValue
})
}
})

const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [
string,
number,
][]

return validResults
}

async ttl(key: string): Promise<number> {
return await this.redis().ttl(key)
}
Expand Down
22 changes: 13 additions & 9 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
Expand Down Expand Up @@ -121,15 +122,18 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by = 1): Promise<number> {
return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by)
}

async incrementBatch(
_table: string,
_incrementMap: StringMap<number>,
): Promise<StringMap<number>> {
throw new Error('Not implemented')
async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.hincrBatch(
this.keyOfHashField,
incrementTuplesWithInternalKeys,
)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
Expand Down
21 changes: 11 additions & 10 deletions src/redisKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import {
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _isTruthy, _zip, StringMap } from '@naturalcycles/js-lib'
import { _isTruthy, _zip } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'

Expand Down Expand Up @@ -123,15 +124,15 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by = 1): Promise<number> {
return await this.client.incr(this.idToKey(table, id), by)
}

async incrementBatch(
_table: string,
_incrementMap: StringMap<number>,
): Promise<StringMap<number>> {
throw new Error('Not implemented')
async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
Expand Down
6 changes: 2 additions & 4 deletions src/test/redis.hash.manual.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { CommonKeyValueDao } from '@naturalcycles/db-lib'
import {
runCommonKeyValueDaoTest,
runCommonKeyValueDBTest,
Expand All @@ -12,7 +11,6 @@ import { RedisHashKeyValueDB } from '../redisHashKeyValueDB'
const client = new RedisClient()
const hashKey = 'hashField'
const db = new RedisHashKeyValueDB({ client, hashKey })
const dao = new CommonKeyValueDao<Buffer>({ db, table: TEST_TABLE })

afterAll(async () => {
await client.disconnect()
Expand All @@ -23,9 +21,9 @@ test('connect', async () => {
})

describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

test('saveBatch with EXAT', async () => {
test.skip('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])

Expand Down
5 changes: 2 additions & 3 deletions src/test/redis.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { RedisKeyValueDB } from '../redisKeyValueDB'
const client = new RedisClient()
const db = new RedisKeyValueDB({ client })

const dao = new CommonKeyValueDao<Buffer>({
const dao = new CommonKeyValueDao<string, Buffer>({
db,
table: TEST_TABLE,
})
Expand All @@ -23,8 +23,7 @@ test('connect', async () => {
})

describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))

describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

test('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
Expand Down
28 changes: 20 additions & 8 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,9 @@
typescript "^5.0.2"

"@naturalcycles/db-lib@^9.9.2":
version "9.22.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.22.0.tgz#d01153b033e22155ade705aa049fe089978c5798"
integrity sha512-989fWQqlfMrtoaKxzqWN2Eh7Y7MrJcqoq5wl7Uldm84eUe3OUY87H0BYgGr/1kO309l2EuzhEkkEzcuO9QyKJw==
version "9.23.1"
resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.23.1.tgz#0bdcb67032762755c75c75d07529dea4f1c20225"
integrity sha512-yQQwC8g21eQTstDSuiob1sKyJ6d6pgkP8tZ3NPqX/Sk91LXX0MZb3dqq7djh1o0nqV6etjBG1stM+ZszHBd/0w==
dependencies:
"@naturalcycles/js-lib" "^14.116.0"
"@naturalcycles/nodejs-lib" "^13.1.1"
Expand Down Expand Up @@ -1261,7 +1261,14 @@
dependencies:
"@types/node" "*"

"@types/node@*", "@types/node@^22.0.0", "@types/node@^22.7.5":
"@types/node@*":
version "22.7.6"
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.6.tgz#3ec3e2b071e136cd11093c19128405e1d1f92f33"
integrity sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw==
dependencies:
undici-types "~6.19.2"

"@types/node@^22.0.0", "@types/node@^22.7.5":
version "22.7.5"
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.5.tgz#cfde981727a7ab3611a481510b473ae54442b92b"
integrity sha512-jML7s2NAzMWc//QSJ1a3prpk78cOPchGvXJsC3C6R6PSMoooztvRVQEz89gmBTBY1SPMaqo5teB4uNHPdetShQ==
Expand Down Expand Up @@ -2391,9 +2398,9 @@ fast-levenshtein@^2.0.6:
integrity sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==

fast-uri@^3.0.1:
version "3.0.2"
resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.2.tgz#d78b298cf70fd3b752fd951175a3da6a7b48f024"
integrity sha512-GR6f0hD7XXyNJa25Tb9BuIdN0tdr+0BMi6/CJPH3wJO1JjNG3n/VsSw38AwRdKZABm8lGbPfakLRkYzx2V9row==
version "3.0.3"
resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.3.tgz#892a1c91802d5d7860de728f18608a0573142241"
integrity sha512-aLrHthzCjH5He4Z2H9YZ+v6Ujb9ocRuW6ZzkJQOrTxleEijANq4v1TsaPaVG1PZcuurEzrLcWRyYBYXD5cEiaw==

fastq@^1.6.0:
version "1.17.1"
Expand Down Expand Up @@ -4480,7 +4487,12 @@ ts-node@^10.0.0:
v8-compile-cache-lib "^3.0.1"
yn "3.1.1"

tslib@^2.0.0, tslib@^2.6.2, tslib@^2.6.3:
tslib@^2.0.0:
version "2.8.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.8.0.tgz#d124c86c3c05a40a91e6fdea4021bd31d377971b"
integrity sha512-jWVzBLplnCmoaTr13V9dYbiQ99wvZRd0vNWaDRg+aVYRcjDF3nDksxFDE/+fkXnKhpnUUkmx5pK/v8mCtLVqZA==

tslib@^2.6.2, tslib@^2.6.3:
version "2.7.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.7.0.tgz#d9b40c5c40ab59e8738f297df3087bf1a2690c01"
integrity sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==
Expand Down

0 comments on commit d85ff72

Please sign in to comment.