From e02a8d07ab8747f7f378b2ab0d221a1f619b49d5 Mon Sep 17 00:00:00 2001 From: Fredrik Gustafsson Date: Thu, 1 Aug 2019 11:20:21 +0200 Subject: [PATCH] change parser interface and add real time dynamic config (#15) * change parser interface and add real time config * parser is now expected to be a function that takes a string and returns a dictionary or ValueError * real time alerts can now have dynamic configuration depending on the contents of the message, this can be useful to have different escalation policies depending on severity for instance * rename variable for clarity * bump version to 2.0.0 as this PR is introducing breaking changes for the interfaces we are bumping the major version * add newline to end of requirements.txt --- comet_core/app.py | 104 +++++++++++++++++++++++++--------------------- requirements.txt | 2 +- setup.py | 2 +- tests/test_app.py | 83 +++++++++++++++++++++--------------- 4 files changed, 109 insertions(+), 82 deletions(-) diff --git a/comet_core/app.py b/comet_core/app.py index c549a25..d7dc3fb 100644 --- a/comet_core/app.py +++ b/comet_core/app.py @@ -149,6 +149,7 @@ def __init__(self, database_uri='sqlite://'): self.routers = SourceTypeFunction() self.escalators = SourceTypeFunction() self.real_time_sources = list() + self.real_time_config_providers = dict() self.database_uri = database_uri self.batch_config = { @@ -171,15 +172,15 @@ def message_callback(self, source_type, message): boolean: True if parsing was successful, False otherwise """ LOG.info('received a message', extra={'source_type': source_type}) - parser = self.parsers.get(source_type) - if not parser: + parse = self.parsers.get(source_type) + if not parse: LOG.warning(f'no parser found', extra={'source_type': source_type}) return False - message_schema = parser() - message_dict, message_error = message_schema.loads(message) - if message_error: - LOG.warning(f'invalid message', extra={'source_type': source_type}) + try: + message_dict = parse(message) + except ValueError as err: + LOG.warning(f'invalid message', extra={'source_type': source_type, 'error': str(err)}) return False # Prepare an event container @@ -201,7 +202,7 @@ def message_callback(self, source_type, message): return True def set_config(self, source_type, config): - """Call to override default batching and escalation logic. + """Call to override default batching and batch escalation logic. Args: source_type (str): the source type to override the configuration for @@ -234,27 +235,48 @@ def decorator(clazz): else: self.inputs.append((clazz, kwargs)) - def register_parser(self, source_type, schema=None): - """Register a parser. + def register_parser(self, source_type, func=None): + """Register a parser function. - This method can be used either as a decorator or with a schema passed in. + This method can be used either as a decorator or with a parser function passed in. Args: source_type (str): the source type to register the parser for - schema (Optional[marshmallow.Schema]): a schema able to parse messages for the given source_type or None if - used as decorator + func (Optional[function]): a function that parse a message of type source_type, or None if used as a + decorator Return: function or None: if no scehma is given returns a decorator function, otherwise None """ - if not schema: + if not func: # pylint: disable=missing-docstring, missing-return-doc, missing-return-type-doc - def decorator(schema): - self.parsers[source_type] = schema - return schema + def decorator(func): + self.parsers[source_type] = func + return func return decorator else: - self.parsers[source_type] = schema + self.parsers[source_type] = func + + def register_config_provider(self, source_type, func=None): + """Register, per source type, a function that return config given a real time event. + + This method can be used either as a decorator or with a parser function passed in. + + Args: + source_type (str): the source type to register the config provider for + func (Optional[function]): a function that accepts an event and return a dictionary with configuration + Return: + dict: the config for the given real time event + """ + if not func: + # pylint: disable=missing-docstring, missing-return-doc, missing-return-type-doc + def decorator(func): + self.real_time_config_providers[source_type] = func + return func + + return decorator + else: + self.real_time_config_providers[source_type] = func def register_real_time_source(self, source_type): """Register real time source type @@ -308,7 +330,7 @@ def decorator(func): self.filters[source_type] = func def register_router(self, source_types=None, func=None): - """Register a hydrator. + """Register a router. This method can be used either as a decorator or with a routing function passed in. @@ -329,7 +351,7 @@ def decorator(func): self.routers.add(source_types, func) def register_escalator(self, source_types=None, func=None): - """Register a hydrator. + """Register a escalator. This method can be used either as a decorator or with a escalator function passed in. @@ -453,38 +475,26 @@ def process_unprocessed_events(self): need_escalation_events) def handle_non_addressed_events(self): - """Check if there are real time events sent to the user - and were not addressed by him. - each event has escalate_cadence parameter which let us decide - when is the earliest time to escalate if the user didn't - addressed the alert. + """Check if there are real time events sent to a user that were not addressed. + + Each event has escalate_cadence parameter which is used as the earliest time to escalate if the user did + not address the alert. """ for source_type in self.real_time_sources: - source_type_config = self.batch_config - if source_type in self.specific_configs: - source_type_config.update(self.specific_configs[source_type]) - else: - LOG.error('real time source type must have specific configs') - - non_addressed_events = \ - self.data_store.get_events_did_not_addressed(source_type) - + non_addressed_events = self.data_store.get_events_did_not_addressed(source_type) events_needs_escalation = [] - default_escalate_cadence = timedelta(hours=36) - for event in non_addressed_events: - search_name = event.data.get('search_name') - if search_name is not None: - alert_properties = \ - source_type_config['alerts'].get(search_name, {}) - escalate_cadence = \ - alert_properties.get('escalate_cadence', - default_escalate_cadence) - event_sent_at = event.sent_at - - # when is earliest time to escalate the specific event - if event_sent_at <= datetime.utcnow() - escalate_cadence: - events_needs_escalation.append(event) + # load configuration for event, using batch settings as default + event_config = {} + if source_type in self.real_time_config_providers: + event_config = self.real_time_config_providers[source_type](event) + + escalate_cadence = event_config.get('escalate_cadence', timedelta(hours=36)) + + event_sent_at = event.sent_at + # when is earliest time to escalate the specific event + if event_sent_at <= datetime.utcnow() - escalate_cadence: + events_needs_escalation.append(event) self._handle_events_need_escalation(source_type, events_needs_escalation) diff --git a/requirements.txt b/requirements.txt index 81b34cc..57af2f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ retrying==1.3.3 six==1.12.0 SQLAlchemy==1.3.0 Werkzeug==0.14.1 -marshmallow==2.17.0 \ No newline at end of file +marshmallow==2.17.0 diff --git a/setup.py b/setup.py index 63dc556..11ebb60 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ setuptools.setup( name="comet-core", - version="1.0.11", + version="2.0.0", url="https://github.com/spotify/comet-core", author="Spotify Platform Security", diff --git a/tests/test_app.py b/tests/test_app.py index 8c90b6c..dcc582f 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -61,6 +61,8 @@ def test_process_unprocessed_events(): app.register_parser('datastoretest2', json) app.register_parser('datastoretest3', json) + app.set_config('datastoretest2', {}) + specific_router = mock.Mock() router = mock.Mock() escalator = mock.Mock() @@ -125,12 +127,11 @@ def test_event_container(): def test_message_callback(app): @app.register_parser('test') - class TestParser: - def loads(self, msg): - ev = json.loads(msg) - if 'a' in ev: - return ev, None - return None, 'fail' + def parse_message(message): + ev = json.loads(message) + if 'a' in ev: + return ev, None + raise ValueError('fail') hydrator_mock = mock.Mock() app.register_hydrator('test', hydrator_mock) @@ -150,12 +151,9 @@ def loads(self, msg): def test_message_callback_filter(app): @app.register_parser('test') - class TestParser: - def loads(self, msg): - ev = json.loads(msg) - if 'a' in ev: - return ev, None - return None, 'fail' + def parse_message(message): + ev = json.loads(message) + return ev, None filter_mock = mock.Mock(return_value=None) app.register_filter('test', filter_mock) @@ -181,16 +179,32 @@ def test_register_parser(app): assert not app.parsers @app.register_parser('test1') - class TestParser: + def parse_message(message): pass # Override existing - app.register_parser('test1', TestParser) + app.register_parser('test1', parse_message) assert len(app.parsers) == 1 - app.register_parser('test2', TestParser) + app.register_parser('test2', parse_message) assert len(app.parsers) == 2 +def test_register_config_provider(app): + assert not app.real_time_config_providers + + @app.register_config_provider('test1') + def test_register_conf(event): + return {} + + # Override existing + app.register_config_provider('test1', test_register_conf) + assert len(app.real_time_config_providers) == 1, app.real_time_config_providers + + # Add another + app.register_config_provider('test2', test_register_conf) + assert len(app.real_time_config_providers) == 2, app.real_time_config_providers + + def test_register_hydrator(app): assert not app.hydrators @@ -272,7 +286,7 @@ def test_escalator(*args): def test_validate_config(app): @app.register_parser('test1') - class TestParser: + def parse_message(message): pass assert app.parsers @@ -281,7 +295,7 @@ class TestParser: app = Comet() - app.register_parser('test1', TestParser) + app.register_parser('test1', parse_message) @app.register_router('test1') def test_router(*args): @@ -389,25 +403,28 @@ def test_process_unprocessed_real_time_events(): def test_handle_non_addressed_events(): app = Comet() - app.register_parser('real_time_source', json) - app.register_parser('real_time_source2', json) + + @app.register_parser('real_time_source') + def parse_message(message): + data = json.loads(message) + return data, {} + + @app.register_parser('real_time_source2') + def parse_message(message): + data = json.loads(message) + return data, {} + + @app.register_config_provider('real_time_source') + def register_conf(event): + return {"escalate_cadence": timedelta(minutes=45)} + + @app.register_config_provider('real_time_source2') + def register_conf(event): + return {"escalate_cadence": timedelta(minutes=45)} + app.register_real_time_source('real_time_source') app.register_real_time_source('real_time_source2') - app.set_config('real_time_source', {'alerts': { - 'alert search name': - { - 'escalate_cadence': timedelta(minutes=45), - 'template': 'alerts_template' - } - }}) - app.set_config('real_time_source2', {'alerts': { - 'alert search name': - { - 'escalate_cadence': timedelta(minutes=45), - 'template': 'alerts_template' - } - }}) escalator = mock.Mock() escalator2 = mock.Mock() app.register_escalator('real_time_source', func=escalator)