Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add RedisHashKeyValueDB #16

Merged
merged 14 commits into from
Oct 2, 2024
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './redisClient'
export * from './redisKeyValueDB'
export * from './redisHashKeyValueDB'
132 changes: 132 additions & 0 deletions src/redisClient.manual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { localTime } from '@naturalcycles/js-lib'
import { RedisClient } from './redisClient'

let client: RedisClient

beforeAll(() => {
client = new RedisClient()
})

beforeEach(async () => {
await client.dropTable('test')
})

afterAll(async () => {
await client.dropTable('test')
await client.disconnect()
})

describe('hashmap functions', () => {
test('hset should save a map', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hgetall('test:key')

expect(result).toEqual({ foo: 'bar' })
})

test('should store/fetch numbers as strings', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hgetall('test:key')

expect(result).toEqual({ one: '1' })
})

test('hgetall should not fetch nested objects', async () => {
await client.hset('test:key', { nested: { one: 1 } })

const result = await client.hgetall('test:key')

expect(result).toEqual({ nested: '[object Object]' })
})

test('hget should fetch map property', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hget('test:key', 'foo')

expect(result).toBe('bar')
})

test('hget should fetch value as string', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hget('test:key', 'one')

expect(result).toBe('1')
})

test('hmgetBuffer should get the values of the fields as strings', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmget('test:key', ['one', 'three'])

expect(result).toEqual(['1', '3'])
})

test('hmgetBuffer should get the values of the fields as buffers', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmgetBuffer('test:key', ['one', 'three'])

expect(result).toEqual([Buffer.from('1'), Buffer.from('3')])
})

test('hincr should change the value and return with a numeric result', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one', -2)

expect(result).toBe(-1)
})

test('hincr should increase the value by 1 by default', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one')

expect(result).toBe(2)
})

test('hincr should set the value to 1 for a non-existing field', async () => {
const result = await client.hincr('test:key', 'one')

expect(result).toBe(1)
})

test('hScanCount should return the number of keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hScanCount('test:key', {})

expect(result).toBe(3)
})

test('hScanCount with a match pattern should return the number of matching keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hScanCount('test:key', { match: 't*' })

expect(result).toBe(2)
})

test('hdel should delete a fields from the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

await client.hdel('test:key', ['two', 'three'])

const result = await client.hgetall('test:key')
expect(result).toEqual({ one: '1' })
})

test('hsetWithTTL should set the fields with expiry', async () => {
const now = localTime.now().unix

await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000)
await client.hsetWithTTL('test:key', { foo2: 'bar' }, now - 1)

const result = await client.hgetall('test:key')
expect(result).toEqual({ foo1: 'bar' })
})
})
63 changes: 61 additions & 2 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
AnyObject,
CommonLogger,
NullableBuffer,
NullableString,
Expand Down Expand Up @@ -124,6 +125,38 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value)
}

async hgetall<T extends Record<string, string> = Record<string, string>>(
key: string,
): Promise<T | null> {
const result = await this.redis().hgetall(key)
if (Object.keys(result).length === 0) return null
return result as T
}

async hget(key: string, field: string): Promise<NullableString> {
return await this.redis().hget(key, field)
}

async hset(key: string, value: AnyObject): Promise<void> {
await this.redis().hset(key, value)
}

async hdel(key: string, fields: string[]): Promise<void> {
await this.redis().hdel(key, ...fields)
}

async hmget(key: string, fields: string[]): Promise<NullableString[]> {
return await this.redis().hmget(key, ...fields)
}

async hmgetBuffer(key: string, fields: string[]): Promise<NullableBuffer[]> {
return await this.redis().hmgetBuffer(key, ...fields)
}

async hincr(key: string, field: string, increment: number = 1): Promise<number> {
return await this.redis().hincrby(key, field, increment)
}

async setWithTTL(
key: string,
value: string | number | Buffer,
Expand All @@ -132,6 +165,16 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value, 'EXAT', expireAt)
}

async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise<void> {
const valueKeys = Object.keys(value)
const numberOfKeys = valueKeys.length
const keyList = valueKeys.join(' ')
const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
const [command, ...args] = commandString.split(' ')
await this.redis().hset(key, value)
await this.redis().call(command!, args)
}

