diff --git a/comet_core/app.py b/comet_core/app.py index 54886bf..531d11b 100644 --- a/comet_core/app.py +++ b/comet_core/app.py @@ -155,12 +155,25 @@ def __init__(self, database_uri="sqlite://"): self.database_uri = database_uri self.batch_config = { - "wait_for_more": timedelta(seconds=3), + "communication_digest_mode": True, + # By default (communication_digest_mode=True), all batch events will be grouped by an owner and source_type. + # And email will look like: + # here are your X new issues, and by the way, you have these Y old ones. + # In case of the non-digest mode, + # the router will receive only these events that are new or need a reminder. + "escalation_reminder_cadence": timedelta(days=7), + # `escalation_reminder_cadence` defines how often to send escalation reminders + "escalation_time": timedelta(seconds=10), + # `escalation_time` defines how soon event should be escalated (it takes ignore_fingerprints into account) "max_wait": timedelta(seconds=4), + # `max_wait` defines the amount of time to wait since the earliest event in an attempt to catch whole batch "new_threshold": timedelta(days=7), + # `new_threshold` defines amount of time to wait since the latest report of the given fingerprint to assume + # it as a regression of the detected issue "owner_reminder_cadence": timedelta(days=7), - "escalation_time": timedelta(seconds=10), - "escalation_reminder_cadence": timedelta(days=7), + # `owner_reminder_cadence` defines how often to send reminders + "wait_for_more": timedelta(seconds=3), + # `wait_for_more` defines the amount of time to wait since the latest event } self.specific_configs = {} @@ -377,7 +390,7 @@ def decorator(func): self.escalators.add(source_types, func) - # pylint: disable=too-many-branches + # pylint: disable=too-many-branches, too-many-locals, too-many-nested-blocks, too-many-statements def process_unprocessed_events(self): """Checks the database for unprocessed events and processes them. @@ -387,20 +400,20 @@ def process_unprocessed_events(self): same escalation recipient recently. All ignored events will be skipped for the above, but marked as processed. Config options we care about: - source_type_config['owner_reminder_cadence']: - source_type_config['notifications_send_emails'] + source_type_config['communication_digest_mode'], + source_type_config['escalation_reminder_cadence'], source_type_config['escalation_time'], - source_type_config['escalation_reminder_cadence'] - source_type_config['recipient_override'] - source_type_config['email_subject']: - source_type_config['wait_for_more']: - source_type_config['max_wait']: + source_type_config['max_wait'], + source_type_config['new_threshold'], + source_type_config['owner_reminder_cadence'], + source_type_config['wait_for_more'] """ + LOG.debug("Processing unprocessed events") # pylint: disable=consider-iterating-dictionary for source_type in self.parsers.keys(): - source_type_config = self.batch_config + source_type_config = self.batch_config.copy() if source_type in self.specific_configs: source_type_config.update(self.specific_configs[source_type]) @@ -451,14 +464,34 @@ def process_unprocessed_events(self): for owner, events in events_by_owner.items(): owner_reminder_cadence = source_type_config["owner_reminder_cadence"] - if any([event.new for event in events]) or self.data_store.check_any_issue_needs_reminder( - owner_reminder_cadence, events - ): + events_to_remind = [] + if source_type_config["communication_digest_mode"]: + if any([event.new for event in events]) or self.data_store.check_any_issue_needs_reminder( + owner_reminder_cadence, events + ): + events_to_remind = events + else: + fingerprints_to_remind = self.data_store.get_any_issues_need_reminder( + owner_reminder_cadence, events + ) + if fingerprints_to_remind: + for e in events: + if e.fingerprint in fingerprints_to_remind: + e.reminder = True + events_to_remind.append(e) + + for e in events: + if e.new and not e.fingerprint in fingerprints_to_remind: + events_to_remind.append(e) + + if events_to_remind: try: - self._route_events(owner, events, source_type) - self.data_store.update_processed_at_timestamp_to_now(events) + self._route_events(owner, events_to_remind, source_type) + self.data_store.update_processed_at_timestamp_to_now(events_to_remind) except CometCouldNotSendException: - LOG.error(f"Could not send alert to {owner}: {events}") + LOG.error(f"Could not send alert to {owner}: {events_to_remind}") + + self.data_store.update_processed_at_timestamp_to_now([e for e in events if e not in events_to_remind]) LOG.info("events-processed", extra={"events": len(events), "source-type": source_type, "owner": owner}) diff --git a/comet_core/data_store.py b/comet_core/data_store.py index 9956a68..258a3d5 100644 --- a/comet_core/data_store.py +++ b/comet_core/data_store.py @@ -43,7 +43,7 @@ def remove_duplicate_events(event_record_list): return list(events_hash_table.values()) -class DataStore: +class DataStore: # pylint: disable=too-many-public-methods """Abstraction of the comet storage layer. Args: @@ -156,6 +156,32 @@ def check_any_issue_needs_reminder(self, search_timedelta, records): return max(timestamps)[0] <= datetime.utcnow() - search_timedelta return False + def get_any_issues_need_reminder(self, search_timedelta, records): + """Returns all the `fingerprints` having corresponding `event` table entries with the latest `sent_at` + more then search_timedelta ago. + NOTE: if all database records for a fingerprint given in the `records` list have the sent_at values set to Null, + then this fingerprint will be treated as NOT needing a reminder, which might be unintuitive. + Args: + search_timedelta (datetime.timedelta): reminder interval + records (list): list of EventRecord objects to check + Returns: + list: list of fingerprints that represent issues that need to be reminded about + """ + fingerprints = [record.fingerprint for record in records] + fingerprints_to_remind = ( + self.session.query(func.max(EventRecord.sent_at).label("sent_at"), EventRecord.fingerprint) + .filter(EventRecord.fingerprint.in_(fingerprints) & EventRecord.sent_at.isnot(None)) + .group_by(EventRecord.fingerprint) + .all() + ) + result = [] + deltat = datetime.utcnow() - search_timedelta + for f in fingerprints_to_remind: + if f.sent_at <= deltat: + result.append(f.fingerprint) + + return result + def update_timestamp_column_to_now(self, records, column_name): """Update the `column_name` of the provided `EventRecord`s to datetime now diff --git a/comet_core/fingerprint.py b/comet_core/fingerprint.py index 3b51599..ca76c1d 100644 --- a/comet_core/fingerprint.py +++ b/comet_core/fingerprint.py @@ -14,9 +14,9 @@ """Helper function to compute the fingerprint of alerts.""" -import collections import hmac import json +from collections.abc import Iterable from copy import deepcopy from hashlib import sha256, shake_256 @@ -62,7 +62,7 @@ def filter_dict(orig_dict, blacklist): for item in blacklist: if isinstance(item, str) and item in orig_dict: del orig_dict[item] - elif isinstance(item, collections.Iterable): + elif isinstance(item, Iterable): pointer = orig_dict for sub in item[:-1]: pointer = pointer.get(sub, {}) diff --git a/setup.py b/setup.py index 87582eb..35f8058 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setuptools.setup( name="comet-core", - version="2.5.1", + version="2.6.0", url="https://github.com/spotify/comet-core", author="Spotify Platform Security", author_email="wasabi@spotify.com", diff --git a/tests/test_app.py b/tests/test_app.py index 621e950..058f949 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -25,13 +25,13 @@ from comet_core.model import EventRecord, IgnoreFingerprintRecord -@freeze_time("2018-05-09 09:00:00") # pylint: disable=missing-docstring -def test_process_unprocessed_events(): +def test_process_unprocessed_events_digest_mode(): app = Comet() app.register_parser("datastoretest", json) app.register_parser("datastoretest2", json) app.register_parser("datastoretest3", json) + app.register_parser("datastoretest4", json) app.set_config("datastoretest2", {}) @@ -91,11 +91,120 @@ def test_process_unprocessed_events(): ) app.process_unprocessed_events() + + # it is expected to have specific_router called once for the datastoretest2 assert specific_router.call_count == 1 + # it is expected to have two calls of the generic router for the source_type datastoretest2 and datastoretest3 assert router.call_count == 2 + # and the user must be check_user assert router.call_args[0][2][0].owner == check_user + # due to the default escalation_time=10seconds, all three events (id=2,3,4) must be escalated assert escalator.call_count == 3 + app.data_store.add_record( + EventRecord( + id=5, + received_at=datetime.utcnow() - timedelta(days=2), + source_type="datastoretest", + owner=check_user, + data={}, + fingerprint="f1", + ) + ) + app.process_unprocessed_events() + + # f1 is expected to be processed, but not sent out + assert app.data_store.get_latest_event_with_fingerprint("f1").processed_at + assert not app.data_store.get_latest_event_with_fingerprint("f1").sent_at + + +# pylint: disable=missing-docstring +def test_process_unprocessed_events_non_digest_mode(): + app = Comet() + app.register_parser("datastoretest4", json) + + check_user = "an_owner" + router = mock.Mock() + escalator = mock.Mock() + app.register_router(func=router) + app.register_escalator(func=escalator) + + app.set_config("datastoretest4", {"communication_digest_mode": False, "new_threshold": timedelta(days=14)}) + + app.data_store.add_record( + EventRecord( + id=6, + received_at=datetime.utcnow() - timedelta(days=8), + source_type="datastoretest4", + sent_at=datetime.utcnow() - timedelta(days=8), + processed_at=datetime.utcnow() - timedelta(days=8), + owner=check_user, + data={}, + fingerprint="f5", + ) + ) + + app.data_store.add_record( + EventRecord( + id=7, + received_at=datetime.utcnow() - timedelta(days=2), + source_type="datastoretest4", + owner=check_user, + data={}, + fingerprint="f5", + ) + ) + + app.data_store.add_record( + EventRecord( + id=8, + received_at=datetime.utcnow(), + source_type="datastoretest4", + owner=check_user, + data={}, + fingerprint="f6", + ) + ) + + app.data_store.add_record( + EventRecord( + id=9, + received_at=datetime.utcnow() - timedelta(days=2), + source_type="datastoretest4", + sent_at=datetime.utcnow() - timedelta(days=2), + processed_at=datetime.utcnow() - timedelta(days=2), + owner=check_user, + data={}, + fingerprint="f7", + ) + ) + + app.data_store.add_record( + EventRecord( + id=10, + received_at=datetime.utcnow(), + source_type="datastoretest4", + owner=check_user, + data={}, + fingerprint="f7", + ) + ) + + # f5 is expected to be reminded + # f6 is expected to be new and sent as well + # f7 is NOT expected to be reminded + before_calling = router.call_count + app.process_unprocessed_events() + assert app.data_store.get_latest_event_with_fingerprint("f5").processed_at + assert app.data_store.get_latest_event_with_fingerprint("f6").processed_at + assert app.data_store.get_latest_event_with_fingerprint("f7").processed_at + assert router.call_count == before_calling + 1 + + sent_fingerprints = [e.fingerprint for e in router.call_args[0][2]] + assert "f5" in sent_fingerprints + assert "f6" in sent_fingerprints + assert "f7" not in sent_fingerprints + def test_event_container(): container = EventContainer("test", {}) diff --git a/tests/test_data_store.py b/tests/test_data_store.py index 12766ea..fa2e73e 100644 --- a/tests/test_data_store.py +++ b/tests/test_data_store.py @@ -163,6 +163,61 @@ def test_update_processed_at_timestamp_to_now(data_store_with_test_events): assert isinstance(record.processed_at, datetime) +def test_get_any_issues_need_reminder(): + data_store = comet_core.data_store.DataStore("sqlite://") + + test_fingerprint1 = "f1" + test_fingerprint2 = "f2" + test_fingerprint3 = "f3" + + one_a = EventRecord(sent_at=datetime.utcnow() - timedelta(days=9), source_type="datastoretest") + one_a.fingerprint = test_fingerprint1 + one_b = EventRecord(sent_at=datetime.utcnow() - timedelta(days=3), source_type="datastoretest") + one_b.fingerprint = test_fingerprint1 + + two_a = EventRecord(sent_at=datetime.utcnow() - timedelta(days=10), source_type="datastoretest") + two_a.fingerprint = test_fingerprint2 + two_b = EventRecord(sent_at=datetime.utcnow() - timedelta(days=8), source_type="datastoretest") + two_b.fingerprint = test_fingerprint2 + + two_c = EventRecord(source_type="datastoretest") # sent_at NULL + two_c.fingerprint = test_fingerprint2 + + three_a = EventRecord(source_type="datastoretest") # sent_at NULL + three_a.fingerprint = test_fingerprint3 + + data_store.add_record(one_a) + data_store.add_record(two_a) + data_store.add_record(two_b) + data_store.add_record(two_c) + data_store.add_record(three_a) + + # issue \ time ---> + # 1 --------a------|--------------> + # 2 ----a-------b--|--------------> (2c sent_at == NULL) + # 3 ---------------|--------------> (3a sent_at == NULL) + # ^ + # -7days + + result = data_store.get_any_issues_need_reminder(timedelta(days=7), [one_a, two_a, three_a]) + assert len(result) == 2 + assert test_fingerprint2 in result + assert test_fingerprint1 in result + + data_store.add_record(one_b) + + # issue \ time ---> + # 1 --------a------|-----b--------> + # 2 ----a-------b--|--------------> (2c sent_at == NULL) + # 3 ---------------|--------------> (3a sent_at == NULL) + # ^ + # -7days + + result = data_store.get_any_issues_need_reminder(timedelta(days=7), [one_a, two_a, three_a]) + assert len(result) == 1 + assert test_fingerprint2 in result + + def test_check_any_issue_needs_reminder(): data_store = comet_core.data_store.DataStore("sqlite://")