Skip to content

Commit

Permalink
try spinning out check for indexing into a system task
Browse files (browse the repository at this point in the history)
  • Loading branch information
Richard Kuo (Danswer) committed Jan 9, 2025
1 parent 428f592 commit a14dff3
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 30 deletions.
4 changes: 4 additions & 0 deletions backend/onyx/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ def on_task_postrun(
if not task_id:
return

if task.name.startswith("cloud"):
# this is a cloud / all tenant task ... no postrun is needed
return

# Get tenant_id directly from kwargs- each celery task has a tenant_id kwarg
if not kwargs:
logger.error(f"Task {task.name} (ID: {task_id}) is missing kwargs")
Expand Down
14 changes: 10 additions & 4 deletions backend/onyx/background/celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ def _generate_schedule(

if MULTI_TENANT:
# cloud tasks only need the single task beat across all tenants
get_cloud_tasks_to_schedule: list[
dict[str, Any]
] = fetch_versioned_implementation(
get_cloud_tasks_to_schedule = fetch_versioned_implementation(
"onyx.background.celery.tasks.beat_schedule",
"get_cloud_tasks_to_schedule",
)
Expand All @@ -79,7 +77,15 @@ def _generate_schedule(
] = get_cloud_tasks_to_schedule()
for task in cloud_tasks_to_schedule:
task_name = task["name"]
new_schedule[task_name] = task
cloud_task = {
"task": task["task"],
"schedule": task["schedule"],
"kwargs": {},
}
if options := task.get("options"):
logger.debug(f"Adding options to task {task_name}: {options}")
cloud_task["options"] = options
new_schedule[task_name] = cloud_task

# regular task beats are multiplied across all tenants
get_tasks_to_schedule = fetch_versioned_implementation(
Expand Down
16 changes: 8 additions & 8 deletions backend/onyx/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# by the DynamicTenantScheduler
cloud_tasks_to_schedule = [
{
"name": "cloud-check-for-indexing",
"name": "cloud_check-for-indexing",
"task": OnyxCeleryTask.CLOUD_CHECK_FOR_INDEXING,
"schedule": timedelta(seconds=15),
"options": {
Expand All @@ -36,7 +36,7 @@
"task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -45,7 +45,7 @@
"task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION,
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -54,7 +54,7 @@
"task": OnyxCeleryTask.CHECK_FOR_PRUNING,
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -72,7 +72,7 @@
"task": OnyxCeleryTask.MONITOR_VESPA_SYNC,
"schedule": timedelta(seconds=5),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -81,7 +81,7 @@
"task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC,
"schedule": timedelta(seconds=30),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -90,7 +90,7 @@
"task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC,
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
Expand All @@ -103,7 +103,7 @@
"task": OnyxCeleryTask.CHECK_FOR_INDEXING,
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
Expand Down Expand Up @@ -41,7 +41,7 @@ def check_for_connector_deletion_task(

lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from onyx.access.models import DocExternalAccess
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
Expand Down Expand Up @@ -99,7 +99,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool

lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
Expand Down Expand Up @@ -99,7 +99,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool

lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
Expand Down
9 changes: 5 additions & 4 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
Expand Down Expand Up @@ -83,7 +82,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:

lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
Expand Down Expand Up @@ -784,7 +783,7 @@ def cloud_check_for_indexing(self: Task) -> bool | None:
"""a lightweight task used to kick off individual check tasks for each tenant."""
time_start = time.monotonic()

redis_client = get_redis_client(tenant_id="system")
redis_client = get_redis_client(tenant_id="cloud")

lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CLOUD_CHECK_INDEXING_BEAT_LOCK,
Expand Down Expand Up @@ -820,5 +819,7 @@ def cloud_check_for_indexing(self: Task) -> bool | None:
redis_lock_dump(lock_beat, redis_client)

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_indexing finished: elapsed={time_elapsed:.2f}")
task_logger.info(
f"cloud_check_for_indexing finished: num_tenants={len(tenant_ids)} elapsed={time_elapsed:.2f}"
)
return True
4 changes: 2 additions & 2 deletions backend/onyx/background/celery/tasks/indexing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
Expand Down Expand Up @@ -143,7 +143,7 @@ def progress(self, tag: str, amount: int) -> None:
try:
current_time = time.monotonic()
if current_time - self.last_lock_monotonic >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
):
self.redis_lock.reacquire()
self.last_lock_reacquire = datetime.now(timezone.utc)
Expand Down
4 changes: 2 additions & 2 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_PRUNING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
Expand Down Expand Up @@ -86,7 +86,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:

lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/db/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def get_all_tenant_ids() -> list[str] | list[None]:

if not MULTI_TENANT:
return [None]

with get_session_with_tenant(tenant_id=POSTGRES_DEFAULT_SCHEMA) as session:
result = session.execute(
text(
Expand Down
4 changes: 2 additions & 2 deletions backend/onyx/redis/redis_connector_doc_perm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from redis.lock import Lock as RedisLock

from onyx.access.models import DocExternalAccess
from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
Expand Down Expand Up @@ -147,7 +147,7 @@ def generate_tasks(
for doc_perm in new_permissions:
current_time = time.monotonic()
if lock and current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
):
lock.reacquire()
last_lock_time = current_time
Expand Down
4 changes: 2 additions & 2 deletions backend/onyx/redis/redis_connector_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
Expand Down Expand Up @@ -122,7 +122,7 @@ def generate_tasks(
for doc_id in documents_to_prune:
current_time = time.monotonic()
if lock and current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
):
lock.reacquire()
last_lock_time = current_time
Expand Down

0 comments on commit a14dff3

Please sign in to comment.