From c24e71d9b0e0be43aa8261cc860e1eb6b7f93cd4 Mon Sep 17 00:00:00 2001 From: Scott DeWitt Date: Wed, 28 Feb 2024 17:01:31 -0500 Subject: [PATCH] feat: specify config files on command line and refactor --- src/__main__.py | 293 ++++++++++++++++++---------- src/newrelic_logging/cache.py | 4 +- src/newrelic_logging/config.py | 38 ++-- src/newrelic_logging/integration.py | 28 ++- 4 files changed, 240 insertions(+), 123 deletions(-) diff --git a/src/__main__.py b/src/__main__.py index 4a9df56..42e0fbd 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,117 +1,103 @@ #!/usr/bin/env python -import getopt +import optparse import os import sys +from typing import Any + from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BlockingScheduler from pytz import utc from yaml import Loader, load -from newrelic_logging.config import getenv +from newrelic_logging.config import Config, getenv from newrelic_logging.integration import Integration from newrelic_logging.telemetry import print_info, print_warn -config_dir = None -argv = sys.argv[1:] -print_info(f'Integration start. Using program arguments {argv}') -try: - opts, args = getopt.getopt(argv, 'c:', ['config_dir=']) - for opt, arg in opts: - if opt in ('-c', '--config_dir'): - config_dir = arg - -except getopt.GetoptError as e: - sys.exit(f'error parsing command line options: {e}') - -if config_dir is None: - config_dir = os.environ.get('CONFIG_DIR') - if config_dir is None: - config_dir = os.getcwd() - -config_file = f'{config_dir}/config.yml' - -if not os.path.exists(config_file): - sys.exit(f'config file {config_file} not found') -event_mapping_file = f'{config_dir}/event_type_fields.yml' -numeric_fields_file = f'{config_dir}/numeric_fields.yml' - -def main(): - config = load_config(config_file) - - if not os.path.exists(event_mapping_file): - print_info(f'event_mapping_file {event_mapping_file} not found, so event mapping will not be used') - event_mapping = {} - else: - with open(event_mapping_file) as stream: - event_mapping = load(stream, Loader=Loader)['mapping'] - - if not os.path.exists(numeric_fields_file): - print_info(f'numeric_fields_file {numeric_fields_file} not found') - numeric_fields_mapping = {"Common", - ['EXEC_TIME', 'RUN_TIME', 'NUMBER_OF_INTERVIEWS', 'NUMBER_COLUMNS', 'NUM_SESSIONS', - 'CPU_TIME', 'EPT', 'DB_CPU_TIME', 'VIEW_STATE_SIZE', 'ROWS_PROCESSED', - 'RESPONSE_SIZE', 'PAGE_START_TIME', 'NUMBER_EXCEPTION_FILTERS', - 'BROWSER_DEVICE_TYPE', 'NUMBER_FIELDS', 'CALLOUT_TIME', 'DURATION', - 'STATUS_CODE', 'DB_BLOCKS', 'NUMBER_OF_RECORDS', 'TOTAL_TIME', 'RECORDS_FAILED', - 'ROW_COUNT', 'AVERAGE_ROW_SIZE', 'DB_TOTAL_TIME', - 'READ_TIME', 'REQUEST_SIZE', 'EFFECTIVE_PAGE_TIME', 'RESULT_SIZE_MB', - 'RECORDS_PROCESSED', 'NUM_CLICKS', 'NUMBER_BUCKETS', 'TOTAL_EXECUTION_TIME', - 'NUMBER_SOQL_QUERIES', 'FLOW_LOAD_TIME', 'REOPEN_COUNT', 'NUMBER_OF_ERRORS', - 'LIMIT_USAGE_PERCENT']} - else: - with open(numeric_fields_file) as stream: - numeric_fields_mapping = load(stream, Loader=Loader)['mapping'] - - numeric_fields_list = set() - for event_num_fields in numeric_fields_mapping.values(): - for num_field in event_num_fields: - numeric_fields_list.add(num_field) - Integration.numeric_fields_list = numeric_fields_list - - run_as_service = config.get('run_as_service', False) - - if not run_as_service: - if 'cron_interval_minutes' in config: - cron_interval = config['cron_interval_minutes'] - else: - cron_interval = 
int(getenv("CRON_INTERVAL_MINUTES", 60)) - - integration = Integration(config, event_mapping, cron_interval) - integration.run() - else: - service_schedule = config.get('service_schedule') - service_hour = service_schedule['hour'] - service_minute = service_schedule['minute'] - integration = Integration(config, event_mapping, 0) - jobstores = { - 'default': MemoryJobStore(), - } - executors = { - 'default': ThreadPoolExecutor(20), - } - job_defaults = { - 'coalesce': False, - 'max_instances': 3 - } - scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) - scheduler.add_job(integration.run, trigger='cron', hour=service_hour, minute=service_minute, second='0') - - print_info('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) - scheduler.start() - -def load_config(config_file: str): - with open(config_file) as stream: +CONFIG_DIR = 'CONFIG_DIR' +DEFAULT_CONFIG_FILE = 'config.yml' +DEFAULT_EVENT_TYPE_FIELDS_MAPPING_FILE = 'event_type_fields.yml' +DEFAULT_NUMERIC_FIELDS_MAPPING_FILE = 'numeric_fields.yml' +QUERIES = 'queries' +MAPPING = 'mapping' +SERVICE_SCHEDULE = 'service_schedule' +CRON_INTERVAL_MINUTES = 'cron_interval_minutes' +RUN_AS_SERVICE = 'run_as_service' +DEFAULT_NUMERIC_FIELDS_MAPPING = { + "Common": + [ + 'EXEC_TIME', 'RUN_TIME', 'NUMBER_OF_INTERVIEWS', + 'NUMBER_COLUMNS', 'NUM_SESSIONS', 'CPU_TIME', 'EPT', + 'DB_CPU_TIME', 'VIEW_STATE_SIZE', 'ROWS_PROCESSED', + 'RESPONSE_SIZE', 'PAGE_START_TIME', 'NUMBER_EXCEPTION_FILTERS', + 'BROWSER_DEVICE_TYPE', 'NUMBER_FIELDS', 'CALLOUT_TIME', + 'DURATION', 'STATUS_CODE', 'DB_BLOCKS', 'NUMBER_OF_RECORDS', + 'TOTAL_TIME', 'RECORDS_FAILED','ROW_COUNT', 'AVERAGE_ROW_SIZE', + 'DB_TOTAL_TIME', 'READ_TIME', 'REQUEST_SIZE', + 'EFFECTIVE_PAGE_TIME', 'RESULT_SIZE_MB', 'RECORDS_PROCESSED', + 'NUM_CLICKS', 'NUMBER_BUCKETS', 'TOTAL_EXECUTION_TIME', + 'NUMBER_SOQL_QUERIES', 'FLOW_LOAD_TIME', 'REOPEN_COUNT', + 'NUMBER_OF_ERRORS', 'LIMIT_USAGE_PERCENT', + ] +} + + +def parse_args() -> optparse.Values: + # Create the parser object + parser = optparse.OptionParser() + + # Populate options + parser.add_option( + '-c', + '--config_dir', + default=None, + help='directory containing configuration files', + ) + + parser.add_option( + '-f', + '--config_file', + default=DEFAULT_CONFIG_FILE, + help='name of configuration file', + ) + + parser.add_option( + '-e', + '--event_type_fields_mapping', + default=DEFAULT_EVENT_TYPE_FIELDS_MAPPING_FILE, + help='name of event type fields mapping file', + ) + + parser.add_option( + '-n', + '--num_fields_mapping', + default=DEFAULT_NUMERIC_FIELDS_MAPPING_FILE, + help='name of numeric fields mapping file', + ) + + # Parse arguments + (values, _) = parser.parse_args() + + return values + + +def load_config(config_path: str) -> Config: + if not os.path.exists(config_path): + sys.exit(f'config file {config_path} not found') + + with open(config_path) as stream: config = load(stream, Loader=Loader) + new_queries = [] - if 'queries' in config: - for query in config['queries']: + if QUERIES in config: + for query in config[QUERIES]: if type(query) is str: with open(query) as stream: sub_query_config = load(stream, Loader=Loader) - if 'queries' in sub_query_config and type(sub_query_config['queries']) is list: - new_queries = new_queries + sub_query_config['queries'] + if QUERIES in sub_query_config \ + and type(sub_query_config[QUERIES]) is list: + new_queries = new_queries + sub_query_config[QUERIES] else: print_warn("Malformed subconfig file. 
Ignoring") elif type(query) is dict: @@ -119,9 +105,118 @@ def load_config(config_file: str): else: print_warn("Malformed 'queries' member in config, expected either dictionaries or strings in the array. Ignoring.") pass - config['queries'] = new_queries - return config + config[QUERIES] = new_queries + + return Config(config) + + +def load_mapping_file(mapping_file_path: str, default_mapping: Any) -> dict: + if not os.path.exists(mapping_file_path): + print_info(f'mapping file {mapping_file_path} not found, using default mapping') + return default_mapping + + with open(mapping_file_path) as stream: + return load(stream, Loader=Loader)[MAPPING] + + +def run_once( + config: Config, + event_type_fields_mapping: dict, + numeric_fields_list: set +): + Integration( + config, + event_type_fields_mapping, + numeric_fields_list, + config.get_int(CRON_INTERVAL_MINUTES, 60), + ).run() + + +def run_as_service( + config: Config, + event_type_fields_mapping: dict, + numeric_fields_list: set, +): + scheduler = BlockingScheduler( + jobstores={ 'default': MemoryJobStore() }, + executors={ 'default': ThreadPoolExecutor(20) }, + job_defaults={ + 'coalesce': False, + 'max_instances': 3 + }, + timezone=utc + ) + + if not SERVICE_SCHEDULE in config: + raise Exception('"run_as_service" configured but no "service_schedule" property found') + + service_schedule = config[SERVICE_SCHEDULE] + scheduler.add_job( + Integration( + config, + event_type_fields_mapping, + numeric_fields_list, + 0 + ).run, + trigger='cron', + hour=service_schedule['hour'], + minute=service_schedule['minute'], + second='0', + ) + + print_info('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) + scheduler.start() + + +def run( + config: Config, + event_type_fields_mapping: dict, + numeric_fields_list: set +): + if not config.get(RUN_AS_SERVICE, False): + run_once(config, event_type_fields_mapping, numeric_fields_list) + return + + run_as_service(config, event_type_fields_mapping, numeric_fields_list) + + +def main(): + print_info(f'Integration start. 
Using program arguments {sys.argv[1:]}')
+
+    # Parse command line arguments
+    options = parse_args()
+
+    # Initialize vars from options
+    config_dir = options.config_dir
+    if config_dir is None:
+        config_dir = getenv(CONFIG_DIR, os.getcwd())
+
+    # Load config
+    config = load_config(f'{config_dir}/{options.config_file}')
+
+    # Initialize event mappings
+    event_type_fields_mapping = load_mapping_file(
+        f'{config_dir}/{options.event_type_fields_mapping}',
+        {},
+    )
+
+    # Initialize numeric field mapping
+    numeric_fields_mapping = load_mapping_file(
+        f'{config_dir}/{options.num_fields_mapping}',
+        DEFAULT_NUMERIC_FIELDS_MAPPING,
+    )
+
+    # Build the numeric fields list
+    numeric_fields_list = set()
+    for event_num_fields in numeric_fields_mapping.values():
+        for num_field in event_num_fields:
+            numeric_fields_list.add(num_field)
+
+    # Run the application or start up the service
+    run(config, event_type_fields_mapping, numeric_fields_list)
+
+    print_info("Integration end.")
+
 if __name__ == "__main__":
     main()
-    print_info("Integration end.")
diff --git a/src/newrelic_logging/cache.py b/src/newrelic_logging/cache.py
index 6c5a0f9..536f6fc 100644
--- a/src/newrelic_logging/cache.py
+++ b/src/newrelic_logging/cache.py
@@ -141,7 +141,7 @@ def make_cache(config: Config):
     password_display = "XXXXXX" if password != None else None
 
     print_info(
-        f'cache enabled, connecting to redis instance {host}:{port}:{db}, ssl={ssl}, password={password_display}'
+        f'Cache enabled, connecting to redis instance {host}:{port}:{db}, ssl={ssl}, password={password_display}'
     )
 
     return DataCache(redis.Redis(
@@ -152,6 +152,6 @@ def make_cache(config: Config):
         ssl=ssl
     ), expire_days)
 
-    print_info('cache disabled')
+    print_info('Cache disabled')
 
     return None
diff --git a/src/newrelic_logging/config.py b/src/newrelic_logging/config.py
index 28b9041..81a4b42 100644
--- a/src/newrelic_logging/config.py
+++ b/src/newrelic_logging/config.py
@@ -15,13 +15,17 @@ def _get_nested_helper(val: Any, arr: list[str] = [], index: int = 0) -> Any:
         key = arr[index]
         if index == len(arr) - 1:
             return val[key] if key in val else NOT_FOUND
-        return _get_nested_helper(val[key], arr, index + 1) if key in val else NOT_FOUND
+        return _get_nested_helper(val[key], arr, index + 1) \
+            if key in val else NOT_FOUND
     elif type(val) is list:
         key = arr[index]
-        if type(key) is int and key >= 0:
+        if key.isdigit():
+            arr_index = int(key)
+            if arr_index < 0 or arr_index >= len(val):
+                return NOT_FOUND
             if index == len(arr) - 1:
-                return val[key] if key < len(val) else NOT_FOUND
-            return _get_nested_helper(val[key], arr, index + 1) if key < len(val) else NOT_FOUND
+                return val[arr_index]
+            return _get_nested_helper(val[arr_index], arr, index + 1)
 
     return NOT_FOUND
 
@@ -48,7 +52,7 @@ def tobool(s):
 
 
 class Config:
-    def __init__(self, config: dict, prefix: str):
+    def __init__(self, config: dict, prefix: str = ''):
         self.config = config
         self.prefix = prefix
 
@@ -70,17 +74,15 @@ def set_prefix(self, prefix: str) -> None:
     def getenv(self, env_var_name: str, default = None) -> str:
         return getenv(env_var_name, default, self.prefix)
 
-    def get(self, key: str, default = None) -> Any:
+    def get(self, key: str, default = None, allow_none = False) -> Any:
         val = get_nested(self.config, key)
-        if not val == NOT_FOUND:
+        if not val == NOT_FOUND and (allow_none or val is not None):
             return val
 
-        env_var_name = re.sub(r'[^a-zA-Z0-9_]', '_', key.upper())
-        val = self.getenv(env_var_name, default)
-        if not val is False and not val is None:
-            return val
-
-        return default
+        return 
self.getenv(
+            re.sub(r'[^a-zA-Z0-9_]', '_', key.upper()),
+            default,
+        )
 
     def get_int(self, key: str, default = None) -> int:
         val = self.get(key, default)
@@ -88,3 +90,13 @@ def get_int(self, key: str, default = None) -> int:
 
     def get_bool(self, key: str, default = None) -> bool:
         return tobool(self.get(key, default))
+
+    def sub(self, key: str, default: dict = {}, prefix: str = None):
+        val = get_nested(self.config, key)
+        if val is None or val == NOT_FOUND:
+            return Config(default, self.prefix if prefix is None else prefix)
+
+        if type(val) is not dict:
+            raise Exception(f'cannot create sub config for property {key} of type {type(val)} because it is not a dictionary')
+
+        return Config(val, self.prefix if prefix is None else prefix)
diff --git a/src/newrelic_logging/integration.py b/src/newrelic_logging/integration.py
index f79bb93..ba50de3 100644
--- a/src/newrelic_logging/integration.py
+++ b/src/newrelic_logging/integration.py
@@ -17,22 +17,32 @@ class DataFormat(Enum):
     LOGS = 1
     EVENTS = 2
 
-#TODO: move queries to the instance level, so we can have different queries for each instance.
-#TODO: also keep general queries that apply to all instances.
+# TODO: move queries to the instance level, so we can have different queries for
+# each instance.
+# TODO: also keep general queries that apply to all instances.
 
 class Integration:
     numeric_fields_list = set()
 
-    def __init__(self, config, event_type_fields_mapping, initial_delay):
+    def __init__(
+        self,
+        config: Config,
+        event_type_fields_mapping: dict = {},
+        numeric_fields_list: set = set(),
+        initial_delay: int = 0,
+    ):
+        Integration.numeric_fields_list = numeric_fields_list
         self.instances = []
 
         Telemetry(config["integration_name"])
 
-        for instance in config['instances']:
+        for count, instance in enumerate(config['instances']):
             instance_name = instance['name']
-            arguments = instance['arguments'] if 'arguments' in instance else {}
             labels = instance['labels']
             labels['nr-labs'] = 'data'
-            prefix = arguments['auth_env_prefix'] if 'auth_env_prefix' in arguments else ''
-            instance_config = Config(arguments, prefix)
+            instance_config = config.sub(f'instances.{count}.arguments')
+            instance_config.set_prefix(
+                instance_config['auth_env_prefix']
+                if 'auth_env_prefix' in instance_config else ''
+            )
 
             auth_env = AuthEnv(instance_config)
 
             if 'queries' in config:
@@ -40,8 +50,8 @@ def __init__(self, config, event_type_fields_mapping, initial_delay):
             else:
                 client = SalesForce(auth_env, instance_name, instance_config, event_type_fields_mapping, initial_delay)
 
-            if 'auth' in arguments:
-                auth = arguments['auth']
+            if 'auth' in instance_config:
+                auth = instance_config['auth']
                 if 'grant_type' in auth:
                     oauth_type = auth['grant_type']
                 else:
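
Usage sketch (illustrative, with placeholder paths): with this change the
exporter no longer assumes fixed file names under CONFIG_DIR; each file can
be chosen at launch. Assuming the integration is started from the repository
root via its package entry point, an invocation might look like:

    python src -c /path/to/configs -f config.yml \
        -e event_type_fields.yml -n numeric_fields.yml

All four options are optional: -c/--config_dir falls back to the CONFIG_DIR
environment variable and then to the working directory, and the three file
names default to config.yml, event_type_fields.yml, and numeric_fields.yml.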