Skip to content

Commit

Permalink
[change] Added default timeout to celery tasks #474
Browse files Browse the repository at this point in the history
Closes #474
  • Loading branch information
pandafy authored Jan 30, 2023
1 parent f4ea8a3 commit 79f3e62
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
12 changes: 7 additions & 5 deletions openwisp_monitoring/check/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from django.core.exceptions import ImproperlyConfigured, ObjectDoesNotExist
from swapper import load_model

from openwisp_utils.tasks import OpenwispCeleryTask

from .settings import CHECKS_LIST

logger = logging.getLogger(__name__)
Expand All @@ -16,7 +18,7 @@ def get_check_model():
return load_model('check', 'Check')


@shared_task
@shared_task(time_limit=2 * 60 * 60)
def run_checks(checks=None):
"""
Retrieves the id of all active checks in chunks of 2000 items
Expand Down Expand Up @@ -49,7 +51,7 @@ def run_checks(checks=None):
perform_check.delay(check['id'])


@shared_task
@shared_task(time_limit=30 * 60)
def perform_check(uuid):
"""
Retrieves check according to the passed UUID
Expand All @@ -65,7 +67,7 @@ def perform_check(uuid):
print(json.dumps(result, indent=4, sort_keys=True))


@shared_task
@shared_task(base=OpenwispCeleryTask)
def auto_create_ping(
model, app_label, object_id, check_model=None, content_type_model=None
):
Expand All @@ -90,7 +92,7 @@ def auto_create_ping(
check.save()


@shared_task
@shared_task(base=OpenwispCeleryTask)
def auto_create_config_check(
model, app_label, object_id, check_model=None, content_type_model=None
):
Expand All @@ -117,7 +119,7 @@ def auto_create_config_check(
check.save()


@shared_task
@shared_task(base=OpenwispCeleryTask)
def auto_create_iperf3_check(
model, app_label, object_id, check_model=None, content_type_model=None
):
Expand Down
12 changes: 7 additions & 5 deletions openwisp_monitoring/device/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from django.utils.timezone import now, timedelta
from swapper import load_model

from openwisp_utils.tasks import OpenwispCeleryTask

from ..check.tasks import perform_check

logger = logging.getLogger(__name__)


@shared_task
@shared_task(base=OpenwispCeleryTask)
def trigger_device_checks(pk, recovery=True):
"""
Retrieves all related checks to the passed ``device``
Expand All @@ -34,7 +36,7 @@ def trigger_device_checks(pk, recovery=True):
device.monitoring.update_status(status)


@shared_task
@shared_task(base=OpenwispCeleryTask)
def save_wifi_clients_and_sessions(device_data, device_pk):
_WIFICLIENT_FIELDS = ['vendor', 'ht', 'vht', 'wmm', 'wds', 'wps']
WifiClient = load_model('device_monitoring', 'WifiClient')
Expand Down Expand Up @@ -80,7 +82,7 @@ def save_wifi_clients_and_sessions(device_data, device_pk):
).update(stop_time=now())


@shared_task
@shared_task(base=OpenwispCeleryTask)
def delete_wifi_clients_and_sessions(days=6 * 30):
WifiClient = load_model('device_monitoring', 'WifiClient')
WifiSession = load_model('device_monitoring', 'WifiSession')
Expand All @@ -91,15 +93,15 @@ def delete_wifi_clients_and_sessions(days=6 * 30):
).delete()


@shared_task
@shared_task(base=OpenwispCeleryTask)
def offline_device_close_session(device_id):
WifiSession = load_model('device_monitoring', 'WifiSession')
WifiSession.objects.filter(device_id=device_id, stop_time__isnull=True).update(
stop_time=now()
)


@shared_task
@shared_task(base=OpenwispCeleryTask)
def write_device_metrics(pk, data, time=None, current=False):
DeviceData = load_model('device_monitoring', 'DeviceData')
try:
Expand Down
18 changes: 15 additions & 3 deletions openwisp_monitoring/monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from django.core.exceptions import ObjectDoesNotExist
from swapper import load_model

from openwisp_utils.tasks import OpenwispCeleryTask

from ..db import timeseries_db
from ..db.exceptions import TimeseriesWriteException
from .settings import RETRY_OPTIONS
Expand Down Expand Up @@ -29,7 +31,12 @@ def _metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs
post_metric_write.send(**signal_kwargs)


@shared_task(bind=True, autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS)
@shared_task(
base=OpenwispCeleryTask,
bind=True,
autoretry_for=(TimeseriesWriteException,),
**RETRY_OPTIONS
)
def timeseries_write(
self, name, values, metric_pk=None, check_threshold_kwargs=None, **kwargs
):
Expand All @@ -40,7 +47,12 @@ def timeseries_write(
_metric_post_write(name, values, metric_pk, check_threshold_kwargs, **kwargs)


@shared_task(bind=True, autoretry_for=(TimeseriesWriteException,), **RETRY_OPTIONS)
@shared_task(
base=OpenwispCeleryTask,
bind=True,
autoretry_for=(TimeseriesWriteException,),
**RETRY_OPTIONS
)
def timeseries_batch_write(self, data):
"""
Similar to timeseries_write function above, but operates on
Expand All @@ -51,7 +63,7 @@ def timeseries_batch_write(self, data):
_metric_post_write(**metric_data)


@shared_task
@shared_task(base=OpenwispCeleryTask)
def delete_timeseries(key, tags):
timeseries_db.delete_series(key=key, tags=tags)

Expand Down

0 comments on commit 79f3e62

Please sign in to comment.