Skip to content

Commit

Permalink
use a larger scan_iter value for performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Kuo (Danswer) committed Jan 7, 2025
1 parent 1f82a3d commit 8f01fe1
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 11 deletions.
7 changes: 5 additions & 2 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
Expand Down Expand Up @@ -262,7 +263,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
tenant_id == "tenant_i-043470d740845ec56"
or tenant_id == "tenant_82b497ce-88aa-4fbd-841a-92cae43529c8"
):
logger.info(
task_logger.info(
f"check_for_indexing lock: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
Expand Down Expand Up @@ -417,7 +418,9 @@ def validate_indexing_fences(
)

# validate all existing indexing jobs
for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
validate_indexing_fence(
Expand Down
26 changes: 20 additions & 6 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_usergroup import RedisUserGroup
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
Expand Down Expand Up @@ -840,7 +841,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisConnectorDelete.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
monitor_connector_deletion_taskset(tenant_id, key_bytes, r)
lock_beat.reacquire()

Expand All @@ -851,7 +854,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisDocumentSet.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
with get_session_with_tenant(tenant_id) as db_session:
monitor_document_set_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
Expand All @@ -860,7 +865,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisUserGroup.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback(
"onyx.background.celery.tasks.vespa.tasks",
"monitor_usergroup_taskset",
Expand All @@ -874,7 +881,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
Expand All @@ -883,7 +892,9 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
Expand All @@ -892,7 +903,10 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:

phase_start = time.monotonic()
lock_beat.reacquire()
for key_bytes in r.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
with get_session_with_tenant(tenant_id) as db_session:
monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session)
lock_beat.reacquire()
Expand Down
6 changes: 5 additions & 1 deletion backend/onyx/redis/redis_connector_doc_perm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT


class RedisConnectorPermissionSyncPayload(BaseModel):
Expand Down Expand Up @@ -68,7 +69,10 @@ def get_remaining(self) -> int:
def get_active_task_count(self) -> int:
"""Count of active permission sync tasks"""
count = 0
for _ in self.redis.scan_iter(RedisConnectorPermissionSync.FENCE_PREFIX + "*"):
for _ in self.redis.scan_iter(
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
count += 1
return count

Expand Down
5 changes: 4 additions & 1 deletion backend/onyx/redis/redis_connector_ext_group_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT


class RedisConnectorExternalGroupSyncPayload(BaseModel):
started: datetime | None
Expand Down Expand Up @@ -63,7 +65,8 @@ def get_active_task_count(self) -> int:
"""Count of active external group syncing tasks"""
count = 0
for _ in self.redis.scan_iter(
RedisConnectorExternalGroupSync.FENCE_PREFIX + "*"
RedisConnectorExternalGroupSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
count += 1
return count
Expand Down
5 changes: 4 additions & 1 deletion backend/onyx/redis/redis_connector_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT


class RedisConnectorPrune:
Expand Down Expand Up @@ -63,7 +64,9 @@ def get_remaining(self) -> int:
def get_active_task_count(self) -> int:
"""Count of active pruning tasks"""
count = 0
for key in self.redis.scan_iter(RedisConnectorPrune.FENCE_PREFIX + "*"):
for key in self.redis.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
count += 1
return count

Expand Down
2 changes: 2 additions & 0 deletions backend/onyx/redis/redis_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

logger = setup_logger()

SCAN_ITER_COUNT_DEFAULT = 4096


class TenantRedis(redis.Redis):
def __init__(self, tenant_id: str, *args: Any, **kwargs: Any) -> None:
Expand Down

0 comments on commit 8f01fe1

Please sign in to comment.