Skip to content

Commit

Permalink
Worker: Update version listings (#611)
Browse files Browse the repository at this point in the history
* engine: publish versions with workflow-start and support multiple adaptors

* worker: update version handling

We only include versions on workflow start and allow multiple adaptor versions to be printed
  • Loading branch information
josephjclark authored Feb 23, 2024
1 parent 4f5f1dd commit 58e0d11
Show file tree
Hide file tree
Showing 21 changed files with 254 additions and 227 deletions.
5 changes: 5 additions & 0 deletions .changeset/fair-bobcats-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': patch
---

Record adaptor versions as an array
5 changes: 5 additions & 0 deletions .changeset/yellow-clouds-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Move version log to workflow start
12 changes: 7 additions & 5 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,13 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {

const v = version || 'unknown';

// Write the adaptor version to the context
// This is a reasonably accurate, but not totally bulletproof, report
// @ts-ignore
// TODO need to remove this soon as it's basically lying
context.versions[name] = v;
// Write the adaptor version to the context for reporting later
if (!context.versions[name]) {
context.versions[name] = [];
}
if (!context.versions[name].includes(v)) {
(context.versions[name] as string[]).push(v);
}

paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const workflowStart = (
// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
});
};

Expand Down Expand Up @@ -81,7 +82,6 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
versions: context.versions,
});
};

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/classes/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
import type { ExternalEvents, EventMap } from '../events';

