diff --git a/README.md b/README.md index 9f1a297..957190d 100644 --- a/README.md +++ b/README.md @@ -43,8 +43,9 @@ const myWorker = require('./myWorker'); const client = redis.createClient(REDIS_PORT, REDIS_HOST, {}); const brokkrRedisClient = buildRedisClient(client); -const brokkr = new Brokkr(brokkrRedisClient); -brokkr.registerWorker(myWorker); +const brokkr = new Brokkr(brokkrRedisClient, "myBrokkrNamespace"); // namespace is to prevent collisions with other stuff you store in Redis +brokkr.registerWorker(myWorker); // We register myWorker at brokkr. +await brokkr.restorePreviousState(); // In case we had pending tasks from a previous time // Here is a good place to add brokkr to your server's ctx ctx.brokkr = brokkr; // Or whichever syntax you are using @@ -119,8 +120,10 @@ const {hammerHeadFactory, handleFactory, hammerFactory} = require('./workers.js' const client = redis.createClient(REDIS_PORT, REDIS_HOST, {}); const brokkrRedisClient = buildRedisClient(client); -const brokkr = new Brokkr(brokkrRedisClient); -brokkr.registerWorkers(hammerHeadFactory, handleFactory, hammerFactory); +const brokkr = new Brokkr(brokkrRedisClient, "myBrokkrNamespace"); +brokkr.registerWorkers(hammerHeadFactory, handleFactory, hammerFactory); // We can register multiple workers at once +await brokkr.restorePreviousState(); // In case we had pending tasks from a previous time + ctx.brokkr = brokkr; ``` diff --git a/lib/src/brokkr.ts b/lib/src/brokkr.ts index 998e28b..e21d5d8 100644 --- a/lib/src/brokkr.ts +++ b/lib/src/brokkr.ts @@ -1,14 +1,25 @@ import IClient from './clients/iclient'; -import Saga from './entities/saga'; +import Saga, {ISaga, SagaStatus, TABLE_NAME as SAGA_TABLE_NAME} from './entities/saga'; +import {getIds, getMultiple} from './helpers/db'; import { IWorker } from './interfaces'; import QueueManager, {IQueueManagerOpts} from './queue-manager'; +// DEVNOTE: Leaving this for now in case we add options in the future +// tslint:disable-next-line: no-empty-interface +interface IBrokkrOpts { +} + class Brokkr { private client: IClient; private namespace: string; private queueManager: QueueManager; - constructor(client: IClient, namespace: string = '', queueOpts: IQueueManagerOpts = {}) { + constructor( + client: IClient, + namespace: string = '', + brokkrOpts: IBrokkrOpts = {}, + queueOpts: IQueueManagerOpts = {} + ) { this.client = client; this.namespace = namespace; this.queueManager = new QueueManager(this.client, this.namespace, queueOpts); @@ -56,8 +67,22 @@ class Brokkr { this.queueManager.start(); } - public async restart(): Promise { - throw Error('TODO: Implement this in case the service running this lib has to be restarted mid-process'); + /** + * Looks for any Saga that exists in the storage and initiates a worker for it if it's not finished + */ + public async restorePreviousState() { + const sagaIds = await getIds(this.client, this.namespace, SAGA_TABLE_NAME); + const sagas = await getMultiple(this.client, this.namespace, SAGA_TABLE_NAME, sagaIds); + + const unfinishedSagas = sagas.filter( + saga => saga.status !== SagaStatus.Finished && saga.status !== SagaStatus.Failed + ); + + unfinishedSagas.forEach(sagaValues => { + let saga = new Saga(this.client, this.namespace); + saga = saga.instantiate(sagaValues); + this.queueManager.addSaga(saga); + }); } /** diff --git a/lib/src/index.ts b/lib/src/index.ts index b0dda32..8f5d580 100644 --- a/lib/src/index.ts +++ b/lib/src/index.ts @@ -1,3 +1,4 @@ export { default as Brokkr } from './brokkr'; export * from './entities'; export * from './clients'; +export * from './interfaces'; \ No newline at end of file diff --git a/lib/tests/integration/brokkr.test.ts b/lib/tests/integration/brokkr.test.ts index 3809be9..32cb1a2 100644 --- a/lib/tests/integration/brokkr.test.ts +++ b/lib/tests/integration/brokkr.test.ts @@ -1,7 +1,7 @@ import {RedisClient} from 'redis'; -import { Brokkr, buildRedisClient, IClient, Saga, SagaStatus, SagaStep, SagaStepStatus } from '../../src'; -import redisClientBuilder from '../helpers/redis-client-builder'; +import { Brokkr, buildRedisClient, IClient, Saga, SagaStep } from '../../src'; import { IWorker } from '../../src/interfaces'; +import redisClientBuilder from '../helpers/redis-client-builder'; describe('Brokkr integration tests', () => { let brokkr: Brokkr; @@ -18,7 +18,7 @@ describe('Brokkr integration tests', () => { // Reset db after each test redisClient.flushdb(() => { client = buildRedisClient(redisClient); - brokkr = new Brokkr(client, namespace, {pollingIntervalInMs: 100}); + brokkr = new Brokkr(client, namespace, {}, {pollingIntervalInMs: 100}); done(); }); }); @@ -81,6 +81,27 @@ describe('Brokkr integration tests', () => { expect(brokkr.getWorker("example")).toEqual(worker2); expect(brokkr.getWorker("example2")).toEqual(worker3); + }); + + describe('when brokkr is started with an already existing previous state', () => { + let brokkr2: Brokkr; + let sagaId: string; + beforeEach(() => { + sagaId = saga.getId() || ''; + if(sagaId === '') { + throw Error('saga must be initialized.'); + } + }) + + it('restores the previous sagas', async (done) => { + brokkr2 = new Brokkr(client, namespace); + expect(brokkr2.getSaga(sagaId)).not.toBeDefined(); + await brokkr2.restorePreviousState(); + expect(brokkr2.getSaga(sagaId)).toBeDefined(); + done(); + }); + + }) diff --git a/lib/tests/integration/queue-manager.test.ts b/lib/tests/integration/queue-manager.test.ts index 68d2a6b..9ff4930 100644 --- a/lib/tests/integration/queue-manager.test.ts +++ b/lib/tests/integration/queue-manager.test.ts @@ -18,7 +18,7 @@ describe('Worker integration tests', () => { // Reset db after each test redisClient.flushdb(() => { client = buildRedisClient(redisClient); - brokkr = new Brokkr(client, namespace, {pollingIntervalInMs: 100}); + brokkr = new Brokkr(client, namespace, {}, {pollingIntervalInMs: 100}); done(); }); });