Skip to content

Commit

Permalink
Merge pull request #572 from OpenFn/release/next
Browse files Browse the repository at this point in the history
Release Worker 0.6.0
  • Loading branch information
josephjclark authored Jan 26, 2024
2 parents df38b06 + c4683f1 commit 434dc3f
Show file tree
Hide file tree
Showing 80 changed files with 2,594 additions and 1,145 deletions.
22 changes: 22 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# @openfn/integration-tests-worker

## 1.0.31

### Patch Changes

- Updated dependencies [eb10b1f]
- Updated dependencies [281391b]
- Updated dependencies [281391b]
- Updated dependencies [2857fe6]
- @openfn/ws-worker@0.6.0
- @openfn/engine-multi@0.3.1
- @openfn/lightning-mock@1.1.10

## 1.0.30

### Patch Changes

- Updated dependencies [9b9ca0c]
- Updated dependencies [9b9ca0c]
- @openfn/ws-worker@0.6.0
- @openfn/engine-multi@0.3.0
- @openfn/lightning-mock@1.1.9

## 1.0.29

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.29",
"version": "1.0.31",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/worker/test/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { run, humanMb } from '../src/util';
let lightning;
let worker;

const maxConcurrency = 1;
const maxConcurrency = 20;

test.before(async () => {
const lightningPort = 4322;
Expand Down Expand Up @@ -90,7 +90,7 @@ test.serial.skip('run 100 attempts', async (t) => {

lightning.on('run:complete', (evt) => {
// May want to disable this but it's nice feedback
//console.log('Completed ', evt.attemptId);
t.log('Completed ', evt.attemptId);

if (evt.payload.reason !== 'success') {
t.log('Atempt failed:');
Expand Down
29 changes: 26 additions & 3 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ test('crash: syntax error', async (t) => {
const result = await run(attempt);

const { reason, error_type, error_message } = result;

t.is(reason, 'crash');
t.is(error_type, 'CompileError');
t.regex(error_message, /Unexpected token \(1:9\)$/);
Expand Down Expand Up @@ -77,7 +76,31 @@ test('exception: autoinstall error', async (t) => {
);
});

test('kill: oom', async (t) => {
test('kill: oom (small, kill worker)', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
body: `fn((s) => {
s.data = [];
while(true) {
s.data.push(new Array(1e6).fill("xyz"))
}
})`,
},
],
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;
t.is(reason, 'kill');
t.is(error_type, 'OOMError');
t.is(error_message, 'Run exceeded maximum memory usage');
});

test('kill: oom (large, kill vm)', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
Expand All @@ -86,7 +109,7 @@ test('kill: oom', async (t) => {
body: `fn((s) => {
s.data = [];
while(true) {
s.data.push(new Array(1e5).fill("xyz"))
s.data.push(new Array(1e9).fill("xyz"))
}
})`,
},
Expand Down
27 changes: 12 additions & 15 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,15 +333,19 @@ test('a timeout error should still call run-complete', (t) => {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest', // version lock to something stable?
body: 'fn((s) => new Promise((resolve) => setTimeout(() => resolve(s), 1000)))',
// don't try to autoinstall an adaptor because it'll count in the timeout
body: 'export default [(s) => new Promise((resolve) => setTimeout(() => resolve(s), 2000))]',
},
],
options: {
timeout: 100,
runTimeout: 500,
},
};

lightning.once('attempt:start', (event) => {
t.log('attempt started');
});

lightning.once('run:complete', (event) => {
t.is(event.payload.reason, 'kill');
t.is(event.payload.error_type, 'TimeoutError');
Expand Down Expand Up @@ -421,8 +425,8 @@ test('an OOM error should still call run-complete', (t) => {
// });
// });

// TODO this probably needs to move out into another test file
// not going to do it now as I've changed too much too quickly already...
// TODO this test is a bit different now
// I think it's worth keeping
test('stateful adaptor should create a new client for each attempt', (t) => {
return new Promise(async (done) => {
// We want to create our own special worker here
Expand Down Expand Up @@ -454,22 +458,15 @@ test('stateful adaptor should create a new client for each attempt', (t) => {
if (id === attempt2.id) {
const one = results[attempt1.id];
const two = results[attempt2.id];
// The module should be isolated within the same thread
t.is(one.threadId, two.threadId);

// The two attempts should run in different threads
t.not(one.threadId, two.threadId);
t.not(one.clientId, two.clientId);

done();
}
});

// Note that this API doesn't work!!
// shaeme, it would be useful
// lightning.waitForResult(attempt.id, (result) => {
// console.log(result)
// t.pass()
// done()
// })

const engineArgs = {
repoDir: path.resolve('./dummy-repo'),
maxWorkers: 1,
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const getPort = () => ++portgen;
test.serial('worker should start, respond to 200, and close', async (t) => {
workerProcess = await spawnServer();

// The runnign server should respond to a get at root
// The running server should respond to a get at root
let { status } = await fetch('http://localhost:2222/');
t.is(status, 200);

Expand Down
7 changes: 7 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 0.4.15

### Patch Changes

- Updated dependencies [0f22694]
- @openfn/runtime@0.2.5

## 0.4.14

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "0.4.14",
"version": "0.4.15",
"description": "CLI devtools for the openfn toolchain.",
"engines": {
"node": ">=18",
Expand Down
12 changes: 12 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# engine-multi

## 0.3.0

### Minor Changes

- 9b9ca0c: Replace workerpool with new child_process pool implementation
- 281391b: Replace timeout option with attemptTimeoutMs

### Patch Changes

- Updated dependencies [0f22694]
- @openfn/runtime@0.2.5

## 0.2.6

### Patch Changes
Expand Down
62 changes: 28 additions & 34 deletions packages/engine-multi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,34 @@ A runtime engine which runs multiple jobs in worker threads.

A long-running node service, suitable for integration with a Worker, for executing workflows.

# Architecture

The engine runs in the main process and exposes an `execute` function, which can be called by some wrapping service (ie, the Lightning Worker).

The engine maintains a pool of long-lived child processes. For every `execute` call, the engine will pick an idle child process from the pool and execute the workflow within it.

Inside the child process, we actually execute the runtime inside a worker thread. Each child process has exactly one worker, which is created on demand and destroyed on completion.

So the process tree looks a bit like this:

```
-- main thread (execute, compile, autoinstall)
-- child_process (spawn worker)
-- worker_thread (@openfn/runtime)
```

Pooled child-processes are lazily spawned. If a worker never needs to run more than one task concurrently, it will only have one child process.

![architecture diagram](docs/architecture.png)

This architecture has several benefits:

- Each run executes in a clean sandbox inside a worker_thread /inside/ a child process. A double-buffered sandbox.
- The child process can always control the thread, even if the thread locks the CPU, to shut it down
- If the worker thread blows its memory limit, other runs will be unaffected as they are in different child processes

At the time of writing, compilation and autoinstall are run on the main thread - not in the child process.

## Usage

The Engine runs Workflows or Execution Plans. A plan MUST have an id.
Expand Down Expand Up @@ -82,37 +110,3 @@ engine.execute(plan, { resolvers });
```

Initial state and credentials are at the moment pre-loaded, with a "fully resolved" state object passed into the runtime. The Runtime has the ability to lazy load but implementing lazy loading across the worker_thread interface has proven tricky.

## Architecture

The Engine uses a dedicated worker found in src/worker/worker.ts. Most of the actual logic is in worker-helper.ts, and is shared by both the real worker (which calls out to @openfn/runtime), and the mock worker (which simulates and evals a run). The mock worker is mostly used in unit tests.

The main interface to the engine, API, exposes a very limited and controlled interface to consumers. api.ts provides the main export and is a thin API wrapper around the main implementation, providing defauls and validation.

The main implementation is in engine.ts, which exposes a much broader interface, with more options. This is potentially dangerous to consumers, but is extremely useful for unit testing here. For example, the dedicated worker path can be set here, as can the whitelist.

When execute is called and passed a plan, the engine first generates an execution context. This contains an event emitter just for that workflower and some contextualised state.

## Security Considerations & Memory Management

The engine uses workerpool to maintain a pool of worker threads.

As workflows come in to be executed, they are passed to workerpool which will pick an idle worker and execute the workflow within it.

workerpool has no natural environment hardening, which means workflows running in the same thread will share an environment. Globals set in workflow A will be available to workflow B, and by the same token an adaptor loaded for workflow A will be shared with workflow B.

Also, because the thread is long-lived, modules imported into the sandbox will be shared.

We have several mitgations against this, ensuring a safe, secure and stable execution environment:

- The runtime sandbox itself ensures that each job runs in an isolated context. If a job escapes the sandbox, it will have access to the thread's global scope
- Each workflow appends a unique id to all its imports, busting the node cache and forcing each module to be re-initialised. This means workers cannot share adaptors and all state is reset.
- To preserve memory, worker threads are regularly purged, meaning destroyed (note that this comes with a performance hit and undermines the use of worker pooling entirely!). When each workflow is complete, if there are no pending tasks to execute, all worker threads are destroyed.

Inside the worker thread, we ensure that:

- The parent `process.env` is not visible (by default in workerpool the woker will "inherit" the parent env)
- The parent process is not accessible (check this)
- The parent scope is not visible (this is innate in workerpool design).

After initialisation, the only way that the parent process and child thread can communicate is a) through the sendMessage() interface (which really means the child can only send messages that the parent is expecting), b) through a shared memory buffer (usage of which is limited and controlled by the parent), and c) returning a value from a function execution.
Binary file added packages/engine-multi/docs/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 5 additions & 6 deletions packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"name": "@openfn/engine-multi",
"version": "0.2.6",
"version": "0.3.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
"scripts": {
"test": "pnpm ava",
"test": "pnpm ava --serial",
"test:types": "pnpm tsc --noEmit --project tsconfig.json",
"build": "tsup --config ./tsup.config.js",
"build:watch": "pnpm build --watch",
Expand All @@ -17,12 +17,10 @@
"@openfn/compiler": "workspace:*",
"@openfn/language-common": "2.0.0-rc3",
"@openfn/logger": "workspace:*",
"@openfn/runtime": "workspace:*",
"workerpool": "^6.5.1"
"@openfn/runtime": "workspace:*"
},
"devDependencies": {
"@types/node": "^18.15.13",
"@types/workerpool": "^6.4.4",
"ava": "5.3.1",
"ts-node": "^10.9.1",
"tslib": "^2.4.0",
Expand All @@ -31,7 +29,8 @@
"typescript": "^5.1.6"
},
"files": [
"dist",
"dist/index.js",
"dist/worker/",
"README.md",
"CHANGELOG.md"
]
Expand Down
11 changes: 5 additions & 6 deletions packages/engine-multi/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type LazyResolvers = {
expressions?: Resolver<string>;
};

export type RTEOptions = Partial<
export type APIOptions = Partial<
Omit<EngineOptions, 'whitelist' | 'noCompile'> & {
// Needed here for unit tests to support json expressions. Would rather exclude tbh
compile?: {
Expand All @@ -38,7 +38,7 @@ const DEFAULT_MEMORY_LIMIT = 500;
// Create the engine and handle user-facing stuff, like options parsing
// and defaulting
const createAPI = async function (
options: RTEOptions = {}
options: APIOptions = {}
): Promise<RuntimeEngine> {
let { repoDir } = options;

Expand All @@ -50,7 +50,7 @@ const createAPI = async function (
}
logger.info('repoDir set to ', repoDir);

const engineOptions = {
const engineOptions: EngineOptions = {
logger,

// TODO should resolvers be set here on passed to execute?
Expand All @@ -65,17 +65,16 @@ const createAPI = async function (
// TODO should we disable autoinstall overrides?
autoinstall: options.autoinstall,

minWorkers: options.minWorkers,
maxWorkers: options.maxWorkers,
memoryLimitMb: options.memoryLimitMb || DEFAULT_MEMORY_LIMIT,

purge: options.hasOwnProperty('purge') ? options.purge : true,
attemptTimeoutMs: options.attemptTimeoutMs,

statePropsToRemove: options.statePropsToRemove ?? [
'configuration',
'response',
],
};

logger.info(`memory limit set to ${options.memoryLimitMb}mb`);
logger.info(`statePropsToRemove set to: `, engineOptions.statePropsToRemove);

Expand Down
Loading

0 comments on commit 434dc3f

Please sign in to comment.