Skip to content

Commit

Permalink
fetch containers eagerly in sync_container_lifecycle() and revert reg…
Browse files Browse the repository at this point in the history
…istry_lock when assign container id
  • Loading branch information
fregataa committed Jun 17, 2024
1 parent 3794d16 commit fd42bfb
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1275,10 +1275,16 @@ async def sync_container_lifecycles(self, interval: float) -> None:
terminated_kernels: dict[KernelId, ContainerLifecycleEvent] = {}

try:
_containers = await self.enumerate_containers(ACTIVE_STATUS_SET | DEAD_STATUS_SET)
async with self.registry_lock:
try:
# Check if: there are dead containers
for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET):
dead_containers = [
(kid, container)
for kid, container in _containers
if container.status in DEAD_STATUS_SET
]
for kernel_id, container in dead_containers:
if kernel_id in self.restarting_kernels:
continue
log.info(
Expand All @@ -1294,7 +1300,12 @@ async def sync_container_lifecycles(self, interval: float) -> None:
LifecycleEvent.CLEAN,
KernelLifecycleEventReason.SELF_TERMINATED,
)
for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET):
active_containers = [
(kid, container)
for kid, container in _containers
if container.status in ACTIVE_STATUS_SET
]
for kernel_id, container in active_containers:
alive_kernels[kernel_id] = container.id
session_id = SessionId(UUID(container.labels["ai.backend.session-id"]))
kernel_session_map[kernel_id] = session_id
Expand Down Expand Up @@ -1336,9 +1347,9 @@ async def sync_container_lifecycles(self, interval: float) -> None:
# Set container count
await self.set_container_count(len(own_kernels.keys()))
except asyncio.TimeoutError:
log.warning("sync_container_lifecycles() timeout")
except Exception:
log.exception("sync_container_lifecycles() failure, continuing")
log.warning("sync_container_lifecycles() timeout, continuing")
except Exception as e:
log.exception(f"sync_container_lifecycles() failure, continuing (detail: {repr(e)})")

async def set_container_count(self, container_count: int) -> None:
await redis_helper.execute(
Expand Down Expand Up @@ -1969,7 +1980,8 @@ async def create_kernel(
f" (k:{kernel_id}, detail:{msg})",
)
cid = e.container_id
self.kernel_registry[ctx.kernel_id]["container_id"] = cid
async with self.registry_lock:
self.kernel_registry[ctx.kernel_id]["container_id"] = cid
await self.inject_container_lifecycle_event(
kernel_id,
session_id,
Expand Down

0 comments on commit fd42bfb

Please sign in to comment.