Skip to content

Commit

Permalink
Merge pull request #599 from OpenFn/no-output-dataclips
Browse files Browse the repository at this point in the history
Support zero persistence
  • Loading branch information
josephjclark authored Feb 13, 2024
2 parents c7f2b3e + 31c9723 commit e48998c
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 40 deletions.
1 change: 1 addition & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export type LightningPlanOptions = {
runTimeoutMs?: number;
sanitize?: SanitizePolicies;
start?: StepId;
output_dataclips?: boolean;
};

/**
Expand Down
19 changes: 12 additions & 7 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
RunLogPayload,
RunStartPayload,
LightningPlanOptions,
} from '@openfn/lexicon/lightning';
import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';
import type {
RuntimeEngine,
Expand Down Expand Up @@ -31,6 +27,7 @@ import handleRunComplete from '../events/run-complete';
import handleRunError from '../events/run-error';

import type { Channel, RunState, JSONLog } from '../types';
import { WorkerRunOptions } from '../util/convert-lightning-plan';

const enc = new TextDecoder('utf-8');

Expand All @@ -41,6 +38,7 @@ export type Context = {
state: RunState;
logger: Logger;
engine: RuntimeEngine;
options: WorkerRunOptions;
onFinish: (result: any) => void;

// maybe its better for version numbers to be scribbled here as we go?
Expand All @@ -63,14 +61,21 @@ export function execute(
logger: Logger,
plan: ExecutionPlan,
input: Lazy<State>,
options: LightningPlanOptions = {},
options: WorkerRunOptions = {},
onFinish = (_result: any) => {}
) {
logger.info('executing ', plan.id);

const state = createRunState(plan, input);

const context: Context = { channel, state, logger, engine, onFinish };
const context: Context = {
channel,
state,
logger,
engine,
options,
onFinish,
};

const throttle = createThrottle();

Expand Down
10 changes: 3 additions & 7 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
GetPlanReply,
LightningPlan,
LightningPlanOptions,
} from '@openfn/lexicon/lightning';
import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';

import { getWithReply } from '../util';
import convertRun from '../util/convert-lightning-plan';
import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan';
import { GET_PLAN } from '../events';
import type { Channel, Socket } from '../types';

Expand All @@ -25,7 +21,7 @@ const joinRunChannel = (
return new Promise<{
channel: Channel;
plan: ExecutionPlan;
options: LightningPlanOptions;
options: WorkerRunOptions;
input: Lazy<State>;
}>((resolve, reject) => {
// TMP - lightning seems to be sending two responses to me
Expand Down
10 changes: 7 additions & 3 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';

export default function onStepComplete(
{ channel, state }: Context,
{ channel, state, options }: Context,
event: JobCompletePayload,
// TODO this isn't terribly graceful, but accept an error for crashes
error?: any
Expand Down Expand Up @@ -52,7 +52,6 @@ export default function onStepComplete(
step_id,
job_id,
output_dataclip_id: dataclipId,
output_dataclip: stringify(outputState),

reason,
error_message,
Expand All @@ -61,6 +60,11 @@ export default function onStepComplete(
mem: event.mem,
duration: event.duration,
thread_id: event.threadId,
};
} as StepCompletePayload;

if (!options || options.outputDataclips !== false) {
evt.output_dataclip = stringify(outputState);
}

return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
28 changes: 19 additions & 9 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,31 @@ const mapTriggerEdgeCondition = (edge: Edge) => {
return condition;
};

// Options which relate to this execution but are not part of the plan
export type WorkerRunOptions = ExecuteOptions & {
// Defaults to true - must be explicity false to stop dataclips being sent
outputDataclips?: boolean;
};

export default (
run: LightningPlan
): { plan: ExecutionPlan; options: ExecuteOptions; input: Lazy<State> } => {
): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy<State> } => {
// Some options get mapped straight through to the runtime's workflow options
// TODO or maybe not? Maybe they're all sent to the engine instead?
const runtimeOpts: Omit<WorkflowOptions, 'timeout'> = {};

// But some need to get passed down into the engine's options
const engineOpts: ExecuteOptions = {};

if (run.options?.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options?.sanitize) {
engineOpts.sanitize = run.options.sanitize;
const engineOpts: WorkerRunOptions = {};

if (run.options) {
if (run.options.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options.sanitize) {
engineOpts.sanitize = run.options.sanitize;
}
if (run.options.hasOwnProperty('output_dataclips')) {
engineOpts.outputDataclips = run.options.output_dataclips;
}
}

const plan: Partial<ExecutionPlan> = {
Expand Down
32 changes: 32 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,35 @@ test('send a step:complete event', async (t) => {
} as JobCompletePayload;
await handleStepComplete({ channel, state } as any, event);
});

test('do not include dataclips in step:complete if output_dataclip is false', async (t) => {
const plan = createPlan();
const jobId = 'job-1';
const result = { x: 10 };

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const options = {
outputDataclips: false,
};

const channel = mockChannel({
[STEP_COMPLETE]: (evt: StepCompletePayload) => {
t.truthy(evt.output_dataclip_id);
t.falsy(evt.output_dataclip);
},
});

const event = {
jobId,
workflowId: plan.id,
state: result,
next: ['a'],
mem: { job: 1, system: 10 },
duration: 61,
thread_id: 'abc',
} as JobCompletePayload;
await handleStepComplete({ channel, state, options } as any, event);
});
53 changes: 39 additions & 14 deletions packages/ws-worker/test/lightning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import test from 'ava';
import createLightningServer from '@openfn/lightning-mock';
import type {
LightningPlan,
RunCompletePayload,
} from '@openfn/lexicon/lightning';

import { createRun, createEdge, createJob } from './util';

import createWorkerServer from '../src/server';
import createMockRTE from '../src/mock/runtime-engine';
import * as e from '../src/events';
import { RunCompletePayload } from '@openfn/lexicon/lightning';
import createMockRTE from '../src/mock/runtime-engine';

let lng: any;
let worker: any;
Expand Down Expand Up @@ -38,17 +40,18 @@ test.afterEach(() => {

let rollingRunId = 0;

const getRun = (ext = {}, jobs?: any) => ({
id: `a${++rollingRunId}`,
jobs: jobs || [
{
id: 'j',
adaptor: '@openfn/[email protected]',
body: 'fn(() => ({ answer: 42 }))',
},
],
...ext,
});
const getRun = (ext = {}, jobs?: any[]): LightningPlan =>
({
id: `a${++rollingRunId}`,
jobs: jobs || [
{
id: 'j',
adaptor: '@openfn/[email protected]',
body: 'fn(() => ({ answer: 42 }))',
},
],
...ext,
} as LightningPlan);

test.serial(`events: lightning should respond to a ${e.CLAIM} event`, (t) => {
return new Promise((done) => {
Expand Down Expand Up @@ -342,6 +345,28 @@ test.serial(
}
);

test.serial(`events: ${e.STEP_COMPLETE} should not return dataclips`, (t) => {
return new Promise((done) => {
const run = getRun();
run.options = {
output_dataclips: false,
};

lng.onSocketEvent(e.STEP_COMPLETE, run.id, ({ payload }: any) => {
t.is(payload.job_id, 'j');
t.falsy(payload.output_dataclip);
t.truthy(payload.output_dataclip_id);
t.pass();
});

lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => {
done();
});

lng.enqueueRun(run);
});
});

test.serial(`events: lightning should receive a ${e.RUN_LOG} event`, (t) => {
return new Promise((done) => {
const run = {
Expand Down
13 changes: 13 additions & 0 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ test('handle starting_node_id as options', (t) => {
});
});

test('handle output_dataclip as options', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
options: {
output_dataclips: false,
},
};
const { options } = convertPlan(run as LightningPlan);
t.deepEqual(options, {
outputDataclips: false,
});
});

test('convert a single trigger', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
Expand Down

0 comments on commit e48998c

Please sign in to comment.