diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py
index ffd82447ac..283b9f3877 100644
--- a/src/ai/backend/manager/registry.py
+++ b/src/ai/backend/manager/registry.py
@@ -1711,11 +1711,11 @@ async def _update_kernel() -> None:
 
         await execute_with_retry(_update_kernel)
 
-        async with self.agent_cache.rpc_context(
-            agent_alloc_ctx.agent_id,
-            order_key=str(scheduled_session.id),
-        ) as rpc:
-            try:
+        try:
+            async with self.agent_cache.rpc_context(
+                agent_alloc_ctx.agent_id,
+                order_key=str(scheduled_session.id),
+            ) as rpc:
                 get_image_ref = lambda k: image_infos[str(k.image_ref)].image_ref
                 # Issue a batched RPC call to create kernels on this agent
                 # created_infos = await rpc.call.create_kernels(
@@ -1783,42 +1783,44 @@ async def _update_kernel() -> None:
                     [binding.kernel.id for binding in items],
                     agent_alloc_ctx.agent_id,
                 )
-            except (asyncio.TimeoutError, asyncio.CancelledError):
-                log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
-            except Exception as e:
-                # The agent has already cancelled or issued the destruction lifecycle event
-                # for this batch of kernels.
-                ex = e
-                for binding in items:
-                    kernel_id = binding.kernel.id
-
-                    async def _update_failure() -> None:
-                        async with self.db.begin_session() as db_sess:
-                            now = datetime.now(tzutc())
-                            query = (
-                                sa.update(KernelRow)
-                                .where(KernelRow.id == kernel_id)
-                                .values(
-                                    status=KernelStatus.ERROR,
-                                    status_info=f"other-error ({ex!r})",
-                                    status_changed=now,
-                                    terminated_at=now,
-                                    status_history=sql_json_merge(
-                                        KernelRow.status_history,
-                                        (),
-                                        {
-                                            KernelStatus.ERROR.name: (
-                                                now.isoformat()
-                                            ),  # ["PULLING", "PREPARING"]
-                                        },
-                                    ),
-                                    status_data=convert_to_status_data(ex, self.debug),
-                                )
+        except (asyncio.TimeoutError, asyncio.CancelledError):
+            log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
+        except Exception as e:
+            ex = e
+            err_info = convert_to_status_data(ex, self.debug)
+
+            # The agent has already cancelled or issued the destruction lifecycle event
+            # for this batch of kernels.
+            for binding in items:
+                kernel_id = binding.kernel.id
+
+                async def _update_failure() -> None:
+                    async with self.db.begin_session() as db_sess:
+                        now = datetime.now(tzutc())
+                        query = (
+                            sa.update(KernelRow)
+                            .where(KernelRow.id == kernel_id)
+                            .values(
+                                status=KernelStatus.ERROR,
+                                status_info=f"other-error ({ex!r})",
+                                status_changed=now,
+                                terminated_at=now,
+                                status_history=sql_json_merge(
+                                    KernelRow.status_history,
+                                    (),
+                                    {
+                                        KernelStatus.ERROR.name: (
+                                            now.isoformat()
+                                        ),  # ["PULLING", "PREPARING"]
+                                    },
+                                ),
+                                status_data=err_info,
                             )
-                            await db_sess.execute(query)
+                        )
+                        await db_sess.execute(query)
 
-                    await execute_with_retry(_update_failure)
-                raise
+                await execute_with_retry(_update_failure)
+            raise
 
     async def create_cluster_ssh_keypair(self) -> ClusterSSHKeyPair:
         key = rsa.generate_private_key(