Skip to content

Commit

Permalink
Merge pull request #490 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Release: worker 0.2.7
  • Loading branch information
josephjclark authored Nov 16, 2023
2 parents d52143b + 12bde0d commit a8d16f6
Show file tree
Hide file tree
Showing 33 changed files with 437 additions and 585 deletions.
11 changes: 11 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/integration-tests-worker

## 1.0.17

### Patch Changes

- Updated dependencies [d542aa9]
- Updated dependencies [793d523]
- Updated dependencies [f17fb4a]
- @openfn/ws-worker@0.2.7
- @openfn/engine-multi@0.1.11
- @openfn/lightning-mock@1.0.12

## 1.0.16

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.16",
"version": "1.0.17",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
23 changes: 23 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,26 @@ test('crash: syntax error', async (t) => {
t.is(error_type, 'CompileError');
t.regex(error_message, /Unexpected token \(1:9\)$/);
});

test('exception: autoinstall error', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: 'fn((s) => s)',
},
],
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;

t.is(reason, 'exception');
t.is(error_type, 'AutoinstallError');
t.regex(
error_message,
/Error installing @openfn\/[email protected]/
);
});
7 changes: 7 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 0.4.8

### Patch Changes

- Updated dependencies [857c42b]
- @openfn/runtime@0.1.4

## 0.4.7

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "0.4.7",
"version": "0.4.8",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
9 changes: 9 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# engine-multi

## 0.1.11

### Patch Changes

- 793d523: Updated purge strategy
- f17fb4a: Better error handling in autoinstall
- Updated dependencies [857c42b]
- @openfn/runtime@0.1.4

## 0.1.10

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "0.1.10",
"version": "0.1.11",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
17 changes: 14 additions & 3 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { install as runtimeInstall } from '@openfn/runtime';
import type { Logger } from '@openfn/logger';
import type { ExecutionContext } from '../types';
import { AUTOINSTALL_COMPLETE, AUTOINSTALL_ERROR } from '../events';
import { AutoinstallError } from '../errors';

