Skip to content

Commit

Permalink
worker: restructure env vars
Browse files Browse the repository at this point in the history
* Fixing: #604

* added new test for statePropsToRemove

* requested changes

* completed: requested changes
  • Loading branch information
SatyamMattoo authored May 27, 2024
1 parent 015b7f2 commit c49ae24
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 116 deletions.
119 changes: 3 additions & 116 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,11 @@
#!/usr/bin/env node
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import createLogger, { LogLevel } from '@openfn/logger';

import createLogger from '@openfn/logger';
import createRTE from '@openfn/engine-multi';
import createMockRTE from './mock/runtime-engine';
import createWorker, { ServerOptions } from './server';
import cli from './util/cli';

type Args = {
_: string[];
port?: number;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log: LogLevel;
lightningPublicKey?: string;
mock: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};

const {
WORKER_BACKOFF,
WORKER_CAPACITY,
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
WORKER_REPO_DIR,
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
} = process.env;

const args = yargs(hideBin(process.argv))
.command('server', 'Start a ws-worker server')
.option('port', {
alias: 'p',
description: 'Port to run the server on. Env: WORKER_PORT',
type: 'number',
default: WORKER_PORT || 2222,
})
// TODO maybe this is positional and required?
// frees up -l for the log
.option('lightning', {
alias: ['l', 'lightning-service-url'],
description:
'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL',
default: WORKER_LIGHTNING_SERVICE_URL || 'ws://localhost:4000/worker',
})
.option('repo-dir', {
alias: 'd',
description:
'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR',
default: WORKER_REPO_DIR,
})
.option('secret', {
alias: 's',
description:
'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET',
default: WORKER_SECRET,
})
.option('lightning-public-key', {
description:
'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY',
default: WORKER_LIGHTNING_PUBLIC_KEY,
})
.option('log', {
description:
'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL',
default: WORKER_LOG_LEVEL || 'debug',
type: 'string',
})
.option('loop', {
description: 'Disable the claims loop',
default: true,
type: 'boolean',
})
.option('mock', {
description: 'Use a mock runtime engine',
default: false,
type: 'boolean',
})
.option('backoff', {
description:
'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF',
default: WORKER_BACKOFF || '1/10',
})
.option('capacity', {
description: 'max concurrent workers. Env: WORKER_CAPACITY',
default: WORKER_CAPACITY ? parseInt(WORKER_CAPACITY) : 5,
type: 'number',
})
.option('state-props-to-remove', {
description:
'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE',
default: WORKER_STATE_PROPS_TO_REMOVE ?? ['configuration', 'response'],
type: 'array',
})
.option('run-memory', {
description:
'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
default: WORKER_MAX_RUN_MEMORY_MB
? parseInt(WORKER_MAX_RUN_MEMORY_MB)
: 500,
})
.option('max-run-duration-seconds', {
alias: 't',
description:
'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS',
type: 'number',
default: WORKER_MAX_RUN_DURATION_SECONDS || 60 * 5, // 5 minutes
})
.parse() as Args;
const args = cli(process.argv)

const logger = createLogger('SRV', { level: args.log });

Expand Down
121 changes: 121 additions & 0 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import yargs from 'yargs';
import { LogLevel } from '@openfn/logger';
import { hideBin } from 'yargs/helpers';

type Args = {
_: string[];
port?: number;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log: LogLevel;
lightningPublicKey?: string;
mock: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
};

function setArg<T>(argValue?: T, envValue?: string, defaultValue?: T): T | undefined {
if (Array.isArray(defaultValue) && envValue !== undefined && argValue == undefined) {
return (envValue.split(',')) as T;
}

if (typeof defaultValue === 'number' && envValue !== undefined && argValue == undefined) {
return parseInt(envValue) as T;
}

return argValue ?? (envValue as T) ?? defaultValue;
}

