From cc8b9db3a7383950bfcf55e18ffcab08525cfa0e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 13 Feb 2024 16:00:52 +0000 Subject: [PATCH 1/3] worker: support output_dataclips on run options --- packages/lexicon/lightning.d.ts | 1 + packages/ws-worker/src/api/execute.ts | 13 ++++++-- packages/ws-worker/src/channels/run.ts | 10 ++---- .../ws-worker/src/events/step-complete.ts | 10 ++++-- .../src/util/convert-lightning-plan.ts | 28 ++++++++++------ .../test/events/step-complete.test.ts | 32 +++++++++++++++++++ .../test/util/convert-lightning-plan.test.ts | 13 ++++++++ 7 files changed, 86 insertions(+), 21 deletions(-) diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index e54902d95..c7e232366 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -44,6 +44,7 @@ export type LightningPlanOptions = { runTimeoutMs?: number; sanitize?: SanitizePolicies; start?: StepId; + output_dataclips?: boolean; }; /** diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 48b4acca6..a0ebf9c8f 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -31,6 +31,7 @@ import handleRunComplete from '../events/run-complete'; import handleRunError from '../events/run-error'; import type { Channel, RunState, JSONLog } from '../types'; +import { WorkerRunOptions } from '../util/convert-lightning-plan'; const enc = new TextDecoder('utf-8'); @@ -41,6 +42,7 @@ export type Context = { state: RunState; logger: Logger; engine: RuntimeEngine; + options: WorkerRunOptions; onFinish: (result: any) => void; // maybe its better for version numbers to be scribbled here as we go? @@ -63,14 +65,21 @@ export function execute( logger: Logger, plan: ExecutionPlan, input: Lazy, - options: LightningPlanOptions = {}, + options: WorkerRunOptions = {}, onFinish = (_result: any) => {} ) { logger.info('executing ', plan.id); const state = createRunState(plan, input); - const context: Context = { channel, state, logger, engine, onFinish }; + const context: Context = { + channel, + state, + logger, + engine, + options, + onFinish, + }; const throttle = createThrottle(); diff --git a/packages/ws-worker/src/channels/run.ts b/packages/ws-worker/src/channels/run.ts index 104fba82d..8a505cf8a 100644 --- a/packages/ws-worker/src/channels/run.ts +++ b/packages/ws-worker/src/channels/run.ts @@ -1,13 +1,9 @@ import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon'; -import type { - GetPlanReply, - LightningPlan, - LightningPlanOptions, -} from '@openfn/lexicon/lightning'; +import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning'; import type { Logger } from '@openfn/logger'; import { getWithReply } from '../util'; -import convertRun from '../util/convert-lightning-plan'; +import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan'; import { GET_PLAN } from '../events'; import type { Channel, Socket } from '../types'; @@ -25,7 +21,7 @@ const joinRunChannel = ( return new Promise<{ channel: Channel; plan: ExecutionPlan; - options: LightningPlanOptions; + options: WorkerRunOptions; input: Lazy; }>((resolve, reject) => { // TMP - lightning seems to be sending two responses to me diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 51c5bfe8d..a542a5944 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -8,7 +8,7 @@ import { calculateJobExitReason } from '../api/reasons'; import { sendEvent, Context } from '../api/execute'; export default function onStepComplete( - { channel, state }: Context, + { channel, state, options }: Context, event: JobCompletePayload, // TODO this isn't terribly graceful, but accept an error for crashes error?: any @@ -52,7 +52,6 @@ export default function onStepComplete( step_id, job_id, output_dataclip_id: dataclipId, - output_dataclip: stringify(outputState), reason, error_message, @@ -61,6 +60,11 @@ export default function onStepComplete( mem: event.mem, duration: event.duration, thread_id: event.threadId, - }; + } as StepCompletePayload; + + if (!options || options.outputDataclips !== false) { + evt.output_dataclip = stringify(outputState); + } + return sendEvent(channel, STEP_COMPLETE, evt); } diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index df2427a3d..d6892a3ea 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -39,21 +39,31 @@ const mapTriggerEdgeCondition = (edge: Edge) => { return condition; }; +// Options which relate to this execution but are not part of the plan +export type WorkerRunOptions = ExecuteOptions & { + // Defaults to true - must be explicity false to stop dataclips being sent + outputDataclips?: boolean; +}; + export default ( run: LightningPlan -): { plan: ExecutionPlan; options: ExecuteOptions; input: Lazy } => { +): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy } => { // Some options get mapped straight through to the runtime's workflow options - // TODO or maybe not? Maybe they're all sent to the engine instead? const runtimeOpts: Omit = {}; // But some need to get passed down into the engine's options - const engineOpts: ExecuteOptions = {}; - - if (run.options?.runTimeoutMs) { - engineOpts.runTimeoutMs = run.options.runTimeoutMs; - } - if (run.options?.sanitize) { - engineOpts.sanitize = run.options.sanitize; + const engineOpts: WorkerRunOptions = {}; + + if (run.options) { + if (run.options.runTimeoutMs) { + engineOpts.runTimeoutMs = run.options.runTimeoutMs; + } + if (run.options.sanitize) { + engineOpts.sanitize = run.options.sanitize; + } + if (run.options.hasOwnProperty('output_dataclips')) { + engineOpts.outputDataclips = run.options.output_dataclips; + } } const plan: Partial = { diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 8ba8a8e3d..4972eb0f7 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -154,3 +154,35 @@ test('send a step:complete event', async (t) => { } as JobCompletePayload; await handleStepComplete({ channel, state } as any, event); }); + +test('do not include dataclips in step:complete if output_dataclip is false', async (t) => { + const plan = createPlan(); + const jobId = 'job-1'; + const result = { x: 10 }; + + const state = createRunState(plan); + state.activeJob = jobId; + state.activeStep = 'b'; + + const options = { + outputDataclips: false, + }; + + const channel = mockChannel({ + [STEP_COMPLETE]: (evt: StepCompletePayload) => { + t.truthy(evt.output_dataclip_id); + t.falsy(evt.output_dataclip); + }, + }); + + const event = { + jobId, + workflowId: plan.id, + state: result, + next: ['a'], + mem: { job: 1, system: 10 }, + duration: 61, + thread_id: 'abc', + } as JobCompletePayload; + await handleStepComplete({ channel, state, options } as any, event); +}); diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index dbed7a503..c0ffa73a4 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -165,6 +165,19 @@ test('handle starting_node_id as options', (t) => { }); }); +test('handle output_dataclip as options', (t) => { + const run: Partial = { + id: 'w', + options: { + output_dataclips: false, + }, + }; + const { options } = convertPlan(run as LightningPlan); + t.deepEqual(options, { + outputDataclips: false, + }); +}); + test('convert a single trigger', (t) => { const run: Partial = { id: 'w', From 97b381606e231aabbb964f382ff500ec7ba3e80a Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 13 Feb 2024 18:00:05 +0000 Subject: [PATCH 2/3] worker: additioonal test of output_dataclips --- packages/ws-worker/test/lightning.test.ts | 54 +++++++++++++++++------ 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 15edea4e1..3ba4df866 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -4,13 +4,16 @@ import test from 'ava'; import createLightningServer from '@openfn/lightning-mock'; +import type { + LightningPlan, + Node, + RunCompletePayload, +} from '@openfn/lexicon/lightning'; import { createRun, createEdge, createJob } from './util'; - import createWorkerServer from '../src/server'; -import createMockRTE from '../src/mock/runtime-engine'; import * as e from '../src/events'; -import { RunCompletePayload } from '@openfn/lexicon/lightning'; +import createMockRTE from '../src/mock/runtime-engine'; let lng: any; let worker: any; @@ -38,17 +41,18 @@ test.afterEach(() => { let rollingRunId = 0; -const getRun = (ext = {}, jobs?: any) => ({ - id: `a${++rollingRunId}`, - jobs: jobs || [ - { - id: 'j', - adaptor: '@openfn/language-common@1.0.0', - body: 'fn(() => ({ answer: 42 }))', - }, - ], - ...ext, -}); +const getRun = (ext = {}, jobs?: any[]): LightningPlan => + ({ + id: `a${++rollingRunId}`, + jobs: jobs || [ + { + id: 'j', + adaptor: '@openfn/language-common@1.0.0', + body: 'fn(() => ({ answer: 42 }))', + }, + ], + ...ext, + } as LightningPlan); test.serial(`events: lightning should respond to a ${e.CLAIM} event`, (t) => { return new Promise((done) => { @@ -342,6 +346,28 @@ test.serial( } ); +test.serial(`events: ${e.STEP_COMPLETE} should not return dataclips`, (t) => { + return new Promise((done) => { + const run = getRun(); + run.options = { + output_dataclips: false, + }; + + lng.onSocketEvent(e.STEP_COMPLETE, run.id, ({ payload }: any) => { + t.is(payload.job_id, 'j'); + t.falsy(payload.output_dataclip); + t.truthy(payload.output_dataclip_id); + t.pass(); + }); + + lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => { + done(); + }); + + lng.enqueueRun(run); + }); +}); + test.serial(`events: lightning should receive a ${e.RUN_LOG} event`, (t) => { return new Promise((done) => { const run = { From 31c97233700006ca1228c8bc5db040d566d9c6de Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 13 Feb 2024 18:03:37 +0000 Subject: [PATCH 3/3] types --- packages/ws-worker/src/api/execute.ts | 6 +----- packages/ws-worker/test/lightning.test.ts | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index a0ebf9c8f..c35ea2ca9 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -1,9 +1,5 @@ import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon'; -import type { - RunLogPayload, - RunStartPayload, - LightningPlanOptions, -} from '@openfn/lexicon/lightning'; +import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning'; import type { Logger } from '@openfn/logger'; import type { RuntimeEngine, diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 3ba4df866..4466bec36 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -6,7 +6,6 @@ import test from 'ava'; import createLightningServer from '@openfn/lightning-mock'; import type { LightningPlan, - Node, RunCompletePayload, } from '@openfn/lexicon/lightning';