Skip to content

Commit

Permalink
feat: add RedisHashKeyValueDB (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid authored Oct 2, 2024
1 parent a26f76b commit 6522566
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './redisClient'
export * from './redisKeyValueDB'
export * from './redisHashKeyValueDB'
132 changes: 132 additions & 0 deletions src/redisClient.manual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { localTime } from '@naturalcycles/js-lib'
import { RedisClient } from './redisClient'

let client: RedisClient

beforeAll(() => {
client = new RedisClient()
})

beforeEach(async () => {
await client.dropTable('test')
})

afterAll(async () => {
await client.dropTable('test')
await client.disconnect()
})

describe('hashmap functions', () => {
test('hset should save a map', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hgetall('test:key')

expect(result).toEqual({ foo: 'bar' })
})

test('should store/fetch numbers as strings', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hgetall('test:key')

expect(result).toEqual({ one: '1' })
})

test('hgetall should not fetch nested objects', async () => {
await client.hset('test:key', { nested: { one: 1 } })

const result = await client.hgetall('test:key')

expect(result).toEqual({ nested: '[object Object]' })
})

test('hget should fetch map property', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hget('test:key', 'foo')

expect(result).toBe('bar')
})

test('hget should fetch value as string', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hget('test:key', 'one')

expect(result).toBe('1')
})

test('hmgetBuffer should get the values of the fields as strings', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmget('test:key', ['one', 'three'])

expect(result).toEqual(['1', '3'])
})

test('hmgetBuffer should get the values of the fields as buffers', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmgetBuffer('test:key', ['one', 'three'])

expect(result).toEqual([Buffer.from('1'), Buffer.from('3')])
})

test('hincr should change the value and return with a numeric result', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one', -2)

expect(result).toBe(-1)
})

test('hincr should increase the value by 1 by default', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one')

expect(result).toBe(2)
})

test('hincr should set the value to 1 for a non-existing field', async () => {
const result = await client.hincr('test:key', 'one')

expect(result).toBe(1)
})

test('hscanCount should return the number of keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hscanCount('test:key', {})

expect(result).toBe(3)
})

test('hscanCount with a match pattern should return the number of matching keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hscanCount('test:key', { match: 't*' })

expect(result).toBe(2)
})

test('hdel should delete a fields from the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

await client.hdel('test:key', ['two', 'three'])

const result = await client.hgetall('test:key')
expect(result).toEqual({ one: '1' })
})

test('hsetWithTTL should set the fields with expiry', async () => {
const now = localTime.now().unix

await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000)
await client.hsetWithTTL('test:key', { foo2: 'bar' }, now - 1)

const result = await client.hgetall('test:key')
expect(result).toEqual({ foo1: 'bar' })
})
})
63 changes: 61 additions & 2 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
AnyObject,
CommonLogger,
NullableBuffer,
NullableString,
Expand Down Expand Up @@ -124,6 +125,38 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value)
}

async hgetall<T extends Record<string, string> = Record<string, string>>(
key: string,
): Promise<T | null> {
const result = await this.redis().hgetall(key)
if (Object.keys(result).length === 0) return null
return result as T
}

async hget(key: string, field: string): Promise<NullableString> {
return await this.redis().hget(key, field)
}

async hset(key: string, value: AnyObject): Promise<void> {
await this.redis().hset(key, value)
}

async hdel(key: string, fields: string[]): Promise<void> {
await this.redis().hdel(key, ...fields)
}

async hmget(key: string, fields: string[]): Promise<NullableString[]> {
return await this.redis().hmget(key, ...fields)
}

async hmgetBuffer(key: string, fields: string[]): Promise<NullableBuffer[]> {
return await this.redis().hmgetBuffer(key, ...fields)
}

async hincr(key: string, field: string, increment: number = 1): Promise<number> {
return await this.redis().hincrby(key, field, increment)
}

async setWithTTL(
key: string,
value: string | number | Buffer,
Expand All @@ -132,6 +165,16 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value, 'EXAT', expireAt)
}

async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise<void> {
const valueKeys = Object.keys(value)
const numberOfKeys = valueKeys.length
const keyList = valueKeys.join(' ')
const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
const [command, ...args] = commandString.split(' ')
await this.redis().hset(key, value)
await this.redis().call(command!, args)
}

async mset(obj: Record<string, string | number>): Promise<void> {
await this.redis().mset(obj)
}
Expand All @@ -140,8 +183,8 @@ export class RedisClient implements CommonClient {
await this.redis().mset(obj)
}

async incr(key: string): Promise<number> {
return await this.redis().incr(key)
async incr(key: string, by: number = 1): Promise<number> {
return await this.redis().incrby(key, by)
}

async ttl(key: string): Promise<number> {
Expand Down Expand Up @@ -205,6 +248,22 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
let count = 0

const stream = this.redis().hscanStream(key, opt)

await stream.forEach((keyValueList: string[]) => {
count += keyValueList.length / 2
})

return count
}

async withPipeline(fn: (pipeline: ChainableCommander) => Promisable<void>): Promise<void> {
const pipeline = this.redis().pipeline()
await fn(pipeline)
Expand Down
144 changes: 144 additions & 0 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import {
CommonKeyValueDBSaveBatchOptions,
CommonDBCreateOptions,
CommonKeyValueDB,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
hashKey: string
}

export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
client: RedisClient
keyOfHashField: string

constructor(cfg: RedisHashKeyValueDBCfg) {
this.client = cfg.client
this.keyOfHashField = cfg.hashKey
}

async ping(): Promise<void> {
await this.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
}

async saveBatch(
table: string,
entries: KeyValueDBTuple[],
opt?: CommonKeyValueDBSaveBatchOptions,
): Promise<void> {
if (!entries.length) return

const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v])
const map: StringMap<any> = Object.fromEntries(entriesWithKey)

if (opt?.expireAt) {
await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
} else {
await this.client.hset(this.keyOfHashField, map)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return this.keysToIds(table, keys)
})

if (limit) {
stream = stream.take(limit)
}

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 1) return
values.push(keyOrValue)
})
return values.map(v => Buffer.from(v))
})
.take(limit || Infinity)
}

streamEntries(table: string, limit?: number | undefined): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2)
return entries.map(([k, v]) => {
return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple
})
})
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(this.keyOfHashField, {
match: `${table}:*`,
})
}

async increment(table: string, id: string, by: number = 1): Promise<number> {
return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
return ids.map(id => this.idToKey(table, id))
}

private idToKey(table: string, id: string): string {
return `${table}:${id}`
}

private keysToIds(table: string, keys: string[]): string[] {
return keys.map(key => this.keyToId(table, key))
}

private keyToId(table: string, key: string): string {
return key.slice(table.length + 1)
}
}
4 changes: 4 additions & 0 deletions src/redisKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by: number = 1): Promise<number> {
return await this.client.incr(this.idToKey(table, id), by)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

Expand Down
Loading

0 comments on commit 6522566

Please sign in to comment.