Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate redis client to utils #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions __test__/unittest/redisClient.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { redisClient as redisClientType } from '../../src/redis-client';
import RedisMock from 'ioredis-mock';

describe('Test redis client', () => {
const OLD_ENV = process.env;

beforeEach(() => {
jest.resetModules(); // Most important - it clears the cache
jest.clearAllMocks();
jest.mock('ioredis', () => RedisMock);
process.env = { ...OLD_ENV }; // Make a copy
});

afterAll(() => {
process.env = OLD_ENV; // Restore old environment
});
test('Client reports not enabled when REDIS_HOST is not set', () => {
const { redisClient } = require('../../src/redis-client');
expect(redisClient.clientEnabled).toEqual(false);
});

test('will receive process.env variables', () => {
// Set the variables
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
expect(redisClient).not.toEqual({});
});

test('Test if record is correctly stored', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello');
const res = await redisClient.client.getTimeout('record1');
expect(res).toEqual('hello');
});

test('Test if record is correctly stored', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello');
const res = await redisClient.client.getTimeout('record1');
expect(res).toEqual('hello');
});

test('Test key should be removed after ttl', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello', 'EX', 2);
await new Promise((r) => setTimeout(r, 3000));
const res = await redisClient.client.getTimeout('record1');
expect(res).not.toBeNull;
});
});
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
"@opentelemetry/api": "^1.7.0",
"@sentry/node": "^7.74.1",
"debug": "^4.3.4",
"lodash": "^4.17.21"
"lodash": "^4.17.21",
"ioredis": "^5.3.2",
"redlock": "5.0.0-beta.2"
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./types";
export * from "./log_entry";
export * from "./telemetry";
export * from "./misc";
export * from './redis-client';
309 changes: 309 additions & 0 deletions src/redis-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:redis-client');
import Redlock, { RedlockAbortSignal } from 'redlock';

const prefix = `{BTEHashSlotPrefix}:`;

type AsyncFunction = (...args: unknown[]) => Promise<unknown>;
type DropFirst<T extends unknown[]> = T extends [unknown, ...infer U] ? U : never;
type Awaited<T> = T extends PromiseLike<infer U> ? U : T;

function timeoutFunc<F extends AsyncFunction>(func: F, timeoutms = 0) {
return (...args: Parameters<F>): ReturnType<F> => {
return new Promise((resolve, reject) => {
const timeout = timeoutms ? timeoutms : parseInt(process.env.REDIS_TIMEOUT || '60000');
let done = false;
setTimeout(() => {
if (!done) {
reject(new Error(`redis call timed out, args: ${JSON.stringify([...args])}`));
}
}, timeout);
func(...args).then((returnValue: ReturnType<F>) => {
done = true;
resolve(returnValue);
});
}) as ReturnType<F>;
};
}

/**
* Decorate a function such that the first argument is given the module-defined prefix
*/
function addPrefix<F extends AsyncFunction>(func: F) {
return (arg0: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): ReturnType<F> => {
if (arg0 && (arg0 as string).length > 0) {
arg0 = `${prefix}${arg0 as string}`;
}
return func(arg0, ...args) as ReturnType<F>;
};
}

/**
* Decorate a function such that each argument is given the module-defined prefix
*/
function addPrefixToAll<F extends AsyncFunction>(func: F) {
return (...args: Parameters<F>): ReturnType<F> => {
return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType<F>;
};
}

/**
* Decorate a Redlock function such that the locks are given the module-defined prefix
*/
function lockPrefix<F extends AsyncFunction>(func: F) {
return async (locks: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): Promise<Awaited<ReturnType<F>>> => {
return (await func(
(locks as string[]).map((lockName: string) => `${prefix}${lockName}`),
...args,
)) as Awaited<ReturnType<F>>;
};
}

interface RedisClientInterface {
clearEdgeCache: () => void;
getTimeout: (key: RedisKey) => Promise<string>;
setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>;
hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise<number>;
hgetallTimeout: (key: RedisKey) => Promise<Record<string, string>>;
expireTimeout: (key: RedisKey, seconds: string | number) => Promise<number>;
delTimeout: (key: RedisKey | RedisKey[]) => Promise<number>;
usingLock: (
resources: string[],
duration: number,
routine?: (signal: RedlockAbortSignal) => Promise<unknown>,
) => Promise<unknown>;
incrTimeout: (key: string) => Promise<number>;
decrTimeout: (key: string) => Promise<number>;
existsTimeout: (...args: RedisKey[]) => Promise<number>;
pingTimeout: () => Promise<'PONG'>;
}

