Skip to content

Commit

Permalink
Merge pull request #197 from Progress1/scheduler
Browse files Browse the repository at this point in the history
Wrap scheduler action with exception handler
  • Loading branch information
milankowww authored Dec 4, 2023
2 parents 268614b + 7e64d5f commit 2a78cc3
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/collectors/collectors/atom_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class AtomCollector(BaseCollector):

news_items = []

@BaseCollector.ignore_exceptions
def collect(self, source):

feed_url = source.parameter_values['ATOM_FEED_URL']
Expand Down
16 changes: 15 additions & 1 deletion src/collectors/collectors/base_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import re
import time
from dateutil import tz
from functools import wraps

from managers import time_manager
from managers.log_manager import log_debug, log_info, log_debug_trace
from managers.log_manager import log_debug, log_info, log_warning, log_critical, log_debug_trace
from remote.core_api import CoreApi
from shared.schema import collector, osint_source, news_item
from shared.schema.parameter import Parameter, ParameterType
Expand All @@ -34,6 +35,18 @@ def get_info(self):
def collect(self, source):
pass

# wrap scheduled action with exception because scheduler fail plan next one
@staticmethod
def ignore_exceptions(func):
@wraps(func)
def wrapper(self, source):
try:
func(self, source)
except Exception as ex:
log_critical("An unhandled exception occurred during scheduled collector run: {}".format(ex))
log_debug_trace(ex)
return wrapper

@staticmethod
def print_exception(source, error):
log_info('OSINTSource ID: ' + source.id)
Expand Down Expand Up @@ -285,6 +298,7 @@ def refresh(self):
source.scheduler_job = time_manager.schedule_job_minutes(int(interval), self.collect, source)
else:
# TODO: send update to core with the error message
log_warning("configuration not received, code: {}, response: {}".format(code, response))
pass
except Exception as ex:
log_debug_trace()
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/email_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class EmailCollector(BaseCollector):

parameters.extend(BaseCollector.parameters)

@BaseCollector.ignore_exceptions
def collect(self, source):

news_items = []
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/manual_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ class ManualCollector(BaseCollector):

parameters.extend(BaseCollector.parameters)

@BaseCollector.ignore_exceptions
def collect(self, source):
pass
1 change: 1 addition & 0 deletions src/collectors/collectors/rss_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class RSSCollector(BaseCollector):

news_items = []

@BaseCollector.ignore_exceptions
def collect(self, source):
"""Collect data from RSS feed.
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/scheduled_tasks_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ScheduledTasksCollector(BaseCollector):

parameters.extend(BaseCollector.parameters)

@BaseCollector.ignore_exceptions
def collect(self, source):

news_items = []
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/slack_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class SlackCollector(BaseCollector):

parameters.extend(BaseCollector.parameters)

@BaseCollector.ignore_exceptions
def collect(self, source):

news_items = []
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/twitter_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TwitterCollector(BaseCollector):

parameters.extend(BaseCollector.parameters)

@BaseCollector.ignore_exceptions
def collect(self, source):

try:
Expand Down
1 change: 1 addition & 0 deletions src/collectors/collectors/web_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ def __run_tor(self):
subprocess.Popen(['tor'])
time.sleep(3)

@BaseCollector.ignore_exceptions
def collect(self, source):
"""Collects news items from this source (main function)"""

Expand Down
5 changes: 5 additions & 0 deletions src/collectors/managers/collectors_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ def reportStatus():
log_debug("[{}] Sending status update...".format(__name__))
response, code = CoreApi.update_collector_status()
log_debug("[{}] Core responded with: HTTP {}, {}".format(__name__, code, response))
# for debuging scheduler tasks
# for key in collectors:
# for source in collectors[key].osint_sources:
# if hasattr(source, "scheduler_job"):
# log_debug("Last run: {}, Next run: {}, {}".format(source.scheduler_job.last_run or "never", source.scheduler_job.next_run or "never", source.name))
time.sleep(55)

def initialize():
Expand Down

0 comments on commit 2a78cc3

Please sign in to comment.