diff --git a/openwisp_monitoring/check/tasks.py b/openwisp_monitoring/check/tasks.py index 6a643db48..75e8e22d3 100644 --- a/openwisp_monitoring/check/tasks.py +++ b/openwisp_monitoring/check/tasks.py @@ -7,6 +7,8 @@ from django.core.exceptions import ImproperlyConfigured, ObjectDoesNotExist from swapper import load_model +from openwisp_utils.tasks import OpenwispCeleryTask + from .settings import CHECKS_LIST logger = logging.getLogger(__name__) @@ -16,7 +18,7 @@ def get_check_model(): return load_model('check', 'Check') -@shared_task +@shared_task(time_limit=2 * 60 * 60) def run_checks(checks=None): """ Retrieves the id of all active checks in chunks of 2000 items @@ -49,7 +51,7 @@ def run_checks(checks=None): perform_check.delay(check['id']) -@shared_task +@shared_task(time_limit=30 * 60) def perform_check(uuid): """ Retrieves check according to the passed UUID @@ -65,7 +67,7 @@ def perform_check(uuid): print(json.dumps(result, indent=4, sort_keys=True)) -@shared_task +@shared_task(base=OpenwispCeleryTask) def auto_create_ping( model, app_label, object_id, check_model=None, content_type_model=None ): @@ -90,7 +92,7 @@ def auto_create_ping( check.save() -@shared_task +@shared_task(base=OpenwispCeleryTask) def auto_create_config_check( model, app_label, object_id, check_model=None, content_type_model=None ): @@ -117,7 +119,7 @@ def auto_create_config_check( check.save() -@shared_task +@shared_task(base=OpenwispCeleryTask) def auto_create_iperf3_check( model, app_label, object_id, check_model=None, content_type_model=None ): diff --git a/openwisp_monitoring/device/tasks.py b/openwisp_monitoring/device/tasks.py index a7d66612d..b2211d272 100644 --- a/openwisp_monitoring/device/tasks.py +++ b/openwisp_monitoring/device/tasks.py @@ -5,12 +5,14 @@ from django.utils.timezone import now, timedelta from swapper import load_model +from openwisp_utils.tasks import OpenwispCeleryTask + from ..check.tasks import perform_check logger = logging.getLogger(__name__) -@shared_task +@shared_task(base=OpenwispCeleryTask) def trigger_device_checks(pk, recovery=True): """ Retrieves all related checks to the passed ``device`` @@ -34,7 +36,7 @@ def trigger_device_checks(pk, recovery=True): device.monitoring.update_status(status) -@shared_task +@shared_task(base=OpenwispCeleryTask) def save_wifi_clients_and_sessions(device_data, device_pk): _WIFICLIENT_FIELDS = ['vendor', 'ht', 'vht', 'wmm', 'wds', 'wps'] WifiClient = load_model('device_monitoring', 'WifiClient') @@ -80,7 +82,7 @@ def save_wifi_clients_and_sessions(device_data, device_pk): ).update(stop_time=now()) -@shared_task +@shared_task(base=OpenwispCeleryTask) def delete_wifi_clients_and_sessions(days=6 * 30): WifiClient = load_model('device_monitoring', 'WifiClient') WifiSession = load_model('device_monitoring', 'WifiSession') @@ -91,7 +93,7 @@ def delete_wifi_clients_and_sessions(days=6 * 30): ).delete() -@shared_task +@shared_task(base=OpenwispCeleryTask) def offline_device_close_session(device_id): WifiSession = load_model('device_monitoring', 'WifiSession') WifiSession.objects.filter(device_id=device_id, stop_time__isnull=True).update( @@ -99,7 +101,7 @@ def offline_device_close_session(device_id): ) -@shared_task +@shared_task(base=OpenwispCeleryTask) def write_device_metrics(pk, data, time=None, current=False): DeviceData = load_model('device_monitoring', 'DeviceData') try: diff --git a/openwisp_monitoring/monitoring/tasks.py b/openwisp_monitoring/monitoring/tasks.py index 4d6b0f2ec..1601e0863 100644 --- a/openwisp_monitoring/monitoring/tasks.py +++ b/openwisp_monitoring/monitoring/tasks.py @@ -2,6 +2,8 @@ from django.core.exceptions import ObjectDoesNotExist from swapper import load_model +from openwisp_utils.tasks import OpenwispCeleryTask + from ..db import timeseries_db from ..db.exceptions import TimeseriesWriteException from .settings import RETRY_OPTIONS @@ -29,7 +31,12 @@ def _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs post_metric_write.send(**signal_kwargs) -@shared_task(bind=True, autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS) +@shared_task( + base=OpenwispCeleryTask, + bind=True, + autoretry_for=(TimeseriesWriteException,), + **RETRY_OPTIONS +) def timeseries_write( self, name, values, metric_pk=None, check_threshold_kwargs=None, **kwargs ): @@ -40,7 +47,12 @@ def timeseries_write( _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs) -@shared_task(bind=True, autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS) +@shared_task( + base=OpenwispCeleryTask, + bind=True, + autoretry_for=(TimeseriesWriteException,), + **RETRY_OPTIONS +) def timeseries_batch_write(self, data): """ Similar to timeseries_write function above, but operates on @@ -51,7 +63,7 @@ def timeseries_batch_write(self, data): _metric_post_write(**metric_data) -@shared_task +@shared_task(base=OpenwispCeleryTask) def delete_timeseries(key, tags): timeseries_db.delete_series(key=key, tags=tags)