From c49ae2433f6973df0b9bc7f344b3df24c4dfb5ff Mon Sep 17 00:00:00 2001 From: Satyam Mattoo <96661612+SatyamMattoo@users.noreply.github.com> Date: Mon, 27 May 2024 20:08:21 +0530 Subject: [PATCH] worker: restructure env vars * Fixing: #604 * added new test for statePropsToRemove * requested changes * completed: requested changes --- packages/ws-worker/src/start.ts | 119 +--------------------- packages/ws-worker/src/util/cli.ts | 121 +++++++++++++++++++++++ packages/ws-worker/test/util/cli.test.ts | 92 +++++++++++++++++ 3 files changed, 216 insertions(+), 116 deletions(-) create mode 100644 packages/ws-worker/src/util/cli.ts create mode 100644 packages/ws-worker/test/util/cli.test.ts diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 9d6e38d63..5889725cb 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -1,124 +1,11 @@ #!/usr/bin/env node -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; -import createLogger, { LogLevel } from '@openfn/logger'; - +import createLogger from '@openfn/logger'; import createRTE from '@openfn/engine-multi'; import createMockRTE from './mock/runtime-engine'; import createWorker, { ServerOptions } from './server'; +import cli from './util/cli'; -type Args = { - _: string[]; - port?: number; - lightning?: string; - repoDir?: string; - secret?: string; - loop?: boolean; - log: LogLevel; - lightningPublicKey?: string; - mock: boolean; - backoff: string; - capacity?: number; - runMemory?: number; - statePropsToRemove?: string[]; - maxRunDurationSeconds: number; -}; - -const { - WORKER_BACKOFF, - WORKER_CAPACITY, - WORKER_LIGHTNING_PUBLIC_KEY, - WORKER_LIGHTNING_SERVICE_URL, - WORKER_LOG_LEVEL, - WORKER_MAX_RUN_DURATION_SECONDS, - WORKER_MAX_RUN_MEMORY_MB, - WORKER_PORT, - WORKER_REPO_DIR, - WORKER_SECRET, - WORKER_STATE_PROPS_TO_REMOVE, -} = process.env; - -const args = yargs(hideBin(process.argv)) - .command('server', 'Start a ws-worker server') - .option('port', { - alias: 'p', - description: 'Port to run the server on. Env: WORKER_PORT', - type: 'number', - default: WORKER_PORT || 2222, - }) - // TODO maybe this is positional and required? - // frees up -l for the log - .option('lightning', { - alias: ['l', 'lightning-service-url'], - description: - 'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL', - default: WORKER_LIGHTNING_SERVICE_URL || 'ws://localhost:4000/worker', - }) - .option('repo-dir', { - alias: 'd', - description: - 'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR', - default: WORKER_REPO_DIR, - }) - .option('secret', { - alias: 's', - description: - 'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET', - default: WORKER_SECRET, - }) - .option('lightning-public-key', { - description: - 'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY', - default: WORKER_LIGHTNING_PUBLIC_KEY, - }) - .option('log', { - description: - 'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL', - default: WORKER_LOG_LEVEL || 'debug', - type: 'string', - }) - .option('loop', { - description: 'Disable the claims loop', - default: true, - type: 'boolean', - }) - .option('mock', { - description: 'Use a mock runtime engine', - default: false, - type: 'boolean', - }) - .option('backoff', { - description: - 'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF', - default: WORKER_BACKOFF || '1/10', - }) - .option('capacity', { - description: 'max concurrent workers. Env: WORKER_CAPACITY', - default: WORKER_CAPACITY ? parseInt(WORKER_CAPACITY) : 5, - type: 'number', - }) - .option('state-props-to-remove', { - description: - 'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE', - default: WORKER_STATE_PROPS_TO_REMOVE ?? ['configuration', 'response'], - type: 'array', - }) - .option('run-memory', { - description: - 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB', - type: 'number', - default: WORKER_MAX_RUN_MEMORY_MB - ? parseInt(WORKER_MAX_RUN_MEMORY_MB) - : 500, - }) - .option('max-run-duration-seconds', { - alias: 't', - description: - 'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS', - type: 'number', - default: WORKER_MAX_RUN_DURATION_SECONDS || 60 * 5, // 5 minutes - }) - .parse() as Args; +const args = cli(process.argv) const logger = createLogger('SRV', { level: args.log }); diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts new file mode 100644 index 000000000..824c91d67 --- /dev/null +++ b/packages/ws-worker/src/util/cli.ts @@ -0,0 +1,121 @@ +import yargs from 'yargs'; +import { LogLevel } from '@openfn/logger'; +import { hideBin } from 'yargs/helpers'; + +type Args = { + _: string[]; + port?: number; + lightning?: string; + repoDir?: string; + secret?: string; + loop?: boolean; + log: LogLevel; + lightningPublicKey?: string; + mock: boolean; + backoff: string; + capacity?: number; + runMemory?: number; + statePropsToRemove?: string[]; + maxRunDurationSeconds: number; +}; + +function setArg(argValue?: T, envValue?: string, defaultValue?: T): T | undefined { + if (Array.isArray(defaultValue) && envValue !== undefined && argValue == undefined) { + return (envValue.split(',')) as T; + } + + if (typeof defaultValue === 'number' && envValue !== undefined && argValue == undefined) { + return parseInt(envValue) as T; + } + + return argValue ?? (envValue as T) ?? defaultValue; +} + +export default function parseArgs(argv: string[]): Args { + const { + WORKER_BACKOFF, + WORKER_CAPACITY, + WORKER_LIGHTNING_PUBLIC_KEY, + WORKER_LIGHTNING_SERVICE_URL, + WORKER_LOG_LEVEL, + WORKER_MAX_RUN_DURATION_SECONDS, + WORKER_MAX_RUN_MEMORY_MB, + WORKER_PORT, + WORKER_REPO_DIR, + WORKER_SECRET, + WORKER_STATE_PROPS_TO_REMOVE, + } = process.env; + + const parser = yargs(hideBin(argv)) + .command('server', 'Start a ws-worker server') + .option('port', { + alias: 'p', + description: 'Port to run the server on. Env: WORKER_PORT', + type: 'number', + }) + .option('lightning', { + alias: ['l', 'lightning-service-url'], + description: 'Base url to Lightning websocket endpoint, eg, ws://localhost:4000/worker. Set to "mock" to use the default mock server. Env: WORKER_LIGHTNING_SERVICE_URL', + }) + .option('repo-dir', { + alias: 'd', + description: 'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR', + }) + .option('secret', { + alias: 's', + description: 'Worker secret. (comes from WORKER_SECRET by default). Env: WORKER_SECRET', + }) + .option('lightning-public-key', { + description: 'Base64-encoded public key. Used to verify run tokens. Env: WORKER_LIGHTNING_PUBLIC_KEY', + }) + .option('log', { + description: 'Set the log level for stdout (default to info, set to debug for verbose output). Env: WORKER_LOG_LEVEL', + }) + .option('loop', { + description: 'Disable the claims loop', + type: 'boolean', + default: true, + }) + .option('mock', { + description: 'Use a mock runtime engine', + type: 'boolean', + default: false, + }) + .option('backoff', { + description: 'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF', + }) + .option('capacity', { + description: 'max concurrent workers. Env: WORKER_CAPACITY', + type: 'number', + }) + .option('state-props-to-remove', { + description: 'A list of properties to remove from the final state returned by a job. Env: WORKER_STATE_PROPS_TO_REMOVE', + type: 'array', + }) + .option('run-memory', { + description: 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB', + type: 'number', + }) + .option('max-run-duration-seconds', { + alias: 't', + description: 'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS', + type: 'number', + }); + + const args = parser.parse() as Args; + + return { + ...args, + port: setArg(args.port, WORKER_PORT, 2222), + lightning: setArg(args.lightning, WORKER_LIGHTNING_SERVICE_URL, 'ws://localhost:4000/worker'), + repoDir: setArg(args.repoDir, WORKER_REPO_DIR), + secret: setArg(args.secret, WORKER_SECRET), + lightningPublicKey: setArg(args.lightningPublicKey, WORKER_LIGHTNING_PUBLIC_KEY), + log: setArg(args.log, WORKER_LOG_LEVEL as LogLevel, 'debug'), + backoff: setArg(args.backoff, WORKER_BACKOFF, '1/10'), + capacity: setArg(args.capacity, WORKER_CAPACITY, 5), + statePropsToRemove: setArg(args.statePropsToRemove, WORKER_STATE_PROPS_TO_REMOVE, ['configuration', 'response']), + runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), + maxRunDurationSeconds: setArg(args.maxRunDurationSeconds, WORKER_MAX_RUN_DURATION_SECONDS, 300), + } as Args; +} diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts new file mode 100644 index 000000000..a84da93ae --- /dev/null +++ b/packages/ws-worker/test/util/cli.test.ts @@ -0,0 +1,92 @@ +import test from 'ava'; +import cli from '../../src/util/cli'; +import { LogLevel } from '@openfn/logger'; + + +const resetEnv = (env: NodeJS.ProcessEnv) => { + process.env = { ...env }; +}; + +test.beforeEach(t => { + // Store original environment variables + t.context = { ...process.env }; +}); + +test.afterEach(t => { + // Restore original environment variables + resetEnv(t.context as NodeJS.ProcessEnv); +}); + +test('cli should parse command line arguments correctly', t => { + const argv = 'node cli.js --port 3000 --log info --max-run-duration-seconds 120' + const args = cli(argv.split(' ')) + + t.is(args.port, 3000); + t.is(args.log, 'info' as LogLevel); + t.is(args.maxRunDurationSeconds, 120); +}); + +test('cli should use environment variables as defaults', t => { + process.env.WORKER_PORT = '4000'; + process.env.WORKER_LOG_LEVEL = 'error'; + process.env.WORKER_MAX_RUN_DURATION_SECONDS = '180'; + + const argv = 'node cli.js'; + const args = cli(argv.split(' ')); + + t.is(args.port, 4000); + t.is(args.log, 'error' as LogLevel); + t.is(args.maxRunDurationSeconds, 180); +}); + +test('cli should override environment variables with command line arguments', t => { + process.env.WORKER_PORT = '4000'; + process.env.WORKER_LOG_LEVEL = 'error'; + process.env.WORKER_MAX_RUN_DURATION_SECONDS = '180'; + + const argv = 'node cli.js --port 5000 --log debug --max-run-duration-seconds 240'; + const args = cli(argv.split(' ')); + + t.is(args.port, 5000); + t.is(args.log, 'debug' as LogLevel); + t.is(args.maxRunDurationSeconds, 240); +}); + +test('cli should set default values for unspecified options', t => { + const argv = 'node cli.js'; + + const args = cli(argv.split(' ')); + + t.is(args.port, 2222); + t.is(args.lightning, 'ws://localhost:4000/worker'); + t.is(args.log, 'debug' as LogLevel); + t.is(args.backoff, '1/10'); + t.is(args.capacity, 5); + t.deepEqual(args.statePropsToRemove, ['configuration', 'response']); + t.is(args.runMemory, 500); + t.is(args.maxRunDurationSeconds, 300); +}); + +test('cli should handle boolean options correctly', t => { + const argv = 'node cli.js --loop false --mock true'; + const args = cli(argv.split(' ')); + + t.is(args.loop, false); + t.is(args.mock, true); +}); + +test('cli should handle array options correctly', t => { + const argv = 'node cli.js --state-props-to-remove prop1 prop2 prop3'; + process.env.WORKER_STATE_PROPS_TO_REMOVE = 'prop4,prop5,prop6'; + const args = cli(argv.split(' ')); + + t.deepEqual(args.statePropsToRemove, ['prop1', 'prop2', 'prop3']); +}); + +test('cli should handle array options correctly for env variables', t => { + const argv = 'node cli.js'; + process.env.WORKER_STATE_PROPS_TO_REMOVE = 'prop1,prop2,prop3'; + const args = cli(argv.split(' ')); + + t.deepEqual(args.statePropsToRemove, ['prop1', 'prop2', 'prop3']); +});