// none of these options should be on the plan actually
export type AutoinstallOptions = {
Expand Down Expand Up @@ -54,6 +55,7 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
// TODO would rather do all this in parallel but this is fine for now
// TODO set iteration is weirdly difficult?
const paths: ModulePaths = {};

for (const a of adaptors) {
// Ensure that this is not blacklisted
// TODO what if it is? For now we'll log and skip it
Expand All @@ -71,8 +73,6 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
const needsInstalling = !(await isInstalledFn(a, repoDir, logger));
if (needsInstalling) {
if (!pending[a]) {
// TODO because autoinstall can take a while, we should emit that we're starting
// add a promise to the pending array
const startTime = Date.now();
pending[a] = installFn(a, repoDir, logger)
.then(() => {
Expand All @@ -86,15 +86,26 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
});
delete pending[a];
})
.catch((e) => {
.catch((e: any) => {
delete pending[a];

logger.error(`ERROR autoinstalling ${a}: ${e.message}`);
logger.error(e);
const duration = Date.now() - startTime;
context.emit(AUTOINSTALL_ERROR, {
module: name,
version: version!,
duration,
message: e.message || e.toString(),
});

// wrap and re-throw the error
throw new AutoinstallError(a, e);
});
} else {
logger.info(
`autoinstall waiting for previous promise for ${a} to resolve...`
);
}
// Return the pending promise (safe to do this multiple times)
// TODO if this is a chained promise, emit something like "using cache for ${name}"
Expand Down
26 changes: 12 additions & 14 deletions packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type WorkerOptions = {

// Adds a `callWorker` function to the API object, which will execute a task in a worker
export default function initWorkers(
api: EngineAPI,
engine: EngineAPI,
workerPath: string,
options: WorkerOptions = {},
logger?: Logger
) {
// TODO can we verify the worker path and throw if it's invalid?
// workerpool won't complain if we give it a nonsense path
const workers = createWorkers(workerPath, options);
api.callWorker = (
engine.callWorker = (
task: string,
args: any[] = [],
events: any = {},
Expand All @@ -48,22 +48,20 @@ export default function initWorkers(
promise.timeout(timeout);
}

if (options.purge) {
promise.then(() => {
const { pendingTasks } = workers.stats();
if (pendingTasks == 0) {
logger?.debug('Purging workers');
api.emit(PURGE);
workers.terminate();
}
});
}

return promise;
};

engine.purge = () => {
const { pendingTasks } = workers.stats();
if (pendingTasks == 0) {
logger?.debug('Purging workers');
engine.emit(PURGE);
workers.terminate();
}
};

// This will force termination instantly
api.closeWorkers = () => {
engine.closeWorkers = () => {
workers.terminate(true);

// Defer the return to allow workerpool to close down
Expand Down
10 changes: 5 additions & 5 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ const execute = async (context: ExecutionContext) => {
error(context, { workflowId: state.plan.id, error: e });
logger.error(e);
});
} catch (e) {
// generic error wrapper
// this will catch anything you!
const wrappedError = new ExecutionError(e);
error(context, { workflowId: state.plan.id, error: wrappedError });
} catch (e: any) {
if (!e.severity) {
e = new ExecutionError(e);
}
error(context, { workflowId: state.plan.id, error: e });
}
};

Expand Down
8 changes: 7 additions & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,18 @@ export const log = (
});
};

export const error = (context: ExecutionContext, event: any) => {
export const error = (
context: ExecutionContext,
event: internalEvents.ErrorEvent
) => {
const { threadId = '-', error } = event;

context.emit(externalEvents.WORKFLOW_ERROR, {
threadId,
// @ts-ignore
type: error.type || error.name || 'ERROR',
message: error.message || error.toString(),
// default to exception because if we don't know, it's our fault
severity: error.severity || 'exception',
});
};
15 changes: 6 additions & 9 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
const contexts: Record<string, ExecutionContext> = {};
const deferredListeners: Record<string, Record<string, EventHandler>[]> = {};

// TODO I think this is for later
//const activeWorkflows: string[] = [];

// TOOD I wonder if the engine should a) always accept a worker path
// and b) validate it before it runs
let resolvedWorkerPath;
if (workerPath) {
// If a path to the worker has been passed in, just use it verbatim
Expand Down Expand Up @@ -183,14 +178,17 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
delete deferredListeners[workflowId];
}

// execute(context);

// Run the execute on a timeout so that consumers have a chance
// to register listeners
setTimeout(() => {
// TODO typing between the class and interface isn't right
// @ts-ignore
execute(context);
execute(context).finally(() => {
delete contexts[workflowId];
if (options.purge && Object.keys(contexts).length === 0) {
engine.purge?.();
}
});
}, 1);

// hmm. Am I happy to pass the internal workflow state OUT of the handler?
Expand All @@ -202,7 +200,6 @@ const createEngine = async (options: EngineOptions, workerPath?: string) => {
context.once(evt, fn),
off: (evt: string, fn: (...args: any[]) => void) => context.off(evt, fn),
};
// return context;
};

const listen = (
Expand Down
14 changes: 13 additions & 1 deletion packages/engine-multi/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,17 @@ export class CompileError extends EngineError {
}
}

// Autoinstall Error (exception)
export class AutoinstallError extends EngineError {
severity = 'exception'; // Syntax errors are crashes, but what if we get a module resolution thing?
type = 'AutoinstallError';
name = 'AutoinstallError';
message;

constructor(specifier: string, error: any) {
super();

this.message = `Error installing ${specifier}: ${error.message}`;
}
}

// CredentialsError (exception)
1 change: 1 addition & 0 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface WorkflowCompletePayload extends ExternalEvent {
export interface WorkflowErrorPayload extends ExternalEvent {
type: string;
message: string;
severity: string;
}

export interface JobStartPayload extends ExternalEvent {
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface ExecutionContext extends EventEmitter {
export interface EngineAPI extends EventEmitter {
callWorker: CallWorker;
closeWorkers: () => void;
purge?: () => void;
}

export interface RuntimeEngine extends EventEmitter {
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/worker/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ export interface LogEvent extends InternalEvent {
message: JSONLog;
}

export interface ErrorEvent extends InternalEvent {
jobId?: string;
export interface ErrorEvent {
threadId?: string;
workflowId?: string;
error: {
message: string;
type: string;
Expand Down
Loading

0 comments on commit a8d16f6

Please sign in to comment.