Skip to content

Commit

Permalink
feat: use hashfields as table namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid committed Oct 18, 2024
1 parent d85ff72 commit ef49329
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export class RedisClient implements CommonClient {
Convenient type-safe wrapper.
Returns BATCHES of keys in each iteration (as-is).
*/
scanStream(opt: ScanStreamOptions): ReadableTyped<string[]> {
scanStream(opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().scanStream(opt)
}

Expand All @@ -294,11 +294,11 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
hscanStream(key: string, opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
async hscanCount(key: string, opt?: ScanStreamOptions): Promise<number> {
let count = 0

const stream = this.redis().hscanStream(key, opt)
Expand Down
119 changes: 119 additions & 0 deletions src/redisKeyValueDB2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {
CommonDBCreateOptions,
CommonKeyValueDB,
commonKeyValueDBFullSupport,
CommonKeyValueDBSaveBatchOptions,
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'

export interface RedisKeyValueDBCfg {
  // Redis client used for all hash operations.
  // Note: the DB disposes of it — `RedisKeyValueDB2[Symbol.asyncDispose]` calls
  // `client.disconnect()`, so the caller should not share this client elsewhere.
  client: RedisClient
}

/**
 * CommonKeyValueDB implementation backed by Redis hashes:
 * each `table` is a single Redis hash, each id is a hash field.
 */
export class RedisKeyValueDB2 implements CommonKeyValueDB, AsyncDisposable {
  constructor(cfg: RedisKeyValueDBCfg) {
    this.client = cfg.client
  }

  client: RedisClient

  // One-hash-per-table supports the full CommonKeyValueDB contract
  support = {
    ...commonKeyValueDBFullSupport,
  }

  async ping(): Promise<void> {
    await this.client.ping()
  }

  async [Symbol.asyncDispose](): Promise<void> {
    await this.client.disconnect()
  }

  /**
   * Loads values by id via HMGET on the `table` hash.
   * Missing ids are filtered out of the result.
   */
  async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
    if (!ids.length) return []
    // we assume that the order of returned values is the same as order of input ids
    const bufs = await this.client.hmgetBuffer(table, ids)
    return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
  }

  async deleteByIds(table: string, ids: string[]): Promise<void> {
    if (!ids.length) return
    await this.client.hdel(table, ids)
  }

  /**
   * Saves entries as fields of the `table` hash (HSET).
   * With `opt.expireAt`, delegates to hsetWithTTL — per-field expiry requires
   * Redis >= 7.4.0 (the EXAT test in redis2.manual.test.ts is skipped for that reason).
   */
  async saveBatch(
    table: string,
    entries: KeyValueDBTuple[],
    opt?: CommonKeyValueDBSaveBatchOptions,
  ): Promise<void> {
    if (!entries.length) return

    const record = Object.fromEntries(entries)

    if (opt?.expireAt) {
      await this.client.hsetWithTTL(table, record, opt.expireAt)
    } else {
      await this.client.hset(table, record)
    }
  }

  /**
   * Streams field names (ids) of the `table` hash via HSCAN.
   * HSCAN emits flat [field, value, field, value, ...] batches,
   * so the first element of each pair is the id.
   */
  streamIds(table: string, limit?: number): ReadableTyped<string> {
    return this.client
      .hscanStream(table)
      .flatMap(keyValueList => {
        // pair up [field, value] and keep the fields — consistent with streamEntries
        return (_chunk(keyValueList, 2) as [string, string][]).map(([k]) => k)
      })
      .take(limit || Infinity) // 0/undefined means "no limit"
  }

  /**
   * Streams values of the `table` hash via HSCAN (second element of each pair).
   */
  streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
    return this.client
      .hscanStream(table)
      .flatMap(keyValueList => {
        return (_chunk(keyValueList, 2) as [string, string][]).map(([_k, v]) => Buffer.from(v))
      })
      .take(limit || Infinity)
  }

  /**
   * Streams [id, value] tuples of the `table` hash via HSCAN.
   */
  streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
    return this.client
      .hscanStream(table)
      .flatMap(keyValueList => {
        const entries = _chunk(keyValueList, 2) as [string, string][]
        // v is already a string — Buffer.from(v) directly (no redundant String() wrapper),
        // consistent with streamValues
        return entries.map(([k, v]) => {
          return [k, Buffer.from(v)] satisfies KeyValueDBTuple
        })
      })
      .take(limit || Infinity)
  }

  async count(table: string): Promise<number> {
    // NOTE(review): hscanCount counts HSCAN stream elements — confirm it accounts for
    // the field/value interleaving (a raw element count would be 2x the number of ids)
    return await this.client.hscanCount(table)
  }

  async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
    return await this.client.hincrBatch(table, increments)
  }

  /**
   * No-op unless `opt.dropIfExists`: a "table" is just a hash key,
   * created lazily on first write, so there is nothing to create eagerly.
   */
  async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
    if (!opt?.dropIfExists) return

    await this.client.del([table])
  }
}
72 changes: 72 additions & 0 deletions src/test/redis2.manual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { CommonKeyValueDao, CommonKeyValueDaoMemoCache } from '@naturalcycles/db-lib'
import { runCommonKeyValueDBTest, TEST_TABLE } from '@naturalcycles/db-lib/dist/testing'
import { runCommonKeyValueDaoTest } from '@naturalcycles/db-lib/dist/testing/keyValueDaoTest'
import { KeyValueDBTuple } from '@naturalcycles/db-lib/src/kv/commonKeyValueDB'
import { _AsyncMemo, _range, localTime, pDelay } from '@naturalcycles/js-lib'
import { RedisClient } from '../redisClient'
import { RedisKeyValueDB2 } from '../redisKeyValueDB2'

