Skip to content
This repository has been archived by the owner on Aug 23, 2022. It is now read-only.

Commit

Permalink
change parser interface and add real time dynamic config (#15)
Browse files Browse the repository at this point in the history
* change parser interface and add real time config

* parser is now expected to be a function that takes a string and
  returns a dictionary or ValueError
* real time alerts can now have dynamic configuration depending on
  the contents of the message, this can be useful to have different
  escalation policies depending on severity for instance

* rename variable for clarity

* bump version to 2.0.0

as this PR is introducing breaking changes for the interfaces we
are bumping the major version

* add newline to end of requirements.txt
  • Loading branch information
Fredrik Gustafsson authored Aug 1, 2019
1 parent 7c9c602 commit e02a8d0
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 82 deletions.
104 changes: 57 additions & 47 deletions comet_core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, database_uri='sqlite://'):
self.routers = SourceTypeFunction()
self.escalators = SourceTypeFunction()
self.real_time_sources = list()
self.real_time_config_providers = dict()

self.database_uri = database_uri
self.batch_config = {
Expand All @@ -171,15 +172,15 @@ def message_callback(self, source_type, message):
boolean: True if parsing was successful, False otherwise
"""
LOG.info('received a message', extra={'source_type': source_type})
parser = self.parsers.get(source_type)
if not parser:
parse = self.parsers.get(source_type)
if not parse:
LOG.warning(f'no parser found', extra={'source_type': source_type})
return False

message_schema = parser()
message_dict, message_error = message_schema.loads(message)
if message_error:
LOG.warning(f'invalid message', extra={'source_type': source_type})
try:
message_dict = parse(message)
except ValueError as err:
LOG.warning(f'invalid message', extra={'source_type': source_type, 'error': str(err)})
return False

# Prepare an event container
Expand All @@ -201,7 +202,7 @@ def message_callback(self, source_type, message):
return True

def set_config(self, source_type, config):
"""Call to override default batching and escalation logic.
"""Call to override default batching and batch escalation logic.
Args:
source_type (str): the source type to override the configuration for
Expand Down Expand Up @@ -234,27 +235,48 @@ def decorator(clazz):
else:
self.inputs.append((clazz, kwargs))

def register_parser(self, source_type, schema=None):
"""Register a parser.
def register_parser(self, source_type, func=None):
"""Register a parser function.
This method can be used either as a decorator or with a schema passed in.
This method can be used either as a decorator or with a parser function passed in.
Args:
source_type (str): the source type to register the parser for
schema (Optional[marshmallow.Schema]): a schema able to parse messages for the given source_type or None if
used as decorator
func (Optional[function]): a function that parse a message of type source_type, or None if used as a
decorator
Return:
function or None: if no scehma is given returns a decorator function, otherwise None
"""
if not schema:
if not func:
# pylint: disable=missing-docstring, missing-return-doc, missing-return-type-doc
def decorator(schema):
self.parsers[source_type] = schema
return schema
def decorator(func):
self.parsers[source_type] = func
return func

return decorator
else:
self.parsers[source_type] = schema
self.parsers[source_type] = func

def register_config_provider(self, source_type, func=None):
"""Register, per source type, a function that return config given a real time event.
This method can be used either as a decorator or with a parser function passed in.
Args:
source_type (str): the source type to register the config provider for
func (Optional[function]): a function that accepts an event and return a dictionary with configuration
Return:
dict: the config for the given real time event
"""
if not func:
# pylint: disable=missing-docstring, missing-return-doc, missing-return-type-doc
def decorator(func):
self.real_time_config_providers[source_type] = func
return func

return decorator
else:
self.real_time_config_providers[source_type] = func

def register_real_time_source(self, source_type):
"""Register real time source type
Expand Down Expand Up @@ -308,7 +330,7 @@ def decorator(func):
self.filters[source_type] = func

def register_router(self, source_types=None, func=None):
"""Register a hydrator.
"""Register a router.
This method can be used either as a decorator or with a routing function passed in.
Expand All @@ -329,7 +351,7 @@ def decorator(func):
self.routers.add(source_types, func)

def register_escalator(self, source_types=None, func=None):
"""Register a hydrator.
"""Register a escalator.
This method can be used either as a decorator or with a escalator function passed in.
Expand Down Expand Up @@ -453,38 +475,26 @@ def process_unprocessed_events(self):
need_escalation_events)

def handle_non_addressed_events(self):
"""Check if there are real time events sent to the user
and were not addressed by him.
each event has escalate_cadence parameter which let us decide
when is the earliest time to escalate if the user didn't
addressed the alert.
"""Check if there are real time events sent to a user that were not addressed.
Each event has escalate_cadence parameter which is used as the earliest time to escalate if the user did
not address the alert.
"""
for source_type in self.real_time_sources:
source_type_config = self.batch_config
if source_type in self.specific_configs:
source_type_config.update(self.specific_configs[source_type])
else:
LOG.error('real time source type must have specific configs')

non_addressed_events = \
self.data_store.get_events_did_not_addressed(source_type)

non_addressed_events = self.data_store.get_events_did_not_addressed(source_type)
events_needs_escalation = []
default_escalate_cadence = timedelta(hours=36)

for event in non_addressed_events:
search_name = event.data.get('search_name')
if search_name is not None:
alert_properties = \
source_type_config['alerts'].get(search_name, {})
escalate_cadence = \
alert_properties.get('escalate_cadence',
default_escalate_cadence)
event_sent_at = event.sent_at

# when is earliest time to escalate the specific event
if event_sent_at <= datetime.utcnow() - escalate_cadence:
events_needs_escalation.append(event)
# load configuration for event, using batch settings as default
event_config = {}
if source_type in self.real_time_config_providers:
event_config = self.real_time_config_providers[source_type](event)

escalate_cadence = event_config.get('escalate_cadence', timedelta(hours=36))

event_sent_at = event.sent_at
# when is earliest time to escalate the specific event
if event_sent_at <= datetime.utcnow() - escalate_cadence:
events_needs_escalation.append(event)

self._handle_events_need_escalation(source_type,
events_needs_escalation)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ retrying==1.3.3
six==1.12.0
SQLAlchemy==1.3.0
Werkzeug==0.14.1
marshmallow==2.17.0
marshmallow==2.17.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

setuptools.setup(
name="comet-core",
version="1.0.11",
version="2.0.0",
url="https://github.com/spotify/comet-core",

author="Spotify Platform Security",
Expand Down
83 changes: 50 additions & 33 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def test_process_unprocessed_events():
app.register_parser('datastoretest2', json)
app.register_parser('datastoretest3', json)

app.set_config('datastoretest2', {})

specific_router = mock.Mock()
router = mock.Mock()
escalator = mock.Mock()
Expand Down Expand Up @@ -125,12 +127,11 @@ def test_event_container():

def test_message_callback(app):
@app.register_parser('test')
class TestParser:
def loads(self, msg):
ev = json.loads(msg)
if 'a' in ev:
return ev, None
return None, 'fail'
def parse_message(message):
ev = json.loads(message)
if 'a' in ev:
return ev, None
raise ValueError('fail')

hydrator_mock = mock.Mock()
app.register_hydrator('test', hydrator_mock)
Expand All @@ -150,12 +151,9 @@ def loads(self, msg):

def test_message_callback_filter(app):
@app.register_parser('test')
class TestParser:
def loads(self, msg):
ev = json.loads(msg)
if 'a' in ev:
return ev, None
return None, 'fail'
def parse_message(message):
ev = json.loads(message)
return ev, None

filter_mock = mock.Mock(return_value=None)
app.register_filter('test', filter_mock)
Expand All @@ -181,16 +179,32 @@ def test_register_parser(app):
assert not app.parsers

@app.register_parser('test1')
class TestParser:
def parse_message(message):
pass

# Override existing
app.register_parser('test1', TestParser)
app.register_parser('test1', parse_message)
assert len(app.parsers) == 1
app.register_parser('test2', TestParser)
app.register_parser('test2', parse_message)
assert len(app.parsers) == 2


def test_register_config_provider(app):
assert not app.real_time_config_providers

@app.register_config_provider('test1')
def test_register_conf(event):
return {}

# Override existing
app.register_config_provider('test1', test_register_conf)
assert len(app.real_time_config_providers) == 1, app.real_time_config_providers

# Add another
app.register_config_provider('test2', test_register_conf)
assert len(app.real_time_config_providers) == 2, app.real_time_config_providers


def test_register_hydrator(app):
assert not app.hydrators

Expand Down Expand Up @@ -272,7 +286,7 @@ def test_escalator(*args):

def test_validate_config(app):
@app.register_parser('test1')
class TestParser:
def parse_message(message):
pass

assert app.parsers
Expand All @@ -281,7 +295,7 @@ class TestParser:

app = Comet()

app.register_parser('test1', TestParser)
app.register_parser('test1', parse_message)

@app.register_router('test1')
def test_router(*args):
Expand Down Expand Up @@ -389,25 +403,28 @@ def test_process_unprocessed_real_time_events():

def test_handle_non_addressed_events():
app = Comet()
app.register_parser('real_time_source', json)
app.register_parser('real_time_source2', json)

@app.register_parser('real_time_source')
def parse_message(message):
data = json.loads(message)
return data, {}

@app.register_parser('real_time_source2')
def parse_message(message):
data = json.loads(message)
return data, {}

@app.register_config_provider('real_time_source')
def register_conf(event):
return {"escalate_cadence": timedelta(minutes=45)}

@app.register_config_provider('real_time_source2')
def register_conf(event):
return {"escalate_cadence": timedelta(minutes=45)}

app.register_real_time_source('real_time_source')
app.register_real_time_source('real_time_source2')

app.set_config('real_time_source', {'alerts': {
'alert search name':
{
'escalate_cadence': timedelta(minutes=45),
'template': 'alerts_template'
}
}})
app.set_config('real_time_source2', {'alerts': {
'alert search name':
{
'escalate_cadence': timedelta(minutes=45),
'template': 'alerts_template'
}
}})
escalator = mock.Mock()
escalator2 = mock.Mock()
app.register_escalator('real_time_source', func=escalator)
Expand Down

0 comments on commit e02a8d0

Please sign in to comment.