Skip to content

Commit

Permalink
Handle general errors in sync_container_lifecycles()
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa committed Jun 13, 2024
1 parent 3b1eab3 commit f784d6c
Showing 1 changed file with 64 additions and 59 deletions.
123 changes: 64 additions & 59 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,66 +1265,71 @@ async def sync_container_lifecycles(self, interval: float) -> None:
own_kernels: dict[KernelId, ContainerId] = {}
terminated_kernels: dict[KernelId, ContainerLifecycleEvent] = {}

async with self.registry_lock:
try:
# Check if: there are dead containers
for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET):
if kernel_id in self.restarting_kernels:
continue
log.info(
"detected dead container during lifeycle sync (k:{}, c:{})",
kernel_id,
container.id,
)
session_id = SessionId(UUID(container.labels["ai.backend.session-id"]))
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
session_id,
container.id,
LifecycleEvent.CLEAN,
KernelLifecycleEventReason.SELF_TERMINATED,
)
for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET):
alive_kernels[kernel_id] = container.id
session_id = SessionId(UUID(container.labels["ai.backend.session-id"]))
kernel_session_map[kernel_id] = session_id
own_kernels[kernel_id] = container.id
for kernel_id, kernel_obj in self.kernel_registry.items():
known_kernels[kernel_id] = kernel_obj["container_id"]
session_id = kernel_obj.session_id
kernel_session_map[kernel_id] = session_id
# Check if: kernel_registry has the container but it's gone.
for kernel_id in known_kernels.keys() - alive_kernels.keys():
if (
kernel_id in self.restarting_kernels
or self.kernel_registry[kernel_id].status == KernelStatus.PREPARING
):
continue
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
kernel_session_map[kernel_id],
known_kernels[kernel_id],
LifecycleEvent.CLEAN,
KernelLifecycleEventReason.SELF_TERMINATED,
)
# Check if: there are containers already deleted from my registry or not spawned by me.
for kernel_id in alive_kernels.keys() - known_kernels.keys():
if kernel_id in self.restarting_kernels:
continue
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
kernel_session_map[kernel_id],
alive_kernels[kernel_id],
LifecycleEvent.DESTROY,
KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER,
)
finally:
# Enqueue the events.
for kernel_id, ev in terminated_kernels.items():
await self.container_lifecycle_queue.put(ev)
try:
async with self.registry_lock:
try:
# Check if: there are dead containers
for kernel_id, container in await self.enumerate_containers(DEAD_STATUS_SET):
if kernel_id in self.restarting_kernels:
continue
log.info(
"detected dead container during lifeycle sync (k:{}, c:{})",
kernel_id,
container.id,
)
session_id = SessionId(UUID(container.labels["ai.backend.session-id"]))
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
session_id,
container.id,
LifecycleEvent.CLEAN,
KernelLifecycleEventReason.SELF_TERMINATED,
)
for kernel_id, container in await self.enumerate_containers(ACTIVE_STATUS_SET):
alive_kernels[kernel_id] = container.id
session_id = SessionId(UUID(container.labels["ai.backend.session-id"]))
kernel_session_map[kernel_id] = session_id
own_kernels[kernel_id] = container.id
for kernel_id, kernel_obj in self.kernel_registry.items():
known_kernels[kernel_id] = kernel_obj["container_id"]
session_id = kernel_obj.session_id
kernel_session_map[kernel_id] = session_id
# Check if: kernel_registry has the container but it's gone.
for kernel_id in known_kernels.keys() - alive_kernels.keys():
if (
kernel_id in self.restarting_kernels
or self.kernel_registry[kernel_id].status == KernelStatus.PREPARING
):
continue
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
kernel_session_map[kernel_id],
known_kernels[kernel_id],
LifecycleEvent.CLEAN,
KernelLifecycleEventReason.SELF_TERMINATED,
)
# Check if: there are containers already deleted from my registry or not spawned by me.
for kernel_id in alive_kernels.keys() - known_kernels.keys():
if kernel_id in self.restarting_kernels:
continue
terminated_kernels[kernel_id] = ContainerLifecycleEvent(
kernel_id,
kernel_session_map[kernel_id],
alive_kernels[kernel_id],
LifecycleEvent.DESTROY,
KernelLifecycleEventReason.TERMINATED_UNKNOWN_CONTAINER,
)
finally:
# Enqueue the events.
for kernel_id, ev in terminated_kernels.items():
await self.container_lifecycle_queue.put(ev)

# Set container count
await self.set_container_count(len(own_kernels.keys()))
# Set container count
await self.set_container_count(len(own_kernels.keys()))
except asyncio.TimeoutError:
log.warning("sync_container_lifecycles() timeout")
except Exception:
log.exception("sync_container_lifecycles() failure, continuing")

async def set_container_count(self, container_count: int) -> None:
await redis_helper.execute(
Expand Down

0 comments on commit f784d6c

Please sign in to comment.