Skip to content

Commit

Permalink
chore: expose cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid committed Oct 18, 2024
1 parent 4d88bb5 commit d95257c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 37 deletions.
31 changes: 13 additions & 18 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
} from '@naturalcycles/db-lib'
import { _chunk } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

/**
Expand All @@ -24,34 +23,30 @@ import { RedisKeyValueDBCfg } from './redisKeyValueDB'
* this implementation can take over for RedisKeyValueDB.
*/
export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
constructor(cfg: RedisKeyValueDBCfg) {
this.client = cfg.client
}

client: RedisClient
constructor(public cfg: RedisKeyValueDBCfg) {}

support = {
...commonKeyValueDBFullSupport,
}

async ping(): Promise<void> {
await this.client.ping()
await this.cfg.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
await this.cfg.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(table, ids)
const bufs = await this.cfg.client.hmgetBuffer(table, ids)
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(table, ids)
await this.cfg.client.hdel(table, ids)
}

async saveBatch(
Expand All @@ -64,14 +59,14 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
const record = Object.fromEntries(entries)

if (opt?.expireAt) {
await this.client.hsetWithTTL(table, record, opt.expireAt)
await this.cfg.client.hsetWithTTL(table, record, opt.expireAt)
} else {
await this.client.hset(table, record)
await this.cfg.client.hset(table, record)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
const stream = this.client
const stream = this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const keys: string[] = []
Expand All @@ -87,7 +82,7 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
return this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const values: string[] = []
Expand All @@ -101,7 +96,7 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
return this.cfg.client
.hscanStream(table)
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2) as [string, string][]
Expand All @@ -113,16 +108,16 @@ export class RedishHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(table)
return await this.cfg.client.hscanCount(table)
}

async incrementBatch(table: string, increments: IncrementTuple[]): Promise<IncrementTuple[]> {
return await this.client.hincrBatch(table, increments)
return await this.cfg.client.hincrBatch(table, increments)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.del([table])
await this.cfg.client.del([table])
}
}
34 changes: 15 additions & 19 deletions src/redisKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,30 @@ export interface RedisKeyValueDBCfg {
}

export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
constructor(cfg: RedisKeyValueDBCfg) {
this.client = cfg.client
}

client: RedisClient
constructor(public cfg: RedisKeyValueDBCfg) {}

support = {
...commonKeyValueDBFullSupport,
}

async ping(): Promise<void> {
await this.client.ping()
await this.cfg.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
await this.cfg.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.mgetBuffer(this.idsToKeys(table, ids))
const bufs = await this.cfg.client.mgetBuffer(this.idsToKeys(table, ids))
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.del(this.idsToKeys(table, ids))
await this.cfg.client.del(this.idsToKeys(table, ids))
}

async saveBatch(
Expand All @@ -55,7 +51,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
if (opt?.expireAt) {
// There's no supported mset with TTL: https://stackoverflow.com/questions/16423342/redis-multi-set-with-a-ttl
// so we gonna use a pipeline instead
await this.client.withPipeline(pipeline => {
await this.cfg.client.withPipeline(pipeline => {
for (const [k, v] of entries) {
pipeline.set(this.idToKey(table, k), v, 'EXAT', opt.expireAt!)
}
Expand All @@ -64,12 +60,12 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
const obj: Record<string, Buffer> = Object.fromEntries(
entries.map(([k, v]) => [this.idToKey(table, k), v]) as KeyValueDBTuple[],
)
await this.client.msetBuffer(obj)
await this.cfg.client.msetBuffer(obj)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
let stream = this.cfg.client
.scanStream({
match: `${table}:*`,
// count: limit, // count is actually a "batchSize", not a limit
Expand All @@ -84,13 +80,13 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
return this.cfg.client
.scanStream({
match: `${table}:*`,
})
.flatMap(
async keys => {
return (await this.client.mgetBuffer(keys)).filter(_isTruthy)
return (await this.cfg.client.mgetBuffer(keys)).filter(_isTruthy)
},
{
concurrency: 16,
Expand All @@ -100,14 +96,14 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
return this.client
return this.cfg.client
.scanStream({
match: `${table}:*`,
})
.flatMap(
async keys => {
// casting as Buffer[], because values are expected to exist for given keys
const bufs = (await this.client.mgetBuffer(keys)) as Buffer[]
const bufs = (await this.cfg.client.mgetBuffer(keys)) as Buffer[]
return _zip(this.keysToIds(table, keys), bufs)
},
{
Expand All @@ -119,7 +115,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {

async count(table: string): Promise<number> {
// todo: implement more efficiently, e.g via LUA?
return await this.client.scanCount({
return await this.cfg.client.scanCount({
match: `${table}:*`,
})
}
Expand All @@ -128,7 +124,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
const incrementTuplesWithInternalKeys = increments.map(
([id, v]) => [this.idToKey(table, id), v] as [string, number],
)
const resultsWithInternalKeys = await this.client.incrBatch(incrementTuplesWithInternalKeys)
const resultsWithInternalKeys = await this.cfg.client.incrBatch(incrementTuplesWithInternalKeys)
const results = resultsWithInternalKeys.map(
([k, v]) => [this.keyToId(table, k), v] as IncrementTuple,
)
Expand All @@ -138,7 +134,7 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
await this.cfg.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
Expand Down

0 comments on commit d95257c

Please sign in to comment.