// Manual test setup — presumably expects a locally running Redis
// (no connection options passed); confirm before running in CI.
const client = new RedisClient()
const db = new RedisKeyValueDB2({ client })

// Buffer-valued KV dao over the shared test table
const dao = new CommonKeyValueDao<string, Buffer>({
  db,
  table: TEST_TABLE,
})

afterAll(async () => {
  await client.disconnect()
})

// Smoke test: a PING round-trip confirms the client can reach Redis
test('connect', async () => {
  await db.ping()
})

// Shared db-lib conformance suites, run against the hash-backed implementation
describe('runCommonKeyValueDBTest', () => runCommonKeyValueDBTest(db))
describe('runCommonKeyValueDaoTest', () => runCommonKeyValueDaoTest(db))

// Saving expiring hash fields is not supported until Redis 7.4.0
test.skip('saveBatch with EXAT', async () => {
  // 3 ids: _range(1, 4) excludes the upper bound
  const testIds = _range(1, 4).map(n => `id${n}`)
  const testEntries: KeyValueDBTuple[] = testIds.map(id => [id, Buffer.from(`${id}value`)])

  // Save with ~1s TTL: entries must be present immediately...
  await db.saveBatch(TEST_TABLE, testEntries, {
    expireAt: localTime.now().plus(1, 'second').unix,
  })
  let loaded = await db.getByIds(TEST_TABLE, testIds)
  expect(loaded.length).toBe(3)
  // ...and gone after the TTL has elapsed
  await pDelay(2000)
  loaded = await db.getByIds(TEST_TABLE, testIds)
  expect(loaded.length).toBe(0)
})

// Exercises the @_AsyncMemo decorator backed by CommonKeyValueDaoMemoCache over the Redis dao
class C {
  @_AsyncMemo({
    cacheFactory: () =>
      new CommonKeyValueDaoMemoCache({
        dao,
        ttl: 1, // presumably seconds — confirm against CommonKeyValueDaoMemoCache docs
      }),
  })
  async get(k: string): Promise<Buffer | null> {
    // NOTE(review): this log should appear only on cache misses if memoization
    // works — verify via the output of the tests below
    console.log(`get ${k}`)
    return Buffer.from(k)
  }
}

const c = new C()

// Sequential calls, 100ms apart, against a ttl=1 cache — inspect the log
// for periodic misses (no assertions; this is a manual/observational test)
test('CommonKeyValueDaoMemoCache serial', async () => {
  for (const _ of _range(10)) {
    console.log(await c.get('key'))
    await pDelay(100)
  }
})

// 30 concurrent calls for the same key — presumably the memo coalesces them
// into few misses; confirm by counting `get key` lines in the output
test('CommonKeyValueDaoMemoCache async swarm', async () => {
  await Promise.all(
    _range(30).map(async () => {
      console.log(await c.get('key'))
    }),
  )
})

0 comments on commit ef49329

Please sign in to comment.