Skip to content

Commit

Permalink
worker: fixed a tricky issue with server shutdown
Browse files Browse the repository at this point in the history
If a server is destroyed before the lightning connection returned, the workloop will still fire even if the server is technically destroyed
  • Loading branch information
josephjclark committed Feb 15, 2024
1 parent 21119c5 commit 322813c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 18 deletions.
12 changes: 5 additions & 7 deletions integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import crypto from 'node:crypto';
import createLightningServer, { toBase64 } from '@openfn/lightning-mock';
import createEngine from '@openfn/engine-multi';
import createWorkerServer from '@openfn/ws-worker';
import createLogger, { createMockLogger } from '@openfn/logger';
import { createMockLogger } from '@openfn/logger';

export const randomPort = () => Math.round(2000 + Math.random() * 1000);

Expand All @@ -26,9 +26,9 @@ export const initWorker = async (
) => {
const workerPort = randomPort();

const engineLogger = createLogger('engine', {
const engineLogger = createMockLogger('engine', {
level: 'debug',
// json: true,
json: true,
});

const engine = await createEngine({
Expand All @@ -38,15 +38,13 @@ export const initWorker = async (
});

const worker = createWorkerServer(engine, {
// logger: createMockLogger(),
logger: createLogger('worker', { level: 'debug' }),
logger: createMockLogger(),
// logger: createLogger('worker', { level: 'debug' }),
port: workerPort,
lightning: `ws://localhost:${lightningPort}/worker`,
secret: crypto.randomUUID(),
...workerArgs,
});

console.log(' ***** ', worker.id);

return { engine, engineLogger, worker };
};
10 changes: 2 additions & 8 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,11 @@ test('an OOM error should still call step-complete', (t) => {
// });
// });

// TODO this test is a bit different now
// I think it's worth keeping
test.only('stateful adaptor should create a new client for each attempt', (t) => {
test('stateful adaptor should create a new client for each attempt', (t) => {
return new Promise(async (done) => {
// We want to create our own special worker here
await worker.destroy();
// ({ worker } = await createDummyWorker());

console.log(' >> ', worker.id);
({ worker, engineLogger } = await createDummyWorker());

const attempt1 = {
id: crypto.randomUUID(),
Expand All @@ -536,10 +532,8 @@ test.only('stateful adaptor should create a new client for each attempt', (t) =>
let results = {};

lightning.on('run:complete', (evt) => {
console.log(evt.payload);
const id = evt.runId;
results[id] = lightning.getResult(id);
console.log(results[id]);

if (id === attempt2.id) {
const one = results[attempt1.id];
Expand Down
7 changes: 4 additions & 3 deletions packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const destroy = async (app: ServerApp, logger: Logger) => {
await app.engine.destroy();
app.socket?.disconnect();

logger.info('Server closed....');

resolve();
}),
]);
Expand All @@ -41,9 +43,7 @@ const waitForRuns = (app: ServerApp, logger: Logger) =>
new Promise<void>((resolve) => {
const log = () => {
logger.debug(
`Waiting for ${
Object.keys(app.workflows).length
} runs to complete...`
`Waiting for ${Object.keys(app.workflows).length} runs to complete...`
);
};

Expand All @@ -61,6 +61,7 @@ const waitForRuns = (app: ServerApp, logger: Logger) =>
log();
app.events.on(INTERNAL_RUN_COMPLETE, onRunComplete);
} else {
logger.debug('No active rns detected');
resolve();
}
});
Expand Down
10 changes: 10 additions & 0 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {

// A new connection made to the queue
const onConnect = ({ socket, channel }: SocketAndChannel) => {
if (app.destroyed) {
// Fix an edge case where a server can be destroyed before it is
// even connnected
// If this has happened, we do NOT want to go and start the workloop!
return;
}
logger.success('Connected to Lightning at', options.lightning);

// save the channel and socket
Expand Down Expand Up @@ -111,6 +117,10 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {

// We failed to connect to the queue
const onError = (e: any) => {
if (app.destroyed) {
return;
}

logger.error(
'CRITICAL ERROR: could not connect to lightning at',
options.lightning
Expand Down

0 comments on commit 322813c

Please sign in to comment.