function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientInterface {
function decorate<F extends AsyncFunction>(func: F, timeoutms?: number): (...args: Parameters<F>) => ReturnType<F> {
let wrapped = timeoutFunc(func, timeoutms);
if (client instanceof Cluster) {
// Dirty way to cast the function so that typescript doesn't complain
// But given the extremely limited use-case of this function, it's fine for now
wrapped = addPrefix(wrapped) as unknown as (...args: Parameters<F>) => ReturnType<F>;
}

return wrapped;
}
return {
clearEdgeCache: () => null,
getTimeout: decorate((key: RedisKey) => client.get(key)),
setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)),
hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) =>
client.hset(...args),
),
hgetallTimeout: decorate((key: RedisKey) => client.hgetall(key)),

expireTimeout: decorate((key: RedisKey, seconds: string | number) => client.expire(key, seconds)),

delTimeout:
client instanceof Cluster
? addPrefixToAll(timeoutFunc((...args: RedisKey[]) => client.del(...args)))
: timeoutFunc((...args: RedisKey[]) => client.del(...args)),
usingLock: lockPrefix(
(resources: string[], duration: number, routine?: (signal: RedlockAbortSignal) => Promise<unknown>) =>
redlock.using(resources, duration, routine),
),
incrTimeout: decorate((key: string) => client.incr(key)),
decrTimeout: decorate((key: string) => client.decr(key)),
existsTimeout: decorate((...args: RedisKey[]) => client.exists(...args)),
pingTimeout: decorate(() => client.ping(), 10000), // for testing
// hmsetTimeout: decorate((...args) => client.hmset(...args)),
// keysTimeout: decorate((...args) => client.keys(...args)),
};
}

class RedisClient {
client: Record<string, never> | ReturnType<typeof addClientFuncs>;
enableRedis: boolean;
clientEnabled: boolean;
internalClient: Redis | Cluster;
constructor() {
this.client;
this.enableRedis = !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined);

if (!this.enableRedis) {
this.client = {};
this.clientEnabled = false;
return;
}

interface RedisClusterDetails {
redisOptions: {
connectTimeout: number;
password?: string;
tls?: {
checkServerIdentity: () => undefined | Error;
};
};
// How long to wait given how many failed tries
clusterRetryStrategy?: (times: number) => number;
}

if (process.env.REDIS_CLUSTER === 'true') {
const details = {
redisOptions: {
connectTimeout: 20000,
},
clusterRetryStrategy(times: number) {
return Math.min(times * 100, 5000);
},
} as RedisClusterDetails;

if (process.env.REDIS_PASSWORD) {
details.redisOptions.password = process.env.REDIS_PASSWORD;
}
if (process.env.REDIS_TLS_ENABLED) {
details.redisOptions.tls = { checkServerIdentity: () => undefined };
}

const cluster = new Redis.Cluster(
[
{
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT),
},
],
details,
);

// allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved)
const redlock = new Redlock([cluster], { retryDelay: 500, retryCount: 1200 });

this.internalClient = cluster;

this.client = addClientFuncs(cluster, redlock);

this.client.clearEdgeCache = () => {
let count = 0;
const nodes = (this.internalClient as Cluster).nodes('master');
let completedNodes = 0;
nodes.forEach((node, i) => {
const stream = node.scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug(`Cache clearing failure on node ${i}:`);
debug(error);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
})
.on('end', () => {
debug(`Cache clearing completes on cluster node ${i}`);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
});
});
};

debug('Initialized redis client (cluster-mode)');
} else {
interface RedisDetails {
host: string;
port: number;
connectTimeout: number;
retryStrategy: (times: number) => number;
password?: string;
tls?: {
checkServerIdentity: () => undefined | Error;
};
}

const details = {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT),
connectTimeout: 20000,
retryStrategy(times) {
return Math.min(times * 100, 5000);
},
} as RedisDetails;
if (process.env.REDIS_PASSWORD) {
details.password = process.env.REDIS_PASSWORD;
}
if (process.env.REDIS_TLS_ENABLED) {
details.tls = { checkServerIdentity: () => undefined };
}
const client = new Redis(details);

// allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved)
const redlock = new Redlock([client], { retryDelay: 500, retryCount: 1200 });

this.internalClient = client;

this.client = addClientFuncs(client, redlock);

this.client.clearEdgeCache = () => {
const stream = (redisClient.internalClient as Redis).scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

let count = 0;

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug('Cache clearing failure:');
debug(error);
})
.on('end', () => {
debug(`Cache clearing completes, cleared ${count} keys.`);
});
};

debug('Initialized redis client (non-cluster-mode)');
}
this.clientEnabled = true;
}
}

const redisClient = new RedisClient();

function getNewRedisClient(): RedisClient {
return new RedisClient();
}

export { redisClient, getNewRedisClient };
Loading