Skip to content

Commit

Permalink
worker: restructure env vars (#699)
Browse files Browse the repository at this point in the history
* worker: restructure env vars

* Fixing: #604

* added new test for statePropsToRemove

* requested changes

* completed: requested changes

* changeset

* worker: simplify typings

* worker: adjust tests

* release: worker @1.1.10

---------

Co-authored-by: Satyam Mattoo <[email protected]>
  • Loading branch information
josephjclark and SatyamMattoo authored May 27, 2024
1 parent 015b7f2 commit 4dafcb2
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 118 deletions.
7 changes: 7 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/integration-tests-worker

## 1.0.45

### Patch Changes

- Updated dependencies [bc45b3d]
- @openfn/ws-worker@1.1.10

## 1.0.44

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.44",
"version": "1.0.45",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
6 changes: 6 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ws-worker

## 1.1.10

### Patch Changes

- bc45b3d: Restructure handling of env vars

## 1.1.9

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.1.9",
"version": "1.1.10",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
119 changes: 3 additions & 116 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,11 @@
#!/usr/bin/env node
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import createLogger, { LogLevel } from '@openfn/logger';

import createLogger from '@openfn/logger';
import createRTE from '@openfn/engine-multi';
import createMockRTE from './mock/runtime-engine';
import createWorker, { ServerOptions } from './server';
import cli from './util/cli';

type Args = {
_: string[];
port?: number;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log: LogLevel;
lightningPublicKey?: string;
mock: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};

const {
WORKER_BACKOFF,
WORKER_CAPACITY,
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
WORKER_REPO_DIR,
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
} = process.env;

const args = yargs(hideBin(process.argv))
.command('server', 'Start a ws-worker server')
.option('port', {
alias: 'p',
description: 'Port to run the server on. Env: WORKER_PORT',
type: 'number',
default: WORKER_PORT || 2222,
})
// TODO maybe this is positional and required?
// frees up -l for the log
.option('lightning', {
alias: ['l', 'lightning-service-url'],
description:
'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL',
default: WORKER_LIGHTNING_SERVICE_URL || 'ws://localhost:4000/worker',
})
.option('repo-dir', {
alias: 'd',
description:
'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR',
default: WORKER_REPO_DIR,
})
.option('secret', {
alias: 's',
description:
'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET',
default: WORKER_SECRET,
})
.option('lightning-public-key', {
description:
'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY',
default: WORKER_LIGHTNING_PUBLIC_KEY,
})
.option('log', {
description:
'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL',
default: WORKER_LOG_LEVEL || 'debug',
type: 'string',
})
.option('loop', {
description: 'Disable the claims loop',
default: true,
type: 'boolean',
})
.option('mock', {
description: 'Use a mock runtime engine',
default: false,
type: 'boolean',
})
.option('backoff', {
description:
'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF',
default: WORKER_BACKOFF || '1/10',
})
.option('capacity', {
description: 'max concurrent workers. Env: WORKER_CAPACITY',
default: WORKER_CAPACITY ? parseInt(WORKER_CAPACITY) : 5,
type: 'number',
})
.option('state-props-to-remove', {
description:
'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE',
default: WORKER_STATE_PROPS_TO_REMOVE ?? ['configuration', 'response'],
type: 'array',
})
.option('run-memory', {
description:
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
default: WORKER_MAX_RUN_MEMORY_MB
? parseInt(WORKER_MAX_RUN_MEMORY_MB)
: 500,
})
.option('max-run-duration-seconds', {
alias: 't',
description:
'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS',
type: 'number',
default: WORKER_MAX_RUN_DURATION_SECONDS || 60 * 5, // 5 minutes
})
.parse() as Args;
const args = cli(process.argv)

const logger = createLogger('SRV', { level: args.log });

Expand Down
155 changes: 155 additions & 0 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import yargs from 'yargs';
import { LogLevel } from '@openfn/logger';
import { hideBin } from 'yargs/helpers';

type Args = {
_: string[];
port?: number;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log: LogLevel;
lightningPublicKey?: string;
mock: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};

type ArgTypes = string | string[] | number | undefined;

function setArg(
argValue?: ArgTypes,
envValue?: string,
defaultValue?: ArgTypes
): ArgTypes {
if (
Array.isArray(defaultValue) &&
!argValue &&
typeof envValue === 'string'
) {
return envValue.split(',');
}

if (typeof defaultValue === 'number' && envValue && !argValue) {
return parseInt(envValue);
}

return argValue ?? envValue ?? defaultValue;
}

export default function parseArgs(argv: string[]): Args {
const {
WORKER_BACKOFF,
WORKER_CAPACITY,
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
WORKER_REPO_DIR,
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
} = process.env;

const parser = yargs(hideBin(argv))
.command('server', 'Start a ws-worker server')
.option('port', {
alias: 'p',
description: 'Port to run the server on. Env: WORKER_PORT',
type: 'number',
})
.option('lightning', {
alias: ['l', 'lightning-service-url'],
description:
'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL',
})
.option('repo-dir', {
alias: 'd',
description:
'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR',
})
.option('secret', {
alias: 's',
description:
'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET',
})
.option('lightning-public-key', {
description:
'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY',
})
.option('log', {
description:
'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL',
})
.option('loop', {
description: 'Disable the claims loop',
type: 'boolean',
default: true,
})
.option('mock', {
description: 'Use a mock runtime engine',
type: 'boolean',
default: false,
})
.option('backoff', {
description:
'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF',
})
.option('capacity', {
description: 'max concurrent workers. Env: WORKER_CAPACITY',
type: 'number',
})
.option('state-props-to-remove', {
description:
'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE',
type: 'array',
})
.option('run-memory', {
description:
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
})
.option('max-run-duration-seconds', {
alias: 't',
description:
'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS',
type: 'number',
});

const args = parser.parse() as Args;

return {
...args,
port: setArg(args.port, WORKER_PORT, 2222),
lightning: setArg(
args.lightning,
WORKER_LIGHTNING_SERVICE_URL,
'ws://localhost:4000/worker'
),
repoDir: setArg(args.repoDir, WORKER_REPO_DIR),
secret: setArg(args.secret, WORKER_SECRET),
lightningPublicKey: setArg(
args.lightningPublicKey,
WORKER_LIGHTNING_PUBLIC_KEY
),
log: setArg(args.log, WORKER_LOG_LEVEL as LogLevel, 'debug'),
backoff: setArg(args.backoff, WORKER_BACKOFF, '1/10'),
capacity: setArg(args.capacity, WORKER_CAPACITY, 5),
statePropsToRemove: setArg(
args.statePropsToRemove,
WORKER_STATE_PROPS_TO_REMOVE,
['configuration', 'response']
),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
maxRunDurationSeconds: setArg(
args.maxRunDurationSeconds,
WORKER_MAX_RUN_DURATION_SECONDS,
300
),
} as Args;
}
Loading

0 comments on commit 4dafcb2

Please sign in to comment.