Skip to content

Commit

Permalink
chore: replace redisHashKeyValueDB with new implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid committed Oct 18, 2024
1 parent ef49329 commit 4d88bb5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 266 deletions.
98 changes: 33 additions & 65 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@ import {
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
import { _chunk } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
hashKey: string
}

export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
client: RedisClient
keyOfHashField: string

constructor(cfg: RedisHashKeyValueDBCfg) {
/**
* RedisHashKeyValueDB is a KeyValueDB implementation that uses hash fields to simulate tables.
* The value in the `table` arguments points to a hash field in Redis.
*
* The reason for having this approach and also the traditional RedisKeyValueDB is that
* the currently available Redis versions (in Memorystore, or on MacOs) do not support
* expiring hash properties.
* The expiring fields feature is important, and only available via RedisKeyValueDB.
*
* Once the available Redis version reaches 7.4.0+,
* this implementation can take over for RedisKeyValueDB.
*/
export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
constructor(cfg: RedisKeyValueDBCfg) {
this.client = cfg.client
this.keyOfHashField = cfg.hashKey
}

client: RedisClient

support = {
...commonKeyValueDBFullSupport,
}
Expand All @@ -39,13 +45,13 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
const bufs = await this.client.hmgetBuffer(table, ids)
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
await this.client.hdel(table, ids)
}

async saveBatch(
Expand All @@ -55,42 +61,34 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
): Promise<void> {
if (!entries.length) return

const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v])
const map: StringMap<any> = Object.fromEntries(entriesWithKey)
const record = Object.fromEntries(entries)

if (opt?.expireAt) {
await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
await this.client.hsetWithTTL(table, record, opt.expireAt)
} else {
await this.client.hset(this.keyOfHashField, map)
await this.client.hset(table, record)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
const stream = this.client
.hscanStream(table)
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return this.keysToIds(table, keys)
return keys
})

if (limit) {
stream = stream.take(limit)
}
.take(limit || Infinity)

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.hscanStream(table)
.flatMap(keyValueList => {
const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
Expand All @@ -104,57 +102,27 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.hscanStream(table)
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2)
const entries = _chunk(keyValueList, 2) as [string, string][]
return entries.map(([k, v]) => {
return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple
return [k, Buffer.from(String(v))] satisfies KeyValueDBTuple
})
})
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(this.keyOfHashField, {
match: `${table}:*`,
})
return await this.client.hscanCount(table)
}

async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.hincrBatch(
this.keyOfHashField,
incrementTuplesWithInternalKeys,
)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
return await this.client.hincrBatch(table, increments)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
return ids.map(id => this.idToKey(table, id))
}

private idToKey(table: string, id: string): string {
return `${table}:${id}`
}

private keysToIds(table: string, keys: string[]): string[] {
return keys.map(key => this.keyToId(table, key))
}

private keyToId(table: string, key: string): string {
return key.slice(table.length + 1)
await this.client.del([table])
}
}
119 changes: 0 additions & 119 deletions src/redisKeyValueDB2.ts

This file was deleted.

54 changes: 44 additions & 10 deletions src/test/redis.hash.manual.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import {
runCommonKeyValueDaoTest,
runCommonKeyValueDBTest,
TEST_TABLE,
} from '@naturalcycles/db-lib/dist/testing'
import { CommonKeyValueDao, CommonKeyValueDaoMemoCache } from '@naturalcycles/db-lib'
import { runCommonKeyValueDBTest, TEST_TABLE } from '@naturalcycles/db-lib/dist/testing'
import { runCommonKeyValueDaoTest } from '@naturalcycles/db-lib/dist/testing/keyValueDaoTest'
import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB'
import { _range, localTime, pDelay } from '@naturalcycles/js-lib'
import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib'
import { RedisClient } from '../redisClient'
import { RedisHashKeyValueDB } from '../redisHashKeyValueDB'
import { RedishHashKeyValueDB } from '../redisHashKeyValueDB'

const client = new RedisClient()
const hashKey = 'hashField'
const db = new RedisHashKeyValueDB({ client, hashKey })
const db = new RedishHashKeyValueDB({ client })

const dao = new CommonKeyValueDao<string, Buffer>({
db,
table: TEST_TABLE,
})

afterAll(async () => {
await client.disconnect()
Expand All @@ -20,9 +22,10 @@ test('connect', async () => {
await db.ping()
})

describe('runCommonHashKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

// Saving expiring hash fields is not supported until Redis 7.4.0
test.skip('saveBatch with EXAT', async () => {
const testIds = _range(1, 4).map(n => `id${n}`)
const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])
Expand All @@ -36,3 +39,34 @@ test.skip('saveBatch with EXAT', async () => {
loaded = await db.getByIds(TEST_TABLE, testIds)
expect(loaded.length).toBe(0)
})

class C {
@_AsyncMemo({
cacheFactory: () =>
new CommonKeyValueDaoMemoCache({
dao,
ttl: 1,
}),
})
async get(k: string): Promise<Buffer | null> {
console.log(`get ${k}`)
return Buffer.from(k)
}
}

const c = new C()

test('CommonKeyValueDaoMemoCache serial', async () => {
for (const _ of _range(10)) {
console.log(await c.get('key'))
await pDelay(100)
}
})

test('CommonKeyValueDaoMemoCache async swarm', async () => {
await Promise.all(
_range(30).map(async () => {
console.log(await c.get('key'))
}),
)
})
Loading

0 comments on commit 4d88bb5

Please sign in to comment.