/**
* The ExeuctionContext class wraps an event emitter with some useful context
* The ExecutionContext class wraps an event emitter with some useful context
* and automatically appends the workflow id to each emitted events
*
* Each running workflow has its own context object
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface ExternalEvent {
workflowId: string;
}

export interface WorkflowStartPayload extends ExternalEvent {}
export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
Expand All @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
versions: Versions;
}

export interface JobCompletePayload extends ExternalEvent {
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ export type Versions = {
node: string;
engine: string;
compiler: string;
[adaptor: string]: string;

[adaptor: string]: string | string[];
};
4 changes: 2 additions & 2 deletions packages/engine-multi/test/api/autoinstall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ test('write versions to context', async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});

test("write versions to context even if we don't install", async (t) => {
Expand All @@ -578,5 +578,5 @@ test("write versions to context even if we don't install", async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});
7 changes: 4 additions & 3 deletions packages/engine-multi/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ test.serial('should emit a workflow-start event', async (t) => {
id: 'x',
plan,
} as WorkflowState;

let workflowStart;

const context = createContext({ state, options });
Expand All @@ -96,6 +97,9 @@ test.serial('should emit a workflow-start event', async (t) => {

// No need to do a deep test of the event payload here
t.is(workflowStart!.workflowId!, 'x');
// Just a shallow test on the actual version object to verify that it's been attached
t.truthy(workflowStart!.versions);
t.regex(workflowStart!.versions.node, new RegExp(/(\d+).(\d+).\d+/));
});

test.serial('should emit a log event with the memory limit', async (t) => {
Expand Down Expand Up @@ -156,9 +160,6 @@ test.serial('should emit a job-start event', async (t) => {
await execute(context);

t.is(event.jobId, 'j');
t.truthy(event.versions);
// Just a shallow test on the actual version object to verify that it's been attached
t.regex(event.versions.node, new RegExp(/(\d+).(\d+).\d+/));
});

test.serial('should emit a job-complete event', async (t) => {
Expand Down
7 changes: 3 additions & 4 deletions packages/engine-multi/test/api/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
};

context.on(e.WORKFLOW_START, (evt) => {
t.deepEqual(evt, {
workflowId,
threadId: '123',
});
t.truthy(evt.versions);
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '123');
done();
});

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ test.serial('trigger workflow-start', (t) => {
api.execute(plan, emptyState).on('workflow-start', (evt) => {
t.is(evt.workflowId, plan.id);
t.truthy(evt.threadId);
t.truthy(evt.versions);
t.pass('workflow started');
done();
});
Expand All @@ -77,7 +78,6 @@ test.serial('trigger job-start', (t) => {
t.is(e.workflowId, '2');
t.is(e.jobId, 'j1');
t.truthy(e.threadId);
t.truthy(e.versions);
t.pass('job started');
done();
});
Expand Down
18 changes: 4 additions & 14 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning';
import type { RunLogPayload } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';
import type {
RuntimeEngine,
Resolvers,
WorkflowStartPayload,
} from '@openfn/engine-multi';
import type { RuntimeEngine, Resolvers } from '@openfn/engine-multi';

import {
getWithReply,
Expand All @@ -21,6 +17,7 @@ import {
STEP_START,
GET_CREDENTIAL,
} from '../events';
import handleRunStart from '../events/run-start';
import handleStepComplete from '../events/step-complete';
import handleStepStart from '../events/step-start';
import handleRunComplete from '../events/run-complete';
Expand Down Expand Up @@ -114,7 +111,7 @@ export function execute(
// so that they send in order
const listeners = Object.assign(
{},
addEvent('workflow-start', throttle(onWorkflowStart)),
addEvent('workflow-start', throttle(handleRunStart)),
addEvent('job-start', throttle(handleStepStart)),
addEvent('job-complete', throttle(handleStepComplete)),
addEvent('job-error', throttle(onJobError)),
Expand Down Expand Up @@ -213,13 +210,6 @@ export function onJobError(context: Context, event: any) {
}
}

export function onWorkflowStart(
{ channel }: Context,
_event: WorkflowStartPayload
) {
return sendEvent<RunStartPayload>(channel, RUN_START);
}

export function onJobLog({ channel, state }: Context, event: JSONLog) {
const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

Expand Down
47 changes: 47 additions & 0 deletions packages/ws-worker/src/events/run-start.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { RunStartPayload } from '@openfn/lexicon/lightning';
import { timestamp } from '@openfn/logger';
import type { WorkflowStartPayload } from '@openfn/engine-multi';

import { RUN_START } from '../events';
import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';

import pkg from '../../package.json' assert { type: 'json' };

export default async function onRunStart(
context: Context,
event: WorkflowStartPayload
) {
const { channel, state } = context;
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

// Send the log with its own little state object
// to preserve the run id
// Otherwise, by the time the log sends,
// the active step could have changed
// TODO if I fix ordering I think I can kill this
const versionLogContext = {
...context,
state: {
...state,
activeStep: state.activeStep,
},
};

const versions = {
worker: pkg.version,
...event.versions,
};

await sendEvent<RunStartPayload>(channel, RUN_START, { versions });

const versionMessage = calculateVersionString(versions);

await onJobLog(versionLogContext, {
time,
message: [versionMessage],
level: 'info',
name: 'VER',
});
}
47 changes: 1 addition & 46 deletions packages/ws-worker/src/events/step-start.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,25 @@
import crypto from 'node:crypto';
import { timestamp } from '@openfn/logger';
import { JobStartPayload } from '@openfn/engine-multi';
import type { Job } from '@openfn/lexicon';
import type { StepStartPayload } from '@openfn/lexicon/lightning';

import pkg from '../../package.json' assert { type: 'json' };
import { STEP_START } from '../events';
import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';
import { sendEvent, Context } from '../api/execute';

export default async function onStepStart(
context: Context,
event: JobStartPayload
) {
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

const { channel, state } = context;

// generate a run id and write it to state
state.activeStep = crypto.randomUUID();
state.activeJob = event.jobId;

const job = state.plan.workflow.steps.find(
({ id }) => id === event.jobId
) as Job;

const input_dataclip_id = state.inputDataclips[event.jobId];

const versions = {
worker: pkg.version,
...event.versions,
};

// Send the log with its own little state object
// to preserve the run id
// Otherwise, by the time the log sends,
// the active step could have changed
// TODO if I fix ordering I think I can kill this
const versionLogContext = {
...context,
state: {
...state,
activeStep: state.activeStep,
},
};

await sendEvent<StepStartPayload>(channel, STEP_START, {
step_id: state.activeStep!,
job_id: state.activeJob!,
input_dataclip_id,

versions,
});

const versionMessage = calculateVersionString(
versionLogContext.state.activeStep,
versions,
job?.adaptor
);

await onJobLog(versionLogContext, {
time,
message: [versionMessage],
level: 'info',
name: 'VER',
});
return;
}
1 change: 1 addition & 0 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async function createMock() {
level: 'info',
json: true,
message: JSON.stringify(args),
name: 'JOB',
time: Date.now(),
});
},
Expand Down
18 changes: 10 additions & 8 deletions packages/ws-worker/src/util/versions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,35 @@ export type Versions = {
node: string;
worker: string;

[adaptor: string]: string;
[adaptor: string]: string | string[];
};

export default (stepId: string, versions: Versions, adaptor?: string) => {
export default (versions: Versions) => {
let longest = 'worker'.length; // Bit wierdly defensive but ensure padding is reasonable even if version has no props
for (const v in versions) {
longest = Math.max(v.length, longest);
}

const { node, worker, ...adaptors } = versions;
const { node, worker, compiler, runtime, engine, ...adaptors } = versions;
// Prefix and pad version numbers
const prefix = (str: string) => ` ${t} ${str.padEnd(longest + 4, ' ')}`;

let str = `Versions for step ${stepId}:
let str = `Versions:
${prefix('node.js')}${versions.node || 'unknown'}
${prefix('worker')}${versions.worker || 'unknown'}`;

if (Object.keys(adaptors).length) {
let allAdaptors = Object.keys(adaptors);
if (adaptor) {
allAdaptors = allAdaptors.filter((name) => adaptor.startsWith(name));
}
str +=
'\n' +
allAdaptors
.sort()
.map((adaptorName) => `${prefix(adaptorName)}${adaptors[adaptorName]}`)
.map(
(adaptorName) =>
`${prefix(adaptorName)}${(adaptors[adaptorName] as string[])
.sort()
.join(', ')}`
)
.join('\n');
}

Expand Down
12 changes: 0 additions & 12 deletions packages/ws-worker/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
import {
onJobLog,
execute,
onWorkflowStart,
loadDataclip,
loadCredential,
sendEvent,
Expand Down Expand Up @@ -181,17 +180,6 @@ test('jobError should trigger step:complete with a reason and default state', as
t.deepEqual(stepCompleteEvent.output_dataclip, '{}');
});

test('workflowStart should send an empty run:start event', async (t) => {
const channel = mockChannel({
[RUN_START]: () => {
t.pass();
},
});

// @ts-ignore
await onWorkflowStart({ channel });
});

// test('workflowComplete should send an run:complete event', async (t) => {
// const result = { answer: 42 };

Expand Down
Loading

0 comments on commit 58e0d11

Please sign in to comment.