async mset(obj: Record<string, string | number>): Promise<void> {
await this.redis().mset(obj)
}
Expand All @@ -140,8 +183,8 @@ export class RedisClient implements CommonClient {
await this.redis().mset(obj)
}

async incr(key: string): Promise<number> {
return await this.redis().incr(key)
async incr(key: string, by: number = 1): Promise<number> {
return await this.redis().incrby(key, by)
}

async ttl(key: string): Promise<number> {
Expand Down Expand Up @@ -205,6 +248,22 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hScanCount(key: string, opt: ScanStreamOptions): Promise<number> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be in lowercase? (consistent)

Suggested change
async hScanCount(key: string, opt: ScanStreamOptions): Promise<number> {
async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My impression was that you tried to follow the same naming pattern whenever we are just providing a wrapper:
for example async mget() wraps this.redis().mget() (instead of async mGet()).

And you followed camelCase whenever this lib provides extra funtionality, e.g.: mgetBuffer or scanStreamFlat.

So my logic was:

  • hscanStream() wraps this.redis().hscanStream() call, hence identically named
  • hScanCount() provides custom functionality (i.e. there is no redis().hscanCount call), hence camelCase

TBH I don't know and I don't feel strongly about it, so I trust your judgement in using lowercase

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion either, I just thought it's a typo. You can pick how you prefer.

My gut feeling is with lowercase hscan though

let count = 0

const stream = this.redis().hscanStream(key, opt)

await stream.forEach((keyValueList: string[]) => {
count += keyValueList.length / 2
})

return count
}

async withPipeline(fn: (pipeline: ChainableCommander) => Promisable<void>): Promise<void> {
const pipeline = this.redis().pipeline()
await fn(pipeline)
Expand Down
159 changes: 159 additions & 0 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import {
CommonKeyValueDBSaveBatchOptions,
CommonDBCreateOptions,
CommonKeyValueDB,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, _mapObject, StringMap } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
hashKey: string
}

export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
client: RedisClient
keyOfHashField: string

constructor(cfg: RedisHashKeyValueDBCfg) {
this.client = cfg.client
this.keyOfHashField = cfg.hashKey
}

async ping(): Promise<void> {
await this.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
}

async saveBatch(
table: string,
entries: KeyValueDBTuple[],
opt?: CommonKeyValueDBSaveBatchOptions,
): Promise<void> {
if (!entries.length) return

const map: StringMap<any> = _mapObject(Object.fromEntries(entries), (k, v) => [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p3: I'd use native map here (instead of _mapObject), like:

const map = Object.fromEntries(entries.map(...))

why?
Since you're already using native Object.fromEntries.

this.idToKey(table, String(k)),
v,
])

if (opt?.expireAt) {
await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
} else {
await this.client.hset(this.keyOfHashField, map)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return this.keysToIds(table, keys)
})

if (limit) {
stream = stream.take(limit)
}

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(
async keyValueList => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: why is it async (and has concurrency), while I see no await inside?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-pasted from redisKeyValueDB where it is correctly async.
As I re-wrote the function, I didn't notice that I removed the awaited calls in it.

const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 1) return
values.push(keyOrValue)
})
return values.map(v => Buffer.from(v))
},
{
concurrency: 16,
},
)
.take(limit || Infinity)
}

streamEntries(table: string, limit?: number | undefined): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(
async keyValueList => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q why async?

const entries = _chunk(keyValueList, 2)
return entries.map(([k, v]) => {
return [
this.keyToId(table, String(k)),
Buffer.from(String(v)),
] satisfies KeyValueDBTuple
})
},
{
concurrency: 16,
},
)
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hScanCount(this.keyOfHashField, {
match: `${table}:*`,
})
}

async increment(table: string, id: string, by: number = 1): Promise<number> {
return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
return ids.map(id => this.idToKey(table, id))
}

private idToKey(table: string, id: string): string {
return `${table}:${id}`
}

private keysToIds(table: string, keys: string[]): string[] {
return keys.map(key => this.keyToId(table, key))
}

private keyToId(table: string, key: string): string {
return key.slice(table.length + 1)
}
}
Loading