Skip to content

Commit

Permalink
Merge pull request #613 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Next Release
  • Loading branch information
taylordowns2000 authored Feb 23, 2024
2 parents d5a740b + 8c94a9d commit 6b4a5fa
Show file tree
Hide file tree
Showing 45 changed files with 851 additions and 360 deletions.
11 changes: 11 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @openfn/integration-tests-worker

## 1.0.36

### Patch Changes

- Updated dependencies [4f5f1dd]
- Updated dependencies [58e0d11]
- Updated dependencies [58e0d11]
- @openfn/engine-multi@1.1.0
- @openfn/ws-worker@1.0.1
- @openfn/lightning-mock@2.0.1

## 1.0.35

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.35",
"version": "1.0.36",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
38 changes: 37 additions & 1 deletion integration-tests/worker/test/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024);
const run = async (t, attempt) => {
return new Promise<any>(async (done, reject) => {
lightning.on('step:complete', ({ payload }) => {
t.is(payload.reason, 'success');

// TODO friendlier job names for this would be nice (rather than run ids)
t.log(
`run ${payload.step_id} done in ${payload.duration / 1000}s [${humanMb(
Expand Down Expand Up @@ -192,7 +194,7 @@ test.serial('run parallel jobs', async (t) => {
// });
});

test('run a http adaptor job', async (t) => {
test.serial('run a http adaptor job', async (t) => {
const job = createJob({
adaptor: '@openfn/[email protected]',
body: `get("https://jsonplaceholder.typicode.com/todos/1");
Expand All @@ -212,3 +214,37 @@ test('run a http adaptor job', async (t) => {
completed: false,
});
});

test.serial('use different versions of the same adaptor', async (t) => {
// http@5 exported an axios global - so run this job and validate that the global is there
const job1 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (!axios) {
throw new Error('AXIOS NOT FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// http@6 no longer exports axios - so throw an error if we see it
const job2 = createJob({
body: `import { axios } from "@openfn/language-http";
fn((s) => {
if (axios) {
throw new Error('AXIOS FOUND')
}
return s;
})`,
adaptor: '@openfn/[email protected]',
});

// Just for fun, run each job a couple of times to make sure that there's no wierd caching or ordering anything
const steps = [job1, job2, job1, job2];
const attempt = createRun([], steps, []);

const result = await run(t, attempt);
t.log(result);
t.falsy(result.errors);
});
9 changes: 9 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# @openfn/cli

## 1.1.0

### Patch Changes

Allow multiple version of the same adaptor to run in the same workflow

- Updated dependencies [4f5f1dd]
- @openfn/runtime@1.1.0

## 1.0.0

### Major Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.0.0",
"version": "1.1.0",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
12 changes: 12 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# engine-multi

## 1.1.0

### Minor Changes

- 4f5f1dd: Support workflows with different versions of the same adaptor

### Patch Changes

- 58e0d11: Record adaptor versions as an array
- Updated dependencies [4f5f1dd]
- @openfn/runtime@1.1.0

## 1.0.0

### Major Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.0.0",
"version": "1.1.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
27 changes: 20 additions & 7 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}

if (!skipRepoValidation && !didValidateRepo) {
// TODO what if this throws?
// Whole server probably needs to crash, so throwing is probably appropriate
// TODO do we need to do it on EVERY call? Can we not cache it?
await ensureRepo(repoDir, logger);
didValidateRepo = true;
Expand All @@ -137,12 +135,15 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {

const v = version || 'unknown';

// Write the adaptor version to the context
// This is a reasonably accurate, but not totally bulletproof, report
// @ts-ignore
context.versions[name] = v;
// Write the adaptor version to the context for reporting later
if (!context.versions[name]) {
context.versions[name] = [];
}
if (!context.versions[name].includes(v)) {
(context.versions[name] as string[]).push(v);
}

paths[name] = {
paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
version: v,
};
Expand All @@ -152,6 +153,18 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
}
}

// Write linker arguments back to the plan
for (const step of plan.workflow.steps) {
const job = step as unknown as Job;
if (paths[job.adaptor!]) {
const { name } = getNameAndVersion(job.adaptor!);
// @ts-ignore
job.linker = {
[name]: paths[job.adaptor!],
};
}
}

if (adaptorsToLoad.length) {
// Add this to the queue
const p = enqueue(adaptorsToLoad);
Expand Down
12 changes: 6 additions & 6 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import {
} from './lifecycle';
import preloadCredentials from './preload-credentials';
import { ExecutionError } from '../errors';
import type { RunOptions } from '../worker/thread/run';

const execute = async (context: ExecutionContext) => {
const { state, callWorker, logger, options } = context;
try {
// TODO catch and "throw" nice clean autoinstall errors
const adaptorPaths = await autoinstall(context);
await autoinstall(context);

// TODO catch and "throw" nice clean compile errors
try {
await compile(context);
} catch (e: any) {
Expand All @@ -49,10 +48,9 @@ const execute = async (context: ExecutionContext) => {
const whitelist = options.whitelist?.map((w) => w.toString());

const runOptions = {
adaptorPaths,
whitelist,
statePropsToRemove: options.statePropsToRemove,
};
whitelist,
} as RunOptions;

const workerOptions = {
memoryLimitMb: options.memoryLimitMb,
Expand Down Expand Up @@ -109,13 +107,15 @@ const execute = async (context: ExecutionContext) => {
jobError(context, evt);
},
[workerEvents.LOG]: (evt: workerEvents.LogEvent) => {
// console.log(evt.log.name, evt.log.message);
log(context, evt);
},
// TODO this is also untested
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
error(context, { workflowId: state.plan.id, error: evt.error });
},
};

return callWorker(
'run',
[state.plan, state.input || {}, runOptions || {}],
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const workflowStart = (
// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_START, {
threadId,
versions: context.versions,
});
};

Expand Down Expand Up @@ -81,7 +82,6 @@ export const jobStart = (
context.emit(externalEvents.JOB_START, {
jobId,
threadId,
versions: context.versions,
});
};

Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/classes/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
import type { ExternalEvents, EventMap } from '../events';

/**
* The ExeuctionContext class wraps an event emitter with some useful context
* The ExecutionContext class wraps an event emitter with some useful context
* and automatically appends the workflow id to each emitted events
*
* Each running workflow has its own context object
Expand Down
5 changes: 3 additions & 2 deletions packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface ExternalEvent {
workflowId: string;
}

export interface WorkflowStartPayload extends ExternalEvent {}
export interface WorkflowStartPayload extends ExternalEvent {
versions: Versions;
}

export interface WorkflowCompletePayload extends ExternalEvent {
state: any;
Expand All @@ -64,7 +66,6 @@ export interface WorkflowErrorPayload extends ExternalEvent {

export interface JobStartPayload extends ExternalEvent {
jobId: string;
versions: Versions;
}

export interface JobCompletePayload extends ExternalEvent {
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ export type Versions = {
node: string;
engine: string;
compiler: string;
[adaptor: string]: string;

[adaptor: string]: string | string[];
};
9 changes: 4 additions & 5 deletions packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { execute, createLoggers } from './helpers';
import serializeError from '../../util/serialize-error';
import { JobErrorPayload } from '../../events';

type RunOptions = {
adaptorPaths: Record<string, { path: string }>;
export type RunOptions = {
repoDir: string;
whitelist?: RegExp[];
sanitize: SanitizePolicies;
statePropsToRemove?: string[];
Expand All @@ -26,8 +26,7 @@ const eventMap = {

register({
run: (plan: ExecutionPlan, input: State, runOptions: RunOptions) => {
const { adaptorPaths, whitelist, sanitize, statePropsToRemove } =
runOptions;
const { repoDir, whitelist, sanitize, statePropsToRemove } = runOptions;
const { logger, jobLogger, adaptorLogger } = createLoggers(
plan.id!,
sanitize,
Expand All @@ -52,7 +51,7 @@ register({
logger,
jobLogger,
linker: {
modules: adaptorPaths,
repo: repoDir,
whitelist,
cacheKey: plan.id,
},
Expand Down
Loading

0 comments on commit 6b4a5fa

Please sign in to comment.