export default function parseArgs(argv: string[]): Args {
const {
WORKER_BACKOFF,
WORKER_CAPACITY,
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
WORKER_MAX_RUN_DURATION_SECONDS,
WORKER_MAX_RUN_MEMORY_MB,
WORKER_PORT,
WORKER_REPO_DIR,
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
} = process.env;

const parser = yargs(hideBin(argv))
.command('server', 'Start a ws-worker server')
.option('port', {
alias: 'p',
description: 'Port to run the server on. Env: WORKER_PORT',
type: 'number',
})
.option('lightning', {
alias: ['l', 'lightning-service-url'],
description: 'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL',
})
.option('repo-dir', {
alias: 'd',
description: 'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR',
})
.option('secret', {
alias: 's',
description: 'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET',
})
.option('lightning-public-key', {
description: 'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY',
})
.option('log', {
description: 'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL',
})
.option('loop', {
description: 'Disable the claims loop',
type: 'boolean',
default: true,
})
.option('mock', {
description: 'Use a mock runtime engine',
type: 'boolean',
default: false,
})
.option('backoff', {
description: 'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF',
})
.option('capacity', {
description: 'max concurrent workers. Env: WORKER_CAPACITY',
type: 'number',
})
.option('state-props-to-remove', {
description: 'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE',
type: 'array',
})
.option('run-memory', {
description: 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB',
type: 'number',
})
.option('max-run-duration-seconds', {
alias: 't',
description: 'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS',
type: 'number',
});

const args = parser.parse() as Args;

return {
...args,
port: setArg(args.port, WORKER_PORT, 2222),
lightning: setArg(args.lightning, WORKER_LIGHTNING_SERVICE_URL, 'ws://localhost:4000/worker'),
repoDir: setArg(args.repoDir, WORKER_REPO_DIR),
secret: setArg(args.secret, WORKER_SECRET),
lightningPublicKey: setArg(args.lightningPublicKey, WORKER_LIGHTNING_PUBLIC_KEY),
log: setArg(args.log, WORKER_LOG_LEVEL as LogLevel, 'debug'),
backoff: setArg(args.backoff, WORKER_BACKOFF, '1/10'),
capacity: setArg(args.capacity, WORKER_CAPACITY, 5),
statePropsToRemove: setArg(args.statePropsToRemove, WORKER_STATE_PROPS_TO_REMOVE, ['configuration', 'response']),
runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500),
maxRunDurationSeconds: setArg(args.maxRunDurationSeconds, WORKER_MAX_RUN_DURATION_SECONDS, 300),
} as Args;
}
92 changes: 92 additions & 0 deletions packages/ws-worker/test/util/cli.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import test from 'ava';
import cli from '../../src/util/cli';
import { LogLevel } from '@openfn/logger';


const resetEnv = (env: NodeJS.ProcessEnv) => {
process.env = { ...env };
};

test.beforeEach(t => {
// Store original environment variables
t.context = { ...process.env };
});

test.afterEach(t => {
// Restore original environment variables
resetEnv(t.context as NodeJS.ProcessEnv);
});

test('cli should parse command line arguments correctly', t => {
const argv = 'node cli.js --port 3000 --log info --max-run-duration-seconds 120'
const args = cli(argv.split(' '))

t.is(args.port, 3000);
t.is(args.log, 'info' as LogLevel);
t.is(args.maxRunDurationSeconds, 120);
});

test('cli should use environment variables as defaults', t => {
process.env.WORKER_PORT = '4000';
process.env.WORKER_LOG_LEVEL = 'error';
process.env.WORKER_MAX_RUN_DURATION_SECONDS = '180';

const argv = 'node cli.js';
const args = cli(argv.split(' '));

t.is(args.port, 4000);
t.is(args.log, 'error' as LogLevel);
t.is(args.maxRunDurationSeconds, 180);
});

test('cli should override environment variables with command line arguments', t => {
process.env.WORKER_PORT = '4000';
process.env.WORKER_LOG_LEVEL = 'error';
process.env.WORKER_MAX_RUN_DURATION_SECONDS = '180';

const argv = 'node cli.js --port 5000 --log debug --max-run-duration-seconds 240';
const args = cli(argv.split(' '));

t.is(args.port, 5000);
t.is(args.log, 'debug' as LogLevel);
t.is(args.maxRunDurationSeconds, 240);
});

test('cli should set default values for unspecified options', t => {
const argv = 'node cli.js';

const args = cli(argv.split(' '));

t.is(args.port, 2222);
t.is(args.lightning, 'ws://localhost:4000/worker');
t.is(args.log, 'debug' as LogLevel);
t.is(args.backoff, '1/10');
t.is(args.capacity, 5);
t.deepEqual(args.statePropsToRemove, ['configuration', 'response']);
t.is(args.runMemory, 500);
t.is(args.maxRunDurationSeconds, 300);
});

test('cli should handle boolean options correctly', t => {
const argv = 'node cli.js --loop false --mock true';
const args = cli(argv.split(' '));

t.is(args.loop, false);
t.is(args.mock, true);
});

test('cli should handle array options correctly', t => {
const argv = 'node cli.js --state-props-to-remove prop1 prop2 prop3';
process.env.WORKER_STATE_PROPS_TO_REMOVE = 'prop4,prop5,prop6';
const args = cli(argv.split(' '));

t.deepEqual(args.statePropsToRemove, ['prop1', 'prop2', 'prop3']);
});

test('cli should handle array options correctly for env variables', t => {
const argv = 'node cli.js';
process.env.WORKER_STATE_PROPS_TO_REMOVE = 'prop1,prop2,prop3';
const args = cli(argv.split(' '));

t.deepEqual(args.statePropsToRemove, ['prop1', 'prop2', 'prop3']);
});

0 comments on commit c49ae24

Please sign in to comment.