feat: use hash fields as table namespace (#18)
mrnagydavid authored Oct 18, 2024
1 parent d85ff72 commit 9dec99d
Showing 4 changed files with 110 additions and 116 deletions.
6 changes: 3 additions & 3 deletions src/redisClient.ts
@@ -271,7 +271,7 @@ export class RedisClient implements CommonClient {
Convenient type-safe wrapper.
Returns BATCHES of keys in each iteration (as-is).
*/
scanStream(opt: ScanStreamOptions): ReadableTyped<string[]> {
scanStream(opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().scanStream(opt)
}

@@ -294,11 +294,11 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
hscanStream(key: string, opt?: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
async hscanCount(key: string, opt?: ScanStreamOptions): Promise<number> {
let count = 0

const stream = this.redis().hscanStream(key, opt)
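Editor's note (not part of the diff): with `opt` now optional, a whole-hash scan no longer needs a dummy options object. A minimal sketch, assuming a connected `RedisClient` from `src/redisClient.ts` (constructor options defaulted) and Node's async iteration over the returned stream:

```ts
import { RedisClient } from './redisClient'

const client = new RedisClient() // connection options are assumed/defaulted here

// HSCAN emits flat batches of [field, value, field, value, ...]
let fieldCount = 0
for await (const batch of client.hscanStream('user')) {
  fieldCount += batch.length / 2
}
console.log(`'user' hash has ${fieldCount} fields`)

await client.disconnect()
```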
132 changes: 48 additions & 84 deletions src/redisHashKeyValueDB.ts
@@ -6,46 +6,46 @@ import {
IncrementTuple,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
hashKey: string
}

export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
client: RedisClient
keyOfHashField: string

constructor(cfg: RedisHashKeyValueDBCfg) {
this.client = cfg.client
this.keyOfHashField = cfg.hashKey
}
/**
* RedisHashKeyValueDB is a KeyValueDB implementation that uses hash fields to simulate tables.
* The value in the `table` argument points to a hash field in Redis.
*
* The reason for having this approach and also the traditional RedisKeyValueDB is that
* the currently available Redis versions (in Memorystore, or on macOS) do not support
* expiring hash properties.
* The expiring fields feature is important, and only available via RedisKeyValueDB.
*
* Once the available Redis version reaches 7.4.0+,
* this implementation can take over for RedisKeyValueDB.
*/
export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
constructor(public cfg: RedisKeyValueDBCfg) {}

support = {
...commonKeyValueDBFullSupport,
}

async ping(): Promise<void> {
await this.client.ping()
await this.cfg.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
await this.cfg.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
const bufs = await this.cfg.client.hmgetBuffer(table, ids)
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
await this.cfg.client.hdel(table, ids)
}

async saveBatch(
@@ -55,106 +55,70 @@ export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
): Promise<void> {
if (!entries.length) return

const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v])
const map: StringMap<any> = Object.fromEntries(entriesWithKey)
const record = Object.fromEntries(entries)

if (opt?.expireAt) {
await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
await this.cfg.client.hsetWithTTL(table, record, opt.expireAt)
} else {
await this.client.hset(this.keyOfHashField, map)
await this.cfg.client.hset(table, record)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
const stream = this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return this.keysToIds(table, keys)
for (let i = 0; i < keyValueList.length; i += 2) {
keys.push(keyValueList[i]!)
}
return keys
})

if (limit) {
stream = stream.take(limit)
}
.take(limit || Infinity)

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
return this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 1) return
values.push(keyOrValue)
})
return values.map(v => Buffer.from(v))
const values: Buffer[] = []
for (let i = 0; i < keyValueList.length; i += 2) {
const value = Buffer.from(keyValueList[i + 1]!)
values.push(value)
}
return values
})
.take(limit || Infinity)
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
return this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2)
return entries.map(([k, v]) => {
return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple
})
const entries: [string, Buffer][] = []
for (let i = 0; i < keyValueList.length; i += 2) {
const key = keyValueList[i]!
const value = Buffer.from(keyValueList[i + 1]!)
entries.push([key, value])
}
return entries
})
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(this.keyOfHashField, {
match: `${table}:*`,
})
return await this.cfg.client.hscanCount(table)
}

async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.hincrBatch(
this.keyOfHashField,
incrementTuplesWithInternalKeys,
)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
return results
return await this.cfg.client.hincrBatch(table, increments)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
return ids.map(id => this.idToKey(table, id))
}

private idToKey(table: string, id: string): string {
return `${table}:${id}`
}

private keysToIds(table: string, keys: string[]): string[] {
return keys.map(key => this.keyToId(table, key))
}

private keyToId(table: string, key: string): string {
return key.slice(table.length + 1)
await this.cfg.client.del([table])
}
}
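Editor's summary (not part of the diff): the old implementation packed every row into one configured hash, using `${table}:${id}` as the field name; the new one uses the table name itself as the Redis hash key and plain ids as fields, so no key prefixing or MATCH pattern is needed. A hedged usage sketch, assuming the class and config shape shown above:

```ts
import { RedisClient } from './redisClient'
import { RedishHashKeyValueDB } from './redisHashKeyValueDB'

// cfg is RedisKeyValueDBCfg after this change, i.e. just { client }
const db = new RedishHashKeyValueDB({ client: new RedisClient() })

// Roughly HSET user id1 <buf> id2 <buf>: one hash per table, ids as fields
await db.saveBatch('user', [
  ['id1', Buffer.from('{"name":"Ada"}')],
  ['id2', Buffer.from('{"name":"Bob"}')],
])

// Roughly HMGET user id1 id2
const rows = await db.getByIds('user', ['id1', 'id2'])
console.log(rows.length)

// Roughly HSCAN user 0: the hash itself is the namespace, no `${table}:*` MATCH needed
const ids: string[] = []
for await (const id of db.streamIds('user')) {
  ids.push(id)
}

await db[Symbol.asyncDispose]()
```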
34 changes: 15 additions & 19 deletions src/redisKeyValueDB.ts
@@ -15,34 +15,30 @@ export interface RedisKeyValueDBCfg {
}

export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
constructor(cfg: RedisKeyValueDBCfg) {
this.client = cfg.client
}

client: RedisClient
constructor(public cfg: RedisKeyValueDBCfg) {}

support = {
...commonKeyValueDBFullSupport,
}

async ping(): Promise<void> {
await this.client.ping()
await this.cfg.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
await this.cfg.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.mgetBuffer(this.idsToKeys(table, ids))
const bufs = await this.cfg.client.mgetBuffer(this.idsToKeys(table, ids))
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.del(this.idsToKeys(table, ids))
await this.cfg.client.del(this.idsToKeys(table, ids))
}

async saveBatch(
@@ -55,7 +51,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
if (opt?.expireAt) {
// There's no supported mset with TTL: https://stackoverflow.com/questions/16423342/redis-multi-set-with-a-ttl
// so we're going to use a pipeline instead
await this.client.withPipeline(pipeline => {
await this.cfg.client.withPipeline(pipeline => {
for (const [k, v] of entries) {
pipeline.set(this.idToKey(table, k), v, 'EXAT', opt.expireAt!)
}
@@ -64,12 +60,12 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
const obj: Record<string, Buffer> = Object.fromEntries(
entries.map(([k, v]) => [this.idToKey(table, k), v]) as KeyValueDBTuple[],
)
await this.client.msetBuffer(obj)
await this.cfg.client.msetBuffer(obj)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
let stream = this.cfg.client
.scanStream({
match: `${table}:*`,
// count: limit, // count is actually a "batchSize", not a limit
@@ -84,13 +80,13 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
return this.cfg.client
.scanStream({
match: `${table}:*`,
})
.flatMap(
async keys => {
return (await this.client.mgetBuffer(keys)).filter(_isTruthy)
return (await this.cfg.client.mgetBuffer(keys)).filter(_isTruthy)
},
{
concurrency: 16,
@@ -100,14 +96,14 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
return this.cfg.client
.scanStream({
match: `${table}:*`,
})
.flatMap(
async keys => {
// casting as Buffer[], because values are expected to exist for given keys
const bufs = (await this.client.mgetBuffer(keys)) as Buffer[]
const bufs = (await this.cfg.client.mgetBuffer(keys)) as Buffer[]
return _zip(this.keysToIds(table, keys), bufs)
},
{
@@ -119,7 +115,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {

async count(table: string): Promise<number> {
// todo: implement more efficiently, e.g via LUA?
return await this.client.scanCount({
return await this.cfg.client.scanCount({
match: `${table}:*`,
})
}
@@ -128,7 +124,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
const resultsWithInternalKeys = await this.cfg.client.incrBatch(incrementTuplesWithInternalKeys)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
@@ -138,7 +134,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
await this.cfg.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
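Editor's note (not part of the diff): RedisKeyValueDB keeps the flat `${table}:${id}` key scheme because per-row expiry only works there today; a single key can take an EXAT, while expiring one hash field needs HEXPIRE, which only arrives in Redis 7.4 (the caveat mentioned in the docstring above). A rough sketch mirroring the pipeline code in this file, with illustrative names and values:

```ts
import { RedisClient } from './redisClient'

const client = new RedisClient() // assumes default connection config
const expireAt = Math.floor(Date.now() / 1000) + 3600 // unix seconds, one hour from now

// Plain-key layout (RedisKeyValueDB): every row is its own key, so EXAT can expire it
// individually on currently deployed Redis versions (Memorystore, macOS builds).
await client.withPipeline(pipeline => {
  pipeline.set('user:id1', Buffer.from('{"name":"Ada"}'), 'EXAT', expireAt)
})

// Hash layout (this commit): rows are fields of the `user` hash, and expiring a single
// field requires HEXPIRE, available only in Redis >= 7.4.

await client.disconnect()
```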
