Redis-based realtime collection with compare-and-set support for NodeJS.
yarn add redis-live-collection
A collection consists of key-value pairs, where keys and values are Buffer
s
and have a numeric version. A collection uses a set of Redis keys:
<collectionName>:values
: Hash mapping keys to values.<collectionName>:keys
: Sorted Set of keys with all scores set to0
.<collectionName>:versions
: Sorted Set of keys scored by version.<collectionName>:changes
: Stream of collection changes.<collectionName>:revision
: Stream ID of the last change entry.
By storing keys in a Sorted Set keys
we get the ability to efficiently
retrieve or remove lexicographic ranges of keys. Sorted Set versions
allows to
retrieve or remove ranges of versions.
A version is an arbitrary number
(including Infinity
). It is up to user to
assign a meaning to versions or even use versions at all.
Versions enable compare-and-set operations, where you can atomically compare current version of a key with given number and only update the key if the condition is true.
Stream changes
allows to watch
a collection and receive updates in
realtime.
Call initCommands
on
ioredis
instance to initialize Lua scripts:
import * as IORedis from 'ioredis';
import {initCommands} from 'redis-live-collection';
const redis = new IORedis();
initCommands(redis);
Now you can invoke collection operations using this instance:
import {set, getAll, watch} from 'redis-live-collection';
await set(redis, 'my-collection', 'some-key', Buffer.from('value'));
const {revision, items} = await getAll(redis, 'my-collection');
const observable = watch(redis, 'my-collection', revision);
Since reading from a Stream is blocking, to use watch
you need to
set up a connection pool. Example using
generic-pool
:
import {createPool} from 'generic-pool';
import {defer} from 'rxjs';
import {finalize} from 'rxjs/operators';
const redisPool = createPool(
{
async create() {
const redis = new IORedis({
lazyConnect: true,
});
initCommands(redis);
await redis.connect();
return redis;
},
async destroy(redis) {
await redis.quit();
},
},
{
min: 1,
max: 100,
evictionRunIntervalMillis: 10_000,
acquireTimeoutMillis: 10_000,
},
);
const {revision, items} = await pool.use((redis) =>
getAll(redis, 'my-collection'),
);
const observable = defer(() => pool.acquire()).pipe(
concatMap((redis) =>
watch(redis, 'my-collection', revision).pipe(
finalize(() => {
pool.release(redis);
}),
),
),
);
It is possible to use some operations with Pipelining. For example, to transactionally get multiple fields:
import {transformGetArguments, transformGetReply} from 'redis-live-collection';
const rawResults = await redis
.multi()
.lcGetBuffer(...transformGetArguments('my-collection', 'key-1'))
.lcGetBuffer(...transformGetArguments('my-collection', 'key-2'))
.lcGetBuffer(...transformGetArguments('my-collection', 'key-3'))
.exec();
const results = rawResults.map(([err, res]) => {
if (err) {
throw err;
}
return transformGetReply(res);
});
With Redis Cluster, it is important that all the keys used by collection are placed on the same shard. To achieve this, make sure to have hash tags in collection names:
const collection = '{my-collection}';
await getAll(redis, collection);
Every operation returns a promise that resolves to an object containing
revision
string property, which is the collection revision after the operation
is applied.
Every write operation has maxlen
integer argument that specifies how much
entries in the Stream of changes we want to retain. If you do high rate writes,
the default of 1000
may be too small for watch
to catch up. See
Redis docs.
function get(
redis: IORedis.Redis,
collection: string,
key: string | Buffer,
): Promise<{
revision: string;
value: Buffer | null;
version: number;
}>;
Get a value of a single key. If the key does not exist, returned value
is
null
and version
is 0
.
get
can be used with Pipelining.
function getAll(
redis: IORedis.Redis,
collection: string,
): Promise<{
revision: string;
items: Array<{
key: Buffer;
version: number;
value: Buffer;
}>;
}>;
Get all key-value pairs.
function getKeyRange(
redis: IORedis.Redis,
collection: string,
min: string | Buffer,
max: string | Buffer,
): Promise<{
revision: string;
items: Array<{
key: Buffer;
version: number;
value: Buffer;
}>;
}>;
Get all key-value pairs with keys lexicographically sorted between min
and
max
. See
Redis docs on
how to specify intervals. Use makePrefixRange
to get all
keys starting with a prefix.
getKeyRange
can be used with Pipelining.
Example:
// get keys from `a` (inclusive) to `z` (exclusive)
await getKeyRange(redis, 'my-collection', '[a', '(z');
// get all keys from `z` (inclusive) onwards
await getKeyRange(redis, 'my-collection', '[z', '+');
function getVersionRange(
redis: IORedis.Redis,
collection: string,
min: number | string,
max: number | string,
): Promise<{
revision: string;
items: Array<{
key: Buffer;
version: number;
value: Buffer;
}>;
}>;
Get all key-value pairs with version between min
and max
(inclusive).
Infinity
is allowed. See
Redis docs
on how to specify exclusive intervals.
getVersionRange
can be used with Pipelining.
function watch(
redis: IORedis.Redis,
collection: string,
lastRevision: string,
blockMs: number = 2500,
): Observable<ChangeEvent[]>;
Return RxJS Observable that emits changes to a collection happening after
lastRevision
. Changes are emitted as arrays of events of the following shape:
type ChangeEvent = SetEvent | RemoveEvent;
type SetEvent = {
type: 'set';
revision: string;
key: Buffer;
version: number;
value: Buffer;
};
type RemoveEvent = {
type: 'remove';
revision: string;
key: Buffer;
};
Example:
// observable that emits the whole collection as a Map on each change
const collectionObservable = defer(() => getAll(redis, 'my-collection')).pipe(
concatMap(({items, revision}) => {
const initialState = new Map();
for (const {key, value, version} of items) {
initialState.set(key.toString(), {value, version});
}
return concat(
of(initialState),
watch(redis, 'my-collection', revision).pipe(
scan((state, changes) => {
const nextState = new Map(state);
for (const event of changes) {
if (event.type === 'set') {
const {value, version} = event;
nextState.set(event.key.toString(), {value, version});
} else if (event.type === 'remove') {
nextState.delete(event.key.toString());
}
}
return nextState;
}, initialState),
),
);
}),
);
function set(
redis: IORedis.Redis,
collection: string,
key: string | Buffer,
value: Buffer,
version: number = Infinity,
maxlen: number = 1000,
): Promise<{
revision: string;
}>;
Update the key-value pair, overwriting previous value, if any. See API
for details on maxlen
argument.
set
can be used with Pipelining.
function compareAndSet(
redis: IORedis.Redis,
collection: string,
key: string | Buffer,
compareOperator: '<' | '<=' | '==' | '!=' | '>=' | '>',
compareVersion: number,
value: Buffer,
version: number = Infinity,
maxlen: number = 1000,
): Promise<{
revision: string;
success: boolean;
}>;
Update the key-value pair only if a comparison condition holds on the previous
version of the pair. If a previous value does not exist, its version is
considered to be 0
. See API for details on maxlen
argument.
compareAndSet
can be used with Pipelining.
Example:
// only update if previous version is older
const newVersion = Date.now();
await compareAndSet(
redis,
'my-collection',
'some-key',
'<',
newVersion,
Buffer.from('payload'),
newVersion,
);
// only update if the key does not exist
await compareAndSet(
redis,
'my-collection',
'some-key',
'==',
0,
Buffer.from('payload'),
);
function remove(
redis: IORedis.Redis,
collection: string,
key: string | Buffer,
maxlen: number = 1000,
): Promise<{
revision: string;
}>;
Delete the key-value pair, if it exists. See API for details on maxlen
argument.
remove
can be used with Pipelining.
function compareAndRemove(
redis: IORedis.Redis,
collection: string,
key: string | Buffer,
compareOperator: '<' | '<=' | '==' | '!=' | '>=' | '>',
compareVersion: number,
maxlen: number = 1000,
): Promise<{
revision: string;
success: boolean;
}>;
Delete the key-value pair only if a comparison condition holds on the previous
version of the pair. If a previous value does not exist, its version is
considered to be 0
. See API for details on maxlen
argument.
compareAndRemove
can be used with Pipelining.
function removeKeyRange(
redis: IORedis.Redis,
collection: string,
min: string | Buffer,
max: string | Buffer,
maxlen: number = 1000,
): Promise<{
revision: string;
removedCount: number;
}>;
Delete all key-value pairs with keys lexicographically sorted between min
and
max
. See
Redis docs on
how to specify intervals. Use makePrefixRange
to delete
all keys starting with a prefix. See API for details on maxlen
argument.
removeKeyRange
can be used with Pipelining.
function removeVersionRange(
redis: IORedis.Redis,
collection: string,
min: number | string,
max: number | string,
maxlen: number = 1000,
): Promise<{
revision: string;
removedCount: number;
}>;
Delete all key-value pairs with version between min
and max
(inclusive).
Infinity
is allowed. See
Redis docs
on how to specify exclusive intervals. See API for details on maxlen
argument.
removeVersionRange
can be used with Pipelining.
function makePrefixRange(prefix: string | Buffer): [Buffer, Buffer];
Make a [min, max]
range of all keys starting with a prefix. Useful with
getKeyRange
and removeKeyRange
operations:
const [min, max] = makePrefixRange('prefix:');
await getKeyRange(redis, 'my-collection', min, max);
await removeKeyRange(redis, 'my-collection', min, max);
function initCommands(redis: IORedis.Redis): void;
Enable collection operations on ioredis
instance by defining Lua commands.
Start Redis:
docker-compose up -d
Run tests:
yarn test