Skip to content

Commit

Permalink
worker: support output_dataclips on run options
Browse files Browse the repository at this point in the history
  • Loading branch information
josephjclark committed Feb 13, 2024
1 parent cf9abea commit 56d61ac
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 21 deletions.
1 change: 1 addition & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export type LightningPlanOptions = {
runTimeoutMs?: number;
sanitize?: SanitizePolicies;
start?: StepId;
output_dataclips?: boolean;
};

/**
Expand Down
13 changes: 11 additions & 2 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import handleRunComplete from '../events/run-complete';
import handleRunError from '../events/run-error';

import type { Channel, RunState, JSONLog } from '../types';
import { WorkerRunOptions } from '../util/convert-lightning-plan';

const enc = new TextDecoder('utf-8');

Expand All @@ -41,6 +42,7 @@ export type Context = {
state: RunState;
logger: Logger;
engine: RuntimeEngine;
options: WorkerRunOptions;
onFinish: (result: any) => void;

// maybe its better for version numbers to be scribbled here as we go?
Expand All @@ -63,14 +65,21 @@ export function execute(
logger: Logger,
plan: ExecutionPlan,
input: Lazy<State>,
options: LightningPlanOptions = {},
options: WorkerRunOptions = {},
onFinish = (_result: any) => {}
) {
logger.info('executing ', plan.id);

const state = createRunState(plan, input);

const context: Context = { channel, state, logger, engine, onFinish };
const context: Context = {
channel,
state,
logger,
engine,
options,
onFinish,
};

const throttle = createThrottle();

Expand Down
10 changes: 3 additions & 7 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
GetPlanReply,
LightningPlan,
LightningPlanOptions,
} from '@openfn/lexicon/lightning';
import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';

import { getWithReply } from '../util';
import convertRun from '../util/convert-lightning-plan';
import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan';
import { GET_PLAN } from '../events';
import type { Channel, Socket } from '../types';

Expand All @@ -25,7 +21,7 @@ const joinRunChannel = (
return new Promise<{
channel: Channel;
plan: ExecutionPlan;
options: LightningPlanOptions;
options: WorkerRunOptions;
input: Lazy<State>;
}>((resolve, reject) => {
// TMP - lightning seems to be sending two responses to me
Expand Down
10 changes: 7 additions & 3 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';

export default function onStepComplete(
{ channel, state }: Context,
{ channel, state, options }: Context,
event: JobCompletePayload,
// TODO this isn't terribly graceful, but accept an error for crashes
error?: any
Expand Down Expand Up @@ -52,7 +52,6 @@ export default function onStepComplete(
step_id,
job_id,
output_dataclip_id: dataclipId,
output_dataclip: stringify(outputState),

reason,
error_message,
Expand All @@ -61,6 +60,11 @@ export default function onStepComplete(
mem: event.mem,
duration: event.duration,
thread_id: event.threadId,
};
} as StepCompletePayload;

if (!options || options.outputDataclips !== false) {
evt.output_dataclip = stringify(outputState);
}

return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
28 changes: 19 additions & 9 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,31 @@ const mapTriggerEdgeCondition = (edge: Edge) => {
return condition;
};

// Options which relate to this execution but are not part of the plan
export type WorkerRunOptions = ExecuteOptions & {
// Defaults to true - must be explicity false to stop dataclips being sent
outputDataclips?: boolean;
};

export default (
run: LightningPlan
): { plan: ExecutionPlan; options: ExecuteOptions; input: Lazy<State> } => {
): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy<State> } => {
// Some options get mapped straight through to the runtime's workflow options
// TODO or maybe not? Maybe they're all sent to the engine instead?
const runtimeOpts: Omit<WorkflowOptions, 'timeout'> = {};

// But some need to get passed down into the engine's options
const engineOpts: ExecuteOptions = {};

if (run.options?.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options?.sanitize) {
engineOpts.sanitize = run.options.sanitize;
const engineOpts: WorkerRunOptions = {};

if (run.options) {
if (run.options.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options.sanitize) {
engineOpts.sanitize = run.options.sanitize;
}
if (run.options.hasOwnProperty('output_dataclips')) {
engineOpts.outputDataclips = run.options.output_dataclips;
}
}

const plan: Partial<ExecutionPlan> = {
Expand Down
32 changes: 32 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,35 @@ test('send a step:complete event', async (t) => {
} as JobCompletePayload;
await handleStepComplete({ channel, state } as any, event);
});

test('do not include dataclips in step:complete if output_dataclip is false', async (t) => {
const plan = createPlan();
const jobId = 'job-1';
const result = { x: 10 };

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const options = {
outputDataclips: false,
};

const channel = mockChannel({
[STEP_COMPLETE]: (evt: StepCompletePayload) => {
t.truthy(evt.output_dataclip_id);
t.falsy(evt.output_dataclip);
},
});

const event = {
jobId,
workflowId: plan.id,
state: result,
next: ['a'],
mem: { job: 1, system: 10 },
duration: 61,
thread_id: 'abc',
} as JobCompletePayload;
await handleStepComplete({ channel, state, options } as any, event);
});
13 changes: 13 additions & 0 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ test('handle starting_node_id as options', (t) => {
});
});

test('handle output_dataclip as options', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
options: {
output_dataclips: false,
},
};
const { options } = convertPlan(run as LightningPlan);
t.deepEqual(options, {
outputDataclips: false,
});
});

test('convert a single trigger', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
Expand Down

0 comments on commit 56d61ac

Please sign in to comment.