Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: Update version listings #611

Merged
merged 22 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fair-bobcats-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': patch
---

Record adaptor versions as an array
5 changes: 5 additions & 0 deletions .changeset/yellow-clouds-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Move version log to workflow start
12 changes: 7 additions & 5 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,13 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {

const v = version || 'unknown';

// Write the adaptor version to the context
// This is a reasonably accurate, but not totally bulletproof, report
// @ts-ignore
// TODO need to remove this soon as it's basically lying
context.versions[name] = v;
// Write the adaptor version to the context for reporting later
if (!context.versions[name]) {
context.versions[name] = [];
}
if (!context.versions[name].includes(v)) {
(context.versions[name] as string[]).push(v);
}

paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const workflowStart = (
// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
});
};

Expand Down Expand Up @@ -81,7 +82,6 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
versions: context.versions,
});
};

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/classes/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
import type { ExternalEvents, EventMap } from '../events';

/**
* The ExeuctionContext class wraps an event emitter with some useful context
* The ExecutionContext class wraps an event emitter with some useful context
* and automatically appends the workflow id to each emitted events
*
* Each running workflow has its own context object
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface ExternalEvent {
workflowId: string;
}

export interface WorkflowStartPayload extends ExternalEvent {}
export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
Expand All @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
versions: Versions;
}

export interface JobCompletePayload extends ExternalEvent {
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ export type Versions = {
node: string;
engine: string;
compiler: string;
[adaptor: string]: string;

[adaptor: string]: string | string[];
};
4 changes: 2 additions & 2 deletions packages/engine-multi/test/api/autoinstall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ test('write versions to context', async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});

test("write versions to context even if we don't install", async (t) => {
Expand All @@ -578,5 +578,5 @@ test("write versions to context even if we don't install", async (t) => {
await autoinstall(context);

// @ts-ignore
t.is(context.versions['@openfn/language-common'], '1.0.0');
t.deepEqual(context.versions['@openfn/language-common'], ['1.0.0']);
});
7 changes: 4 additions & 3 deletions packages/engine-multi/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ test.serial('should emit a workflow-start event', async (t) => {
id: 'x',
plan,
} as WorkflowState;

let workflowStart;

const context = createContext({ state, options });
Expand All @@ -96,6 +97,9 @@ test.serial('should emit a workflow-start event', async (t) => {

// No need to do a deep test of the event payload here
t.is(workflowStart!.workflowId!, 'x');
// Just a shallow test on the actual version object to verify that it's been attached
t.truthy(workflowStart!.versions);
t.regex(workflowStart!.versions.node, new RegExp(/(\d+).(\d+).\d+/));
});

test.serial('should emit a log event with the memory limit', async (t) => {
Expand Down Expand Up @@ -156,9 +160,6 @@ test.serial('should emit a job-start event', async (t) => {
await execute(context);

t.is(event.jobId, 'j');
t.truthy(event.versions);
// Just a shallow test on the actual version object to verify that it's been attached
t.regex(event.versions.node, new RegExp(/(\d+).(\d+).\d+/));
});

test.serial('should emit a job-complete event', async (t) => {
Expand Down
7 changes: 3 additions & 4 deletions packages/engine-multi/test/api/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ test(`workflowStart: emits ${e.WORKFLOW_START}`, (t) => {
};

context.on(e.WORKFLOW_START, (evt) => {
t.deepEqual(evt, {
workflowId,
threadId: '123',
});
t.truthy(evt.versions);
t.is(evt.workflowId, workflowId);
t.is(evt.threadId, '123');
done();
});

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ test.serial('trigger workflow-start', (t) => {
api.execute(plan, emptyState).on('workflow-start', (evt) => {
t.is(evt.workflowId, plan.id);
t.truthy(evt.threadId);
t.truthy(evt.versions);
t.pass('workflow started');
done();
});
Expand All @@ -77,7 +78,6 @@ test.serial('trigger job-start', (t) => {
t.is(e.workflowId, '2');
t.is(e.jobId, 'j1');
t.truthy(e.threadId);
t.truthy(e.versions);
t.pass('job started');
done();
});
Expand Down
18 changes: 4 additions & 14 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning';
import type { RunLogPayload } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';
import type {
RuntimeEngine,
Resolvers,
WorkflowStartPayload,
} from '@openfn/engine-multi';
import type { RuntimeEngine, Resolvers } from '@openfn/engine-multi';

import {
getWithReply,
Expand All @@ -21,6 +17,7 @@ import {
STEP_START,
GET_CREDENTIAL,
} from '../events';
import handleRunStart from '../events/run-start';
import handleStepComplete from '../events/step-complete';
import handleStepStart from '../events/step-start';
import handleRunComplete from '../events/run-complete';
Expand Down Expand Up @@ -114,7 +111,7 @@ export function execute(
// so that they send in order
const listeners = Object.assign(
{},
addEvent('workflow-start', throttle(onWorkflowStart)),
addEvent('workflow-start', throttle(handleRunStart)),
addEvent('job-start', throttle(handleStepStart)),
addEvent('job-complete', throttle(handleStepComplete)),
addEvent('job-error', throttle(onJobError)),
Expand Down Expand Up @@ -213,13 +210,6 @@ export function onJobError(context: Context, event: any) {
}
}

export function onWorkflowStart(
{ channel }: Context,
_event: WorkflowStartPayload
) {
return sendEvent<RunStartPayload>(channel, RUN_START);
}

export function onJobLog({ channel, state }: Context, event: JSONLog) {
const timeInMicroseconds = BigInt(event.time) / BigInt(1e3);

Expand Down
47 changes: 47 additions & 0 deletions packages/ws-worker/src/events/run-start.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { RunStartPayload } from '@openfn/lexicon/lightning';
import { timestamp } from '@openfn/logger';
import type { WorkflowStartPayload } from '@openfn/engine-multi';

import { RUN_START } from '../events';
import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';

import pkg from '../../package.json' assert { type: 'json' };

export default async function onRunStart(
context: Context,
event: WorkflowStartPayload
) {
const { channel, state } = context;
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

// Send the log with its own little state object
// to preserve the run id
// Otherwise, by the time the log sends,
// the active step could have changed
// TODO if I fix ordering I think I can kill this
const versionLogContext = {
...context,
state: {
...state,
activeStep: state.activeStep,
},
};

const versions = {
worker: pkg.version,
...event.versions,
};

await sendEvent<RunStartPayload>(channel, RUN_START, { versions });

const versionMessage = calculateVersionString(versions);

await onJobLog(versionLogContext, {
time,
message: [versionMessage],
level: 'info',
name: 'VER',
});
}
47 changes: 1 addition & 46 deletions packages/ws-worker/src/events/step-start.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,25 @@
import crypto from 'node:crypto';
import { timestamp } from '@openfn/logger';
import { JobStartPayload } from '@openfn/engine-multi';
import type { Job } from '@openfn/lexicon';
import type { StepStartPayload } from '@openfn/lexicon/lightning';

import pkg from '../../package.json' assert { type: 'json' };
import { STEP_START } from '../events';
import { sendEvent, Context, onJobLog } from '../api/execute';
import calculateVersionString from '../util/versions';
import { sendEvent, Context } from '../api/execute';

export default async function onStepStart(
context: Context,
event: JobStartPayload
) {
// Cheat on the timestamp time to make sure this is the first thing in the log
const time = (timestamp() - BigInt(10e6)).toString();

const { channel, state } = context;

// generate a run id and write it to state
state.activeStep = crypto.randomUUID();
state.activeJob = event.jobId;

const job = state.plan.workflow.steps.find(
({ id }) => id === event.jobId
) as Job;

const input_dataclip_id = state.inputDataclips[event.jobId];

const versions = {
worker: pkg.version,
...event.versions,
};

// Send the log with its own little state object
// to preserve the run id
// Otherwise, by the time the log sends,
// the active step could have changed
// TODO if I fix ordering I think I can kill this
const versionLogContext = {
...context,
state: {
...state,
activeStep: state.activeStep,
},
};

await sendEvent<StepStartPayload>(channel, STEP_START, {
step_id: state.activeStep!,
job_id: state.activeJob!,
input_dataclip_id,

versions,
});

const versionMessage = calculateVersionString(
versionLogContext.state.activeStep,
versions,
job?.adaptor
);

await onJobLog(versionLogContext, {
time,
message: [versionMessage],
level: 'info',
name: 'VER',
});
return;
}
1 change: 1 addition & 0 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async function createMock() {
level: 'info',
json: true,
message: JSON.stringify(args),
name: 'JOB',
time: Date.now(),
});
},
Expand Down
18 changes: 10 additions & 8 deletions packages/ws-worker/src/util/versions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,35 @@ export type Versions = {
node: string;
worker: string;

[adaptor: string]: string;
[adaptor: string]: string | string[];
};

export default (stepId: string, versions: Versions, adaptor?: string) => {
export default (versions: Versions) => {
let longest = 'worker'.length; // Bit wierdly defensive but ensure padding is reasonable even if version has no props
for (const v in versions) {
longest = Math.max(v.length, longest);
}

const { node, worker, ...adaptors } = versions;
const { node, worker, compiler, runtime, engine, ...adaptors } = versions;
// Prefix and pad version numbers
const prefix = (str: string) => ` ${t} ${str.padEnd(longest + 4, ' ')}`;

let str = `Versions for step ${stepId}:
let str = `Versions:
${prefix('node.js')}${versions.node || 'unknown'}
${prefix('worker')}${versions.worker || 'unknown'}`;

if (Object.keys(adaptors).length) {
let allAdaptors = Object.keys(adaptors);
if (adaptor) {
allAdaptors = allAdaptors.filter((name) => adaptor.startsWith(name));
}
str +=
'\n' +
allAdaptors
.sort()
.map((adaptorName) => `${prefix(adaptorName)}${adaptors[adaptorName]}`)
.map(
(adaptorName) =>
`${prefix(adaptorName)}${(adaptors[adaptorName] as string[])
.sort()
.join(', ')}`
)
.join('\n');
}

Expand Down
12 changes: 0 additions & 12 deletions packages/ws-worker/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
import {
onJobLog,
execute,
onWorkflowStart,
loadDataclip,
loadCredential,
sendEvent,
Expand Down Expand Up @@ -181,17 +180,6 @@ test('jobError should trigger step:complete with a reason and default state', as
t.deepEqual(stepCompleteEvent.output_dataclip, '{}');
});

test('workflowStart should send an empty run:start event', async (t) => {
const channel = mockChannel({
[RUN_START]: () => {
t.pass();
},
});

// @ts-ignore
await onWorkflowStart({ channel });
});

// test('workflowComplete should send an run:complete event', async (t) => {
// const result = { answer: 42 };

Expand Down
Loading