Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add incrementBatch support #17

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/redisClient.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ afterAll(async () => {
await client.disconnect()
})

test('incrBatch should increase multiple keys', async () => {
await client.set('test:one', 1)
await client.set('test:two', 2)

const result = await client.incrBatch([
['test:one', 1],
['test:two', 2],
])

expect(result).toEqual([
['test:one', 2],
['test:two', 4],
])
})

describe('hashmap functions', () => {
test('hset should save a map', async () => {
await client.hset('test:key', { foo: 'bar' })
Expand Down Expand Up @@ -95,6 +110,20 @@ describe('hashmap functions', () => {
expect(result).toBe(1)
})

test('hincrBatch should increase multiple keys', async () => {
await client.hset('test:key', { one: 1, two: 2 })

const result = await client.hincrBatch('test:key', [
['one', 1],
['two', 2],
])

expect(result).toEqual([
['one', 2],
['two', 4],
])
})

test('hscanCount should return the number of keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

Expand All @@ -120,7 +149,7 @@ describe('hashmap functions', () => {
expect(result).toEqual({ one: '1' })
})

test('hsetWithTTL should set the fields with expiry', async () => {
test.skip('hsetWithTTL should set the fields with expiry', async () => {
const now = localTime.now().unix

await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000)
Expand Down
61 changes: 53 additions & 8 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
_stringMapEntries,
AnyObject,
CommonLogger,
NullableBuffer,
NullableString,
Promisable,
StringMap,
UnixTimestampNumber,
} from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
Expand Down Expand Up @@ -157,6 +159,25 @@ export class RedisClient implements CommonClient {
return await this.redis().hincrby(key, field, increment)
}

async hincrBatch(key: string, incrementTuples: [string, number][]): Promise<[string, number][]> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the batch support into our client-wrapper, because the withPipeline() exposes the underlying lib, and I don't think that consumers of our wrapper should be using that (if avoidable).

The consuming logic (redis*KeyValueDb.incrementBatch) has also its own business translating the keys - so this separation of concerns (here we pipe, there we translate ids) is also a plus,

const results: StringMap<number | undefined> = {}

await this.withPipeline(async pipeline => {
for (const [field, increment] of incrementTuples) {
pipeline.hincrby(key, field, increment, (_err, newValue) => {
results[field] = newValue
})
}
})

const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [
string,
number,
][]

return validResults
}

async setWithTTL(
key: string,
value: string | number | Buffer,
Expand All @@ -165,14 +186,19 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value, 'EXAT', expireAt)
}

async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise<void> {
const valueKeys = Object.keys(value)
const numberOfKeys = valueKeys.length
const keyList = valueKeys.join(' ')
const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
const [command, ...args] = commandString.split(' ')
await this.redis().hset(key, value)
await this.redis().call(command!, args)
async hsetWithTTL(
_key: string,
_value: AnyObject,
_expireAt: UnixTimestampNumber,
): Promise<void> {
throw new Error('Not supported until Redis 7.4.0')
// const valueKeys = Object.keys(value)
// const numberOfKeys = valueKeys.length
// const keyList = valueKeys.join(' ')
// const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
// const [command, ...args] = commandString.split(' ')
// await this.redis().hset(key, value)
// await this.redis().call(command!, args)
Comment on lines +194 to +201
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking open source, this is probably not a good idea.
Maybe a version check, and then throw?

On the other hand, none of OUR usually accessible redises support it.

}

async mset(obj: Record<string, string | number>): Promise<void> {
Expand All @@ -187,6 +213,25 @@ export class RedisClient implements CommonClient {
return await this.redis().incrby(key, by)
}

async incrBatch(incrementTuples: [string, number][]): Promise<[string, number][]> {
const results: StringMap<number | undefined> = {}

await this.withPipeline(async pipeline => {
for (const [key, increment] of incrementTuples) {
pipeline.incrby(key, increment, (_err, newValue) => {
results[key] = newValue
})
}
})

const validResults = _stringMapEntries(results).filter(([_, v]) => v !== undefined) as [
string,
number,
][]

return validResults
}

async ttl(key: string): Promise<number> {
return await this.redis().ttl(key)
}
Expand Down
22 changes: 13 additions & 9 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
Expand Down Expand Up @@ -121,15 +122,18 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by = 1): Promise<number> {
return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by)
}

async incrementBatch(
_table: string,
_incrementMap: StringMap<number>,
): Promise<StringMap<number>> {
throw new Error('Not implemented')
async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.hincrBatch(
this.keyOfHashField,
incrementTuplesWithInternalKeys,
)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
Expand Down
21 changes: 11 additions & 10 deletions src/redisKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import {
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _isTruthy, _zip, StringMap } from '@naturalcycles/js-lib'
import { _isTruthy, _zip } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'

Expand Down Expand Up @@ -123,15 +124,15 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by = 1): Promise<number> {
return await this.client.incr(this.idToKey(table, id), by)
}

async incrementBatch(
_table: string,
_incrementMap: StringMap<number>,
): Promise<StringMap<number>> {
throw new Error('Not implemented')
async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
Expand Down
6 changes: 2 additions & 4 deletions src/test/redis.hash.manual.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { CommonKeyValueDao } from '@naturalcycles/db-lib'
import {
runCommonKeyValueDaoTest,
runCommonKeyValueDBTest,
Expand All @@ -12,7 +11,6 @@ import { RedisHashKeyValueDB } from '../redisHashKeyValueDB'
const client = new RedisClient()
const hashKey = 'hashField'
const db = new RedisHashKeyValueDB({ client, hashKey })
const dao = new CommonKeyValueDao<Buffer>({ db, table: TEST_TABLE })

afterAll(async () => {
await client.disconnect()
Expand All @@ -23,9 +21,9 @@ test('connect', async () => {
})

describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

test('saveBatch with EXAT', async () => {
test.skip('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])

Expand Down
5 changes: 2 additions & 3 deletions src/test/redis.manual.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { RedisKeyValueDB } from '../redisKeyValueDB'
const client = new RedisClient()
const db = new RedisKeyValueDB({ client })

const dao = new CommonKeyValueDao<Buffer>({
const dao = new CommonKeyValueDao<string, Buffer>({
db,
table: TEST_TABLE,
})
Expand All @@ -23,8 +23,7 @@ test('connect', async () => {
})

describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))

describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(dao))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

test('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
Expand Down
28 changes: 20 additions & 8 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,9 @@
typescript "^5.0.2"

"@naturalcycles/db-lib@^9.9.2":
version "9.22.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.22.0.tgz#d01153b033e22155ade705aa049fe089978c5798"
integrity sha512-989fWQqlfMrtoaKxzqWN2Eh7Y7MrJcqoq5wl7Uldm84eUe3OUY87H0BYgGr/1kO309l2EuzhEkkEzcuO9QyKJw==
version "9.23.1"
resolved "https://registry.yarnpkg.com/@naturalcycles/db-lib/-/db-lib-9.23.1.tgz#0bdcb67032762755c75c75d07529dea4f1c20225"
integrity sha512-yQQwC8g21eQTstDSuiob1sKyJ6d6pgkP8tZ3NPqX/Sk91LXX0MZb3dqq7djh1o0nqV6etjBG1stM+ZszHBd/0w==
dependencies:
"@naturalcycles/js-lib" "^14.116.0"
"@naturalcycles/nodejs-lib" "^13.1.1"
Expand Down Expand Up @@ -1261,7 +1261,14 @@
dependencies:
"@types/node" "*"

"@types/node@*", "@types/node@^22.0.0", "@types/node@^22.7.5":
"@types/node@*":
version "22.7.6"
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.6.tgz#3ec3e2b071e136cd11093c19128405e1d1f92f33"
integrity sha512-/d7Rnj0/ExXDMcioS78/kf1lMzYk4BZV8MZGTBKzTGZ6/406ukkbYlIsZmMPhcR5KlkunDHQLrtAVmSq7r+mSw==
dependencies:
undici-types "~6.19.2"

"@types/node@^22.0.0", "@types/node@^22.7.5":
version "22.7.5"
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.7.5.tgz#cfde981727a7ab3611a481510b473ae54442b92b"
integrity sha512-jML7s2NAzMWc//QSJ1a3prpk78cOPchGvXJsC3C6R6PSMoooztvRVQEz89gmBTBY1SPMaqo5teB4uNHPdetShQ==
Expand Down Expand Up @@ -2391,9 +2398,9 @@ fast-levenshtein@^2.0.6:
integrity sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==

fast-uri@^3.0.1:
version "3.0.2"
resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.2.tgz#d78b298cf70fd3b752fd951175a3da6a7b48f024"
integrity sha512-GR6f0hD7XXyNJa25Tb9BuIdN0tdr+0BMi6/CJPH3wJO1JjNG3n/VsSw38AwRdKZABm8lGbPfakLRkYzx2V9row==
version "3.0.3"
resolved "https://registry.yarnpkg.com/fast-uri/-/fast-uri-3.0.3.tgz#892a1c91802d5d7860de728f18608a0573142241"
integrity sha512-aLrHthzCjH5He4Z2H9YZ+v6Ujb9ocRuW6ZzkJQOrTxleEijANq4v1TsaPaVG1PZcuurEzrLcWRyYBYXD5cEiaw==

fastq@^1.6.0:
version "1.17.1"
Expand Down Expand Up @@ -4480,7 +4487,12 @@ ts-node@^10.0.0:
v8-compile-cache-lib "^3.0.1"
yn "3.1.1"

tslib@^2.0.0, tslib@^2.6.2, tslib@^2.6.3:
tslib@^2.0.0:
version "2.8.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.8.0.tgz#d124c86c3c05a40a91e6fdea4021bd31d377971b"
integrity sha512-jWVzBLplnCmoaTr13V9dYbiQ99wvZRd0vNWaDRg+aVYRcjDF3nDksxFDE/+fkXnKhpnUUkmx5pK/v8mCtLVqZA==

tslib@^2.6.2, tslib@^2.6.3:
version "2.7.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.7.0.tgz#d9b40c5c40ab59e8738f297df3087bf1a2690c01"
integrity sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==
Expand Down