Skip to content

Commit

Permalink
Merge pull request #3566 from onyx-dot-app/bugfix/primary_task_timings
Browse files Browse the repository at this point in the history
re-enable celery task execution logging in primary worker
  • Loading branch information
rkuo-danswer authored Dec 31, 2024
2 parents 3f92ed9 + b43a8e4 commit 9b65c23
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 13 deletions.
12 changes: 11 additions & 1 deletion backend/onyx/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,21 @@ def on_setup_logging(
task_logger.setLevel(loglevel)
task_logger.propagate = False

# Hide celery task received and succeeded/failed messages
# hide celery task received spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received"
strategy.logger.setLevel(logging.WARNING)

# uncomment this to hide celery task succeeded/failed spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None"
trace.logger.setLevel(logging.WARNING)


def set_task_finished_log_level(logLevel: int) -> None:
"""call this to override the setLevel in on_setup_logging. We are interested
in the task timings in the cloud but it can be spammy for self hosted."""
trace.logger.setLevel(logLevel)


class TenantContextFilter(logging.Filter):

"""Logging filter to inject tenant ID into the logger's name."""
Expand Down
5 changes: 5 additions & 0 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import multiprocessing
from typing import Any
from typing import cast
Expand Down Expand Up @@ -194,6 +195,10 @@ def on_setup_logging(
) -> None:
app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs)

# this can be spammy, so just enable it in the cloud for now
if MULTI_TENANT:
app_base.set_task_finished_log_level(logging.INFO)


class HubPeriodicTask(bootsteps.StartStopStep):
"""Regularly reacquires the primary worker lock outside of the task queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class TaskDependencyError(RuntimeError):
trail=False,
bind=True,
)
def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_connector_deletion_task(
self: Task, *, tenant_id: str | None
) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -45,7 +47,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

# collect cc_pair_ids
cc_pair_ids: list[int] = []
Expand Down Expand Up @@ -81,6 +83,8 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
if lock_beat.owned():
lock_beat.release()

return True


def try_generate_document_cc_pair_cleanup_tasks(
app: Celery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -102,7 +102,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

# get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = []
Expand Down Expand Up @@ -131,6 +131,8 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_permissions_sync_task(
app: Celery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -105,7 +105,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down Expand Up @@ -149,6 +149,8 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_external_group_sync_task(
app: Celery,
Expand Down
8 changes: 5 additions & 3 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down Expand Up @@ -127,6 +127,8 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_prune_generator_task(
celery_app: Celery,
Expand Down
6 changes: 3 additions & 3 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
trail=False,
bind=True,
)
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
"""Runs periodically to check if any document needs syncing.
Generates sets of tasks for Celery if syncing is needed."""
time_start = time.monotonic()
Expand All @@ -103,7 +103,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

with get_session_with_tenant(tenant_id) as db_session:
try_generate_stale_document_sync_tasks(
Expand Down Expand Up @@ -166,7 +166,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
return
return True


def try_generate_stale_document_sync_tasks(
Expand Down

0 comments on commit 9b65c23

Please sign in to comment.