From f784d6c4dd67b0e724e64916bdad5ae422f830cd Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Thu, 13 Jun 2024 19:22:20 +0900 Subject: [PATCH] handle general error in sync_container_lifecycle() --- src/ai/backend/agent/agent.py | 123 ++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index e779ef74caf..8d848b7e302 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -1265,66 +1265,71 @@ async def sync_container_lifecycles(self, interval: float) -> None: own_kernels: dict[KernelId, ContainerId] = {} terminated_kernels: dict[KernelId, ContainerLifecycleEvent] = {} - async with self.registry_lock: - try: - # Check if: there are dead containers - for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET): - if kernel_id in self.restarting_kernels: - continue - log.info( - "detected dead container during lifeycle sync (k:{}, c:{})", - kernel_id, - container.id, - ) - session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - session_id, - container.id, - LifecycleEvent.CLEAN, - KernelLifecycleEventReason.SELF_TERMINATED, - ) - for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET): - alive_kernels[kernel_id] = container.id - session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) - kernel_session_map[kernel_id] = session_id - own_kernels[kernel_id] = container.id - for kernel_id, kernel_obj in self.kernel_registry.items(): - known_kernels[kernel_id] = kernel_obj["container_id"] - session_id = kernel_obj.session_id - kernel_session_map[kernel_id] = session_id - # Check if: kernel_registry has the container but it's gone. - for kernel_id in known_kernels.keys() - alive_kernels.keys(): - if ( - kernel_id in self.restarting_kernels - or self.kernel_registry[kernel_id].status == KernelStatus.PREPARING - ): - continue - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - kernel_session_map[kernel_id], - known_kernels[kernel_id], - LifecycleEvent.CLEAN, - KernelLifecycleEventReason.SELF_TERMINATED, - ) - # Check if: there are containers already deleted from my registry or not spawned by me. - for kernel_id in alive_kernels.keys() - known_kernels.keys(): - if kernel_id in self.restarting_kernels: - continue - terminated_kernels[kernel_id] = ContainerLifecycleEvent( - kernel_id, - kernel_session_map[kernel_id], - alive_kernels[kernel_id], - LifecycleEvent.DESTROY, - KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER, - ) - finally: - # Enqueue the events. - for kernel_id, ev in terminated_kernels.items(): - await self.container_lifecycle_queue.put(ev) + try: + async with self.registry_lock: + try: + # Check if: there are dead containers + for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET): + if kernel_id in self.restarting_kernels: + continue + log.info( + "detected dead container during lifeycle sync (k:{}, c:{})", + kernel_id, + container.id, + ) + session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + session_id, + container.id, + LifecycleEvent.CLEAN, + KernelLifecycleEventReason.SELF_TERMINATED, + ) + for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET): + alive_kernels[kernel_id] = container.id + session_id = SessionId(UUID(container.labels["ai.backend.session-id"])) + kernel_session_map[kernel_id] = session_id + own_kernels[kernel_id] = container.id + for kernel_id, kernel_obj in self.kernel_registry.items(): + known_kernels[kernel_id] = kernel_obj["container_id"] + session_id = kernel_obj.session_id + kernel_session_map[kernel_id] = session_id + # Check if: kernel_registry has the container but it's gone. + for kernel_id in known_kernels.keys() - alive_kernels.keys(): + if ( + kernel_id in self.restarting_kernels + or self.kernel_registry[kernel_id].status == KernelStatus.PREPARING + ): + continue + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + kernel_session_map[kernel_id], + known_kernels[kernel_id], + LifecycleEvent.CLEAN, + KernelLifecycleEventReason.SELF_TERMINATED, + ) + # Check if: there are containers already deleted from my registry or not spawned by me. + for kernel_id in alive_kernels.keys() - known_kernels.keys(): + if kernel_id in self.restarting_kernels: + continue + terminated_kernels[kernel_id] = ContainerLifecycleEvent( + kernel_id, + kernel_session_map[kernel_id], + alive_kernels[kernel_id], + LifecycleEvent.DESTROY, + KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER, + ) + finally: + # Enqueue the events. + for kernel_id, ev in terminated_kernels.items(): + await self.container_lifecycle_queue.put(ev) - # Set container count - await self.set_container_count(len(own_kernels.keys())) + # Set container count + await self.set_container_count(len(own_kernels.keys())) + except asyncio.TimeoutError: + log.warning("sync_container_lifecycles() timeout") + except Exception: + log.exception("sync_container_lifecycles() failure, continuing") async def set_container_count(self, container_count: int) -> None: await redis_helper.execute(