Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero persistence #599

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export type LightningPlanOptions = {
runTimeoutMs?: number;
sanitize?: SanitizePolicies;
start?: StepId;
output_dataclips?: boolean;
};

/**
Expand Down
19 changes: 12 additions & 7 deletions packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
RunLogPayload,
RunStartPayload,
LightningPlanOptions,
} from '@openfn/lexicon/lightning';
import type { RunLogPayload, RunStartPayload } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';
import type {
RuntimeEngine,
Expand Down Expand Up @@ -31,6 +27,7 @@ import handleRunComplete from '../events/run-complete';
import handleRunError from '../events/run-error';

import type { Channel, RunState, JSONLog } from '../types';
import { WorkerRunOptions } from '../util/convert-lightning-plan';

const enc = new TextDecoder('utf-8');

Expand All @@ -41,6 +38,7 @@ export type Context = {
state: RunState;
logger: Logger;
engine: RuntimeEngine;
options: WorkerRunOptions;
onFinish: (result: any) => void;

// maybe its better for version numbers to be scribbled here as we go?
Expand All @@ -63,14 +61,21 @@ export function execute(
logger: Logger,
plan: ExecutionPlan,
input: Lazy<State>,
options: LightningPlanOptions = {},
options: WorkerRunOptions = {},
onFinish = (_result: any) => {}
) {
logger.info('executing ', plan.id);

const state = createRunState(plan, input);

const context: Context = { channel, state, logger, engine, onFinish };
const context: Context = {
channel,
state,
logger,
engine,
options,
onFinish,
};

const throttle = createThrottle();

Expand Down
10 changes: 3 additions & 7 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type {
GetPlanReply,
LightningPlan,
LightningPlanOptions,
} from '@openfn/lexicon/lightning';
import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';

import { getWithReply } from '../util';
import convertRun from '../util/convert-lightning-plan';
import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan';
import { GET_PLAN } from '../events';
import type { Channel, Socket } from '../types';

Expand All @@ -25,7 +21,7 @@ const joinRunChannel = (
return new Promise<{
channel: Channel;
plan: ExecutionPlan;
options: LightningPlanOptions;
options: WorkerRunOptions;
input: Lazy<State>;
}>((resolve, reject) => {
// TMP - lightning seems to be sending two responses to me
Expand Down
10 changes: 7 additions & 3 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { calculateJobExitReason } from '../api/reasons';
import { sendEvent, Context } from '../api/execute';

export default function onStepComplete(
{ channel, state }: Context,
{ channel, state, options }: Context,
event: JobCompletePayload,
// TODO this isn't terribly graceful, but accept an error for crashes
error?: any
Expand Down Expand Up @@ -52,7 +52,6 @@ export default function onStepComplete(
step_id,
job_id,
output_dataclip_id: dataclipId,
output_dataclip: stringify(outputState),

reason,
error_message,
Expand All @@ -61,6 +60,11 @@ export default function onStepComplete(
mem: event.mem,
duration: event.duration,
thread_id: event.threadId,
};
} as StepCompletePayload;

if (!options || options.outputDataclips !== false) {
evt.output_dataclip = stringify(outputState);
}

return sendEvent<StepCompletePayload>(channel, STEP_COMPLETE, evt);
}
28 changes: 19 additions & 9 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,31 @@ const mapTriggerEdgeCondition = (edge: Edge) => {
return condition;
};

// Options which relate to this execution but are not part of the plan
export type WorkerRunOptions = ExecuteOptions & {
// Defaults to true - must be explicity false to stop dataclips being sent
outputDataclips?: boolean;
};

export default (
run: LightningPlan
): { plan: ExecutionPlan; options: ExecuteOptions; input: Lazy<State> } => {
): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy<State> } => {
// Some options get mapped straight through to the runtime's workflow options
// TODO or maybe not? Maybe they're all sent to the engine instead?
const runtimeOpts: Omit<WorkflowOptions, 'timeout'> = {};

// But some need to get passed down into the engine's options
const engineOpts: ExecuteOptions = {};

if (run.options?.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options?.sanitize) {
engineOpts.sanitize = run.options.sanitize;
const engineOpts: WorkerRunOptions = {};

if (run.options) {
if (run.options.runTimeoutMs) {
engineOpts.runTimeoutMs = run.options.runTimeoutMs;
}
if (run.options.sanitize) {
engineOpts.sanitize = run.options.sanitize;
}
if (run.options.hasOwnProperty('output_dataclips')) {
engineOpts.outputDataclips = run.options.output_dataclips;
}
}

const plan: Partial<ExecutionPlan> = {
Expand Down
32 changes: 32 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,35 @@ test('send a step:complete event', async (t) => {
} as JobCompletePayload;
await handleStepComplete({ channel, state } as any, event);
});

test('do not include dataclips in step:complete if output_dataclip is false', async (t) => {
const plan = createPlan();
const jobId = 'job-1';
const result = { x: 10 };

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const options = {
outputDataclips: false,
};

const channel = mockChannel({
[STEP_COMPLETE]: (evt: StepCompletePayload) => {
t.truthy(evt.output_dataclip_id);
t.falsy(evt.output_dataclip);
},
});

const event = {
jobId,
workflowId: plan.id,
state: result,
next: ['a'],
mem: { job: 1, system: 10 },
duration: 61,
thread_id: 'abc',
} as JobCompletePayload;
await handleStepComplete({ channel, state, options } as any, event);
});
53 changes: 39 additions & 14 deletions packages/ws-worker/test/lightning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import test from 'ava';
import createLightningServer from '@openfn/lightning-mock';
import type {
LightningPlan,
RunCompletePayload,
} from '@openfn/lexicon/lightning';

import { createRun, createEdge, createJob } from './util';

import createWorkerServer from '../src/server';
import createMockRTE from '../src/mock/runtime-engine';
import * as e from '../src/events';
import { RunCompletePayload } from '@openfn/lexicon/lightning';
import createMockRTE from '../src/mock/runtime-engine';

let lng: any;
let worker: any;
Expand Down Expand Up @@ -38,17 +40,18 @@ test.afterEach(() => {

let rollingRunId = 0;

const getRun = (ext = {}, jobs?: any) => ({
id: `a${++rollingRunId}`,
jobs: jobs || [
{
id: 'j',
adaptor: '@openfn/[email protected]',
body: 'fn(() => ({ answer: 42 }))',
},
],
...ext,
});
const getRun = (ext = {}, jobs?: any[]): LightningPlan =>
({
id: `a${++rollingRunId}`,
jobs: jobs || [
{
id: 'j',
adaptor: '@openfn/[email protected]',
body: 'fn(() => ({ answer: 42 }))',
},
],
...ext,
} as LightningPlan);

test.serial(`events: lightning should respond to a ${e.CLAIM} event`, (t) => {
return new Promise((done) => {
Expand Down Expand Up @@ -342,6 +345,28 @@ test.serial(
}
);

test.serial(`events: ${e.STEP_COMPLETE} should not return dataclips`, (t) => {
return new Promise((done) => {
const run = getRun();
run.options = {
output_dataclips: false,
};

lng.onSocketEvent(e.STEP_COMPLETE, run.id, ({ payload }: any) => {
t.is(payload.job_id, 'j');
t.falsy(payload.output_dataclip);
t.truthy(payload.output_dataclip_id);
t.pass();
});

lng.onSocketEvent(e.RUN_COMPLETE, run.id, () => {
done();
});

lng.enqueueRun(run);
});
});

test.serial(`events: lightning should receive a ${e.RUN_LOG} event`, (t) => {
return new Promise((done) => {
const run = {
Expand Down
13 changes: 13 additions & 0 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ test('handle starting_node_id as options', (t) => {
});
});

test('handle output_dataclip as options', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
options: {
output_dataclips: false,
},
};
const { options } = convertPlan(run as LightningPlan);
t.deepEqual(options, {
outputDataclips: false,
});
});

test('convert a single trigger', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
Expand Down