Feature/more stable celery workers/main #149

Open · wants to merge 6 commits into base: main

1 change: 1 addition & 0 deletions skipper/.gitignore
@@ -12,3 +12,4 @@
/pycharm-venv/**
*.pid
_tests_report_*.txt
testing/*
11 changes: 11 additions & 0 deletions skipper/celeryBeatDev.sh
@@ -0,0 +1,11 @@
# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
# If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
# This file is part of NF Compose
# [2019] - [2024] © NeuroForge GmbH & Co. KG


SKIPPER_CONTAINER_TYPE=CELERY exec python3 -m pipenv run celery -A skipper \
beat \
--pidfile=/neuroforge/skipper/celery.pid \
--loglevel=INFO
2 changes: 1 addition & 1 deletion skipper/celeryDev.sh
@@ -5,4 +5,4 @@
# [2019] - [2024] © NeuroForge GmbH & Co. KG


SKIPPER_CONTAINER_TYPE=CELERY exec python3 -m pipenv run celery -A skipper worker --beat --loglevel=info -Q celery,event_queue,event_cleanup,health_check,data_series_cleanup,persist_data,file_registry_cleanup,index_creation,requeue_persist_data
SKIPPER_CONTAINER_TYPE=CELERY exec python3 -m pipenv run celery -A skipper worker --pool=gevent --concurrency 20 --loglevel=info -Q celery,event_queue,event_cleanup,health_check,data_series_cleanup,persist_data,file_registry_cleanup,index_creation,requeue_persist_data
1 change: 1 addition & 0 deletions skipper/runProduction.sh
@@ -98,6 +98,7 @@ if [ "${SKIPPER_CONTAINER_TYPE}" == "CELERY" ]; then
-O fair \
-Q $celery_worker_queues \
--loglevel=INFO \
--pool=gevent \
--concurrency=$celery_worker_concurrency
elif [ "${SKIPPER_CONTAINER_TYPE}" == "CELERY_BEAT" ]; then
cd /neuroforge/skipper || (echo "/neuroforge/skipper does not exist" && exit 1)
2 changes: 1 addition & 1 deletion skipper/skipper/celery.py
@@ -57,7 +57,7 @@ def int_or_crontab(input: Any, key: str) -> Union[int, crontab]:
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


app.conf.worker_prefetch_multiplier = 1
app.conf.beat_schedule = {
'event-queue-heartbeat': {
'task': '_3_wake_up_heartbeat_consumers',
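Reviewer note: the new `app.conf.worker_prefetch_multiplier = 1` works together with the switch to the gevent pool in the scripts above. With `--concurrency 20` and Celery's default multiplier of 4, a single worker would reserve up to 80 persist messages at once, which would undercut the requeue logic added later in this PR. A minimal sketch of the setting in isolation (the app name and broker URL are placeholders, not values from this repository):

from celery import Celery

# Placeholder app; 'example_app' and the broker URL are not taken from the PR.
app = Celery('example_app', broker='redis://localhost:6379/0')

# With a gevent pool of 20 green threads, the default prefetch multiplier of 4
# lets one worker reserve 20 * 4 = 80 messages; a multiplier of 1 keeps
# unstarted tasks on the broker where other workers can still pick them up.
app.conf.worker_prefetch_multiplier = 1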
37 changes: 37 additions & 0 deletions skipper/skipper/core/celery.py
@@ -72,3 +72,40 @@ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=
return current_app.task(*args, **dict({'base': SyncDelayTask}, **kwargs))
else:
task = current_app.task


def acquire_semaphore(semaphore_key: str, concurrency_limit: int, lock_timeout: int) -> bool:
"""Attempt to acquire a semaphore for the given semaphore_key."""
from skipper.celery import app as celery_app

with celery_app.pool.acquire(block=True) as conn:
redis_client = conn.default_channel.client

key = f"semaphore:{semaphore_key}"
# Initialize the semaphore with an expiry if it doesn't exist
current_value = redis_client.get(key)
if current_value is None:
redis_client.set(key, 0, nx=True, ex=lock_timeout)

# Atomically increment the count
count = redis_client.incr(key)
if count > concurrency_limit:
# Decrement back if we exceed the limit and return False
redis_client.decr(key)
return False
return True

def release_semaphore(semaphore_key: str) -> None:
"""Release the semaphore for the given semaphore_key."""
from skipper.celery import app as celery_app

with celery_app.pool.acquire(block=True) as conn:
redis_client = conn.default_channel.client
key = f"semaphore:{semaphore_key}"
# Decrement the count to release a permit
redis_client.decr(key)

redis_val = redis_client.get(key)
# Reset the semaphore if it goes to zero
if redis_val is not None and int(redis_val) <= 0:
redis_client.delete(key)
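Reviewer note: a minimal sketch of how the two helpers above are intended to be used together; the key name, limits and surrounding function are illustrative only, the real call sites are in the persist task further down:

from skipper.core.celery import acquire_semaphore, release_semaphore


def do_guarded_work() -> None:
    # Illustrative key and limits, not values used anywhere in this PR.
    key = 'example-work:some-resource-id'
    if not acquire_semaphore(semaphore_key=key, concurrency_limit=5, lock_timeout=180):
        # No permit available: back off and try again later instead of piling on.
        return
    try:
        pass  # ... the actual work would go here ...
    finally:
        # Always release; otherwise the permit only frees up once the Redis TTL expires.
        release_semaphore(key)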
3 changes: 2 additions & 1 deletion skipper/skipper/dataseries/admin.py
@@ -13,6 +13,7 @@
from skipper.dataseries.models import ConsumerEvent, BulkInsertTaskData, MetaModelTaskData
from skipper.dataseries.models.metamodel.consumer import Consumer
from skipper.dataseries.models.metamodel.data_series import DataSeries
from skipper.dataseries.raw_sql import dbtime


# only superusers are allowed to change things as this
@@ -141,7 +142,7 @@ class BulkInsertTaskDataAdmin(TenantAwareAdmin):

def requeue(self, request: HttpRequest, queryset: QuerySet[BulkInsertTaskData]) -> None:
for elem in queryset:
async_persist_data_point_chunk.delay(elem.id)
async_persist_data_point_chunk.delay(elem.id, dbtime.now())

def has_change_permission(self, request: HttpRequest, obj: Optional[Model] = None) -> bool:
return False
1 change: 1 addition & 0 deletions skipper/skipper/dataseries/models/event.py
@@ -191,6 +191,7 @@ def try_send_events(
"skipper.dataseries.consumer": str(consumer.id),
"skipper.core.tenant": str(consumer.tenant.name)
}):
logger.info("start sending events for consumer " + str(consumer.id))
_split_target_url = urlsplit(consumer.target)
sanitize_response: Callable[[str], str]
sanitize_headers: Callable[[Dict[str, str]], Dict[str, str]]
@@ -48,6 +48,7 @@
from skipper.settings import DATA_SERIES_DYNAMIC_SQL_DB
from skipper.testing import SKIPPER_CELERY_TESTING
from skipper.core.lint import sql_cursor
from skipper.dataseries.raw_sql import dbtime

__T = TypeVar('__T')
__U = TypeVar('__U')
@@ -527,7 +528,8 @@ def create_bulk(
# when testing, immediately run the code,
# as we are in the same transaction always
async_persist_data_point_chunk.delay(
task_data_reference_id=task_data.id
task_data_reference_id=task_data.id,
queue_time=dbtime.now()
)
else:
# in production, we need to do this on transaction commit
@@ -549,14 +551,16 @@ def create_bulk(
sub_clock=sub_clock
)

queue_time = dbtime.now()
# spawn the celery task after the transaction commits
# this is to ensure the task exists
# There is a slight possibility that data
# might not be written because of a lost task
# but for that we have healthchecks
transaction.on_commit(
lambda: [async_persist_data_point_chunk.delay(
task_data_reference_id=tsk_id
task_data_reference_id=tsk_id,
queue_time=queue_time
) for tsk_id in task_data_ids],
using=settings.DATA_SERIES_DYNAMIC_SQL_DB_BULK
)
@@ -11,11 +11,12 @@
from typing import Iterable, Dict, Any, List, Tuple, Type, TypeVar, Callable, Generator, Union, cast, Optional

from celery.utils.log import get_task_logger # type: ignore
from celery.exceptions import Ignore # type: ignore
from django.db import transaction
from django_multitenant.utils import set_current_tenant, get_current_tenant # type: ignore

from skipper import settings
from skipper.core.celery import task
from skipper.core.celery import task, acquire_semaphore, release_semaphore
from skipper.core.models.tenant import Tenant
from skipper.dataseries.models import BulkInsertTaskData
from skipper.dataseries.models.event import data_point_event, ConsumerEventType
@@ -25,6 +26,13 @@
from skipper.dataseries.storage.dynamic_sql.tasks.common import get_or_fail
from skipper.dataseries.storage.static_ds_information import DataPointSerializationKeys
from skipper.dataseries.raw_sql import dbtime
from skipper.environment import \
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_PER_DATA_SERIES, \
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_LOCK_TIMEOUT, \
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_MAX_RETRY_DELAY, \
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_AGE_MINUTES, \
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_TASK_EXPIRE_MINUTES, \
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_COOLDOWN_MINUTES

logger = get_task_logger(__name__)

@@ -140,67 +148,98 @@ class RetryException(Exception):

@task(
name="_3_wake_up_requeue_persist_data_point_chunk",
queue='requeue_persist_data'
queue='requeue_persist_data',
ignore_result=True,
bind=True
) # type: ignore
def wake_up_requeue_persist_data_point_chunk() -> None:
import base64
import json
from skipper.celery import app as celery_app

def wake_up_requeue_persist_data_point_chunk(self) -> None:
for elem in BulkInsertTaskData.objects.filter(
point_in_time__lt=dbtime.now() - timezone.timedelta(minutes=30)
).order_by('id'):
found = False
# we need to check if the task is still in the queue
with celery_app.pool.acquire(block=True) as conn:
cursor = conn.default_channel.client.scan_iter('persist_data:*')
for key in cursor:
task = conn.default_channel.client.get(key)
if task is not None:
j = json.loads(task)
j['body'] = json.loads(base64.b64decode(j['body']))
if elem.id == j['body']['task_data_reference_id']:
found = True
break

if not found:
async_persist_data_point_chunk.delay(elem.id)

point_in_time__lt=dbtime.now() - timezone.timedelta(
minutes=SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_AGE_MINUTES
)
).order_by('id').values('id'):
task_data_id = elem['id']
# don't requeue if we already did that recently
if not acquire_semaphore(
semaphore_key='bulk-requeue-taskdata-id:task_data:' + str(task_data_id),
concurrency_limit=1,
lock_timeout=SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_COOLDOWN_MINUTES * 60
):
logger.info(f'skipping requeue for task data with id {task_data_id}: a task for the same task data was requeued recently or is currently being processed')
continue

logger.info(f'requeueing task data with id {task_data_id} as it was older than {SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_AGE_MINUTES} minutes')
async_persist_data_point_chunk.delay(
task_data_reference_id=task_data_id,
queue_time=dbtime.now()
)

# to prevent weird race conditions under heavy load, we retry if the data is not there yet

@task(
name="_3_dynamic_sql_persist_data_point_chunk",
acks_late=True,
queue='persist_data'
queue='persist_data',
bind=True,
max_retries=None,
expires=SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_TASK_EXPIRE_MINUTES * 60
) # type: ignore
def async_persist_data_point_chunk(
task_data_reference_id: int
self,
task_data_reference_id: int,
queue_time: Optional[datetime.datetime] = None
) -> None:
with transaction.atomic():
# this must run outside of any tenant context or we dont get all data in a multitenant environment
set_current_tenant(None)

# skip the task if it was already claimed by another task
task_data = BulkInsertTaskData.objects.filter(id=task_data_reference_id).select_for_update(skip_locked=True).first()
if task_data is None:
logger.warn('task data not found, either claimed by someone else or task does not exist (anymore)')
logger.info(f'task data for task with id {task_data_reference_id} not found, either claimed by someone else or task does not exist (anymore)')

if task_data is not None:
# should we lock here?
persist_data_point_chunk(
tenant_id=str(task_data.tenant.id),
tenant_name=task_data.tenant.name,
data_series_id=str(task_data.data_series.id),
data_series_external_id=task_data.data_series.external_id,
data_series_backend=task_data.data_series.backend,
validated_datas=task_data.data['validated_datas'],
serialization_keys=task_data.data['serialization_keys'],
point_in_time_timestamp=task_data.point_in_time.timestamp(),
user_id=str(task_data.user.id),
record_source=task_data.record_source,
sub_clock=task_data.sub_clock
)
task_data.delete()
# TODO: write error if error happened
# make sure to notify the requeuing mechanism that we are working on this task via the extra semaphore
if not acquire_semaphore(
semaphore_key='bulk-requeue-taskdata-id:task_data:' + str(task_data.id),
concurrency_limit=1,
lock_timeout=SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_COOLDOWN_MINUTES * 60
):
# log and ignore if we could not acquire the semaphore.
# this is not the end of the world, but there will be another task execution that does nothing
logger.info(f'could not acquire semaphore for task data with id {task_data.id}, a task for the same task data was requeued recently')

if not acquire_semaphore(
semaphore_key='async_persist_data_point_chunk:dataseries:' + str(task_data.data_series.id),
concurrency_limit=SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_PER_DATA_SERIES,
lock_timeout=SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_LOCK_TIMEOUT
):
retry_delay = min(2 ** self.request.retries, SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_MAX_RETRY_DELAY)
logger.info("Retrying in {} seconds".format(retry_delay))
raise self.retry(countdown=retry_delay)

try:
# should we lock here?
persist_data_point_chunk(
tenant_id=str(task_data.tenant.id),
tenant_name=task_data.tenant.name,
data_series_id=str(task_data.data_series.id),
data_series_external_id=task_data.data_series.external_id,
data_series_backend=task_data.data_series.backend,
validated_datas=task_data.data['validated_datas'],
serialization_keys=task_data.data['serialization_keys'],
point_in_time_timestamp=task_data.point_in_time.timestamp(),
user_id=str(task_data.user.id),
record_source=task_data.record_source,
sub_clock=task_data.sub_clock
)
task_data.delete()
# TODO: write error if error happened
finally:
# Release the semaphore after processing
release_semaphore('async_persist_data_point_chunk:dataseries:' + str(task_data.data_series.id))
# also release the semaphore for the requeue to clean up memory in redis
release_semaphore('bulk-requeue-taskdata-id:task_data:' + str(task_data.id))


def persist_data_point_chunk(
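Reviewer note: the semaphore check plus `self.retry` in `async_persist_data_point_chunk` implements a per-data-series concurrency cap with capped exponential backoff: when the limit is reached, the task re-enqueues itself after `min(2 ** retries, SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_MAX_RETRY_DELAY)` seconds. A stripped-down sketch of the same pattern (the task name, semaphore key and limits are illustrative, not taken from the PR):

from skipper.core.celery import task, acquire_semaphore, release_semaphore


@task(name="example_bounded_task", bind=True, max_retries=None)  # type: ignore
def example_bounded_task(self, resource_id: str) -> None:
    # Illustrative key and limits; the real task derives them from settings.
    key = 'example:' + resource_id
    if not acquire_semaphore(semaphore_key=key, concurrency_limit=10, lock_timeout=180):
        # Capped exponential backoff so retries stay cheap under sustained load.
        retry_delay = min(2 ** self.request.retries, 60)
        raise self.retry(countdown=retry_delay)
    try:
        pass  # ... do the rate-limited work ...
    finally:
        release_semaphore(key)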
2 changes: 1 addition & 1 deletion skipper/skipper/dataseries/tasks/event.py
@@ -66,7 +66,7 @@ def consumer_heartbeat(queue: BaseTenantQueue, job: TenantPostgresQueueJob) -> A
def actual_run_heartbeat_consumers(tenant_id: str, consumer_id: str) -> None:
_list = Tenant.objects.filter(id=tenant_id)
if len(_list) != 1:
logger.warn('did not find tenant with id ' + tenant_id)
logger.warning('did not find tenant with id ' + tenant_id)
return
tenant = _list[0]
queue = get_queue(
4 changes: 4 additions & 0 deletions skipper/skipper/environment.py
@@ -31,6 +31,10 @@


SKIPPER_GUNICORN_WORKER_CONCURRENCY = int(os.environ.get('SKIPPER_GUNICORN_WORKER_CONCURRENCY', '2'))
SKIPPER_GUNICORN_WORKER_DB_POOL_TIMEOUT = int(os.environ.get('SKIPPER_GUNICORN_WORKER_DB_POOL_TIMEOUT', '10'))

SKIPPER_CELERY_WORKER_CONCURRENCY = int(os.environ.get('SKIPPER_CELERY_WORKER_CONCURRENCY', '20'))
SKIPPER_CELERY_WORKER_DB_POOL_TIMEOUT = int(os.environ.get('SKIPPER_CELERY_WORKER_DB_POOL_TIMEOUT', '60'))

SKIPPER_DOMAIN = os.environ['SKIPPER_DOMAIN']

16 changes: 16 additions & 0 deletions skipper/skipper/environment_common.py
@@ -36,7 +36,23 @@
raise ValueError('SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS must be a positive integer')
SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE', '0 1 * * *')
SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE', '0 1 * * *')

# these two have to match; otherwise we might get bad performance due to persist tasks being
# ignored and not requeued fast enough
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30))
# the age after which we requeue a chunk
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_AGE_MINUTES = int(os.environ.get('SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_AGE_MINUTES', 60))
# expire after 60 minutes
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_TASK_EXPIRE_MINUTES = int(os.environ.get('SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_TASK_EXPIRE_MINUTES', 60))
# cooldown after requeueing a chunk
# so we don't overload the system with requeues
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_COOLDOWN_MINUTES = int(os.environ.get('SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_COOLDOWN_MINUTES', 40))
# by default we keep something resembling the old behaviour
# where we had effectively unbounded concurrency
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_PER_DATA_SERIES = int(os.environ.get('SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_PER_DATA_SERIES', '100'))
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_LOCK_TIMEOUT = int(os.environ.get('SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_LOCK_TIMEOUT', '180'))
SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_MAX_RETRY_DELAY = int(os.environ.get('SKIPPER_CELERY_DATA_SERIES_BULK_ASYNC_CONCURRENCY_MAX_RETRY_DELAY', '60'))

SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30))
SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60))

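Reviewer note: the comment above says the requeue timing values have to line up, but nothing enforces it. One plausible reading is that the requeue beat schedule should not be longer than the task expiry, otherwise an expired persist task may sit unprocessed until the next requeue run; a hypothetical import-time check (not part of this PR) could make that explicit:

import warnings

# Hypothetical check; the exact constraint is an interpretation of the comment
# above, not a documented invariant of the codebase.
if SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE > \
        SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_TASK_EXPIRE_MINUTES * 60:
    warnings.warn(
        'persist-data requeue schedule is longer than the task expiry; '
        'stale chunks may wait a long time before being requeued'
    )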
28 changes: 17 additions & 11 deletions skipper/skipper/settings_env.py
@@ -313,22 +313,28 @@ def task_upstream_dashboard(tenant: Tenant, user: Optional[Union[User, Anonymous
'sslmode': environment.SKIPPER_DB_SSL_MODE,
} if environment.SKIPPER_DB_SSL_ENABLE else {}
)
_db_engine = (
'django.db.backends.postgresql'
if skipper_container_type in ['DJANGO', 'DJANGO_INTERNAL'] and not environment.SKIPPER_TESTING and not TYPE_CHECKING and not (os.environ.get('MYPY_RUN', 'false') == 'true')
else 'django.db.backends.postgresql'
)
_db_options = (
{
_db_engine = 'django.db.backends.postgresql'

_db_options = {}
if skipper_container_type in ['DJANGO', 'DJANGO_INTERNAL'] and not environment.SKIPPER_TESTING and not TYPE_CHECKING and not (os.environ.get('MYPY_RUN', 'false') == 'true'):
_db_options = {
'pool': {
'min_size': max(2, environment.SKIPPER_GUNICORN_WORKER_CONCURRENCY),
'max_size': max(2, environment.SKIPPER_GUNICORN_WORKER_CONCURRENCY),
'timeout': 10
'timeout': environment.SKIPPER_GUNICORN_WORKER_DB_POOL_TIMEOUT
}
}
if skipper_container_type in ['DJANGO', 'DJANGO_INTERNAL'] and not environment.SKIPPER_TESTING and not TYPE_CHECKING and not (os.environ.get('MYPY_RUN', 'false') == 'true')
else {}
)
else:
_db_options = {
'pool': {
'min_size': max(2, environment.SKIPPER_CELERY_WORKER_CONCURRENCY),
'max_size': max(2, environment.SKIPPER_CELERY_WORKER_CONCURRENCY),
'timeout': environment.SKIPPER_CELERY_WORKER_DB_POOL_TIMEOUT
}
}

if environment.SKIPPER_TESTING or TYPE_CHECKING:
_db_options = {}

if os.environ.get('MYPY_RUN', 'false') == 'true':
DATABASES: Dict[str, Any] = {}