Skip to content

Commit

Permalink
feat: added restorePreviousState logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 committed May 29, 2019
1 parent ffaa0e2 commit fc7d541
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 12 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ const myWorker = require('./myWorker');
const client = redis.createClient(REDIS_PORT, REDIS_HOST, {});
const brokkrRedisClient = buildRedisClient(client);

const brokkr = new Brokkr(brokkrRedisClient);
brokkr.registerWorker(myWorker);
const brokkr = new Brokkr(brokkrRedisClient, "myBrokkrNamespace"); // namespace is to prevent collisions with other stuff you store in Redis
brokkr.registerWorker(myWorker); // We register myWorker at brokkr.
await brokkr.restorePreviousState(); // In case we had pending tasks from a previous time

// Here is a good place to add brokkr to your server's ctx
ctx.brokkr = brokkr; // Or whichever syntax you are using
Expand Down Expand Up @@ -119,8 +120,10 @@ const {hammerHeadFactory, handleFactory, hammerFactory} = require('./workers.js'
const client = redis.createClient(REDIS_PORT, REDIS_HOST, {});
const brokkrRedisClient = buildRedisClient(client);

const brokkr = new Brokkr(brokkrRedisClient);
brokkr.registerWorkers(hammerHeadFactory, handleFactory, hammerFactory);
const brokkr = new Brokkr(brokkrRedisClient, "myBrokkrNamespace");
brokkr.registerWorkers(hammerHeadFactory, handleFactory, hammerFactory); // We can register multiple workers at once
await brokkr.restorePreviousState(); // In case we had pending tasks from a previous time


ctx.brokkr = brokkr;
```
Expand Down
33 changes: 29 additions & 4 deletions lib/src/brokkr.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
import IClient from './clients/iclient';
import Saga from './entities/saga';
import Saga, {ISaga, SagaStatus, TABLE_NAME as SAGA_TABLE_NAME} from './entities/saga';
import {getIds, getMultiple} from './helpers/db';
import { IWorker } from './interfaces';
import QueueManager, {IQueueManagerOpts} from './queue-manager';

// DEVNOTE: Leaving this for now in case we add options in the future
// tslint:disable-next-line: no-empty-interface
interface IBrokkrOpts {
}

class Brokkr {
private client: IClient;
private namespace: string;
private queueManager: QueueManager;

constructor(client: IClient, namespace: string = '', queueOpts: IQueueManagerOpts = {}) {
constructor(
client: IClient,
namespace: string = '',
brokkrOpts: IBrokkrOpts = {},
queueOpts: IQueueManagerOpts = {}
) {
this.client = client;
this.namespace = namespace;
this.queueManager = new QueueManager(this.client, this.namespace, queueOpts);
Expand Down Expand Up @@ -56,8 +67,22 @@ class Brokkr {
this.queueManager.start();
}

public async restart(): Promise<Saga> {
throw Error('TODO: Implement this in case the service running this lib has to be restarted mid-process');
/**
* Looks for any Saga that exists in the storage and initiates a worker for it if it's not finished
*/
public async restorePreviousState() {
const sagaIds = await getIds(this.client, this.namespace, SAGA_TABLE_NAME);
const sagas = await getMultiple<ISaga>(this.client, this.namespace, SAGA_TABLE_NAME, sagaIds);

const unfinishedSagas = sagas.filter(
saga => saga.status !== SagaStatus.Finished && saga.status !== SagaStatus.Failed
);

unfinishedSagas.forEach(sagaValues => {
let saga = new Saga(this.client, this.namespace);
saga = saga.instantiate(sagaValues);
this.queueManager.addSaga(saga);
});
}

/**
Expand Down
1 change: 1 addition & 0 deletions lib/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { default as Brokkr } from './brokkr';
export * from './entities';
export * from './clients';
export * from './interfaces';
27 changes: 24 additions & 3 deletions lib/tests/integration/brokkr.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {RedisClient} from 'redis';
import { Brokkr, buildRedisClient, IClient, Saga, SagaStatus, SagaStep, SagaStepStatus } from '../../src';
import redisClientBuilder from '../helpers/redis-client-builder';
import { Brokkr, buildRedisClient, IClient, Saga, SagaStep } from '../../src';
import { IWorker } from '../../src/interfaces';
import redisClientBuilder from '../helpers/redis-client-builder';

describe('Brokkr integration tests', () => {
let brokkr: Brokkr;
Expand All @@ -18,7 +18,7 @@ describe('Brokkr integration tests', () => {
// Reset db after each test
redisClient.flushdb(() => {
client = buildRedisClient(redisClient);
brokkr = new Brokkr(client, namespace, {pollingIntervalInMs: 100});
brokkr = new Brokkr(client, namespace, {}, {pollingIntervalInMs: 100});
done();
});
});
Expand Down Expand Up @@ -81,6 +81,27 @@ describe('Brokkr integration tests', () => {

expect(brokkr.getWorker("example")).toEqual(worker2);
expect(brokkr.getWorker("example2")).toEqual(worker3);
});

describe('when brokkr is started with an already existing previous state', () => {
let brokkr2: Brokkr;
let sagaId: string;
beforeEach(() => {
sagaId = saga.getId() || '';
if(sagaId === '') {
throw Error('saga must be initialized.');
}
})

it('restores the previous sagas', async (done) => {
brokkr2 = new Brokkr(client, namespace);
expect(brokkr2.getSaga(sagaId)).not.toBeDefined();
await brokkr2.restorePreviousState();
expect(brokkr2.getSaga(sagaId)).toBeDefined();
done();
});



})

Expand Down
2 changes: 1 addition & 1 deletion lib/tests/integration/queue-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Worker integration tests', () => {
// Reset db after each test
redisClient.flushdb(() => {
client = buildRedisClient(redisClient);
brokkr = new Brokkr(client, namespace, {pollingIntervalInMs: 100});
brokkr = new Brokkr(client, namespace, {}, {pollingIntervalInMs: 100});
done();
});
});
Expand Down

0 comments on commit fc7d541

Please sign in to comment.