Skip to content

Commit

Permalink
fix: Change context indent to handle RPC exception correctly when calling create kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa committed Jul 19, 2024
1 parent dbe909d commit 6830de1
Showing 1 changed file with 41 additions and 39 deletions.
80 changes: 41 additions & 39 deletions src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1711,11 +1711,11 @@ async def _update_kernel() -> None:

await execute_with_retry(_update_kernel)

async with self.agent_cache.rpc_context(
agent_alloc_ctx.agent_id,
order_key=str(scheduled_session.id),
) as rpc:
try:
try:
async with self.agent_cache.rpc_context(
agent_alloc_ctx.agent_id,
order_key=str(scheduled_session.id),
) as rpc:
get_image_ref = lambda k: image_infos[str(k.image_ref)].image_ref
# Issue a batched RPC call to create kernels on this agent
# created_infos = await rpc.call.create_kernels(
Expand Down Expand Up @@ -1783,42 +1783,44 @@ async def _update_kernel() -> None:
[binding.kernel.id for binding in items],
agent_alloc_ctx.agent_id,
)
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
except Exception as e:
# The agent has already cancelled or issued the destruction lifecycle event
# for this batch of kernels.
ex = e
for binding in items:
kernel_id = binding.kernel.id

async def _update_failure() -> None:
async with self.db.begin_session() as db_sess:
now = datetime.now(tzutc())
query = (
sa.update(KernelRow)
.where(KernelRow.id == kernel_id)
.values(
status=KernelStatus.ERROR,
status_info=f"other-error ({ex!r})",
status_changed=now,
terminated_at=now,
status_history=sql_json_merge(
KernelRow.status_history,
(),
{
KernelStatus.ERROR.name: (
now.isoformat()
), # ["PULLING", "PREPARING"]
},
),
status_data=convert_to_status_data(ex, self.debug),
)
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
except Exception as e:
ex = e
err_info = convert_to_status_data(ex, self.debug)

# The agent has already cancelled or issued the destruction lifecycle event
# for this batch of kernels.
for binding in items:
kernel_id = binding.kernel.id

async def _update_failure() -> None:
async with self.db.begin_session() as db_sess:
now = datetime.now(tzutc())
query = (
sa.update(KernelRow)
.where(KernelRow.id == kernel_id)
.values(
status=KernelStatus.ERROR,
status_info=f"other-error ({ex!r})",
status_changed=now,
terminated_at=now,
status_history=sql_json_merge(
KernelRow.status_history,
(),
{
KernelStatus.ERROR.name: (
now.isoformat()
), # ["PULLING", "PREPARING"]
},
),
status_data=err_info,
)
await db_sess.execute(query)
)
await db_sess.execute(query)

await execute_with_retry(_update_failure)
raise
await execute_with_retry(_update_failure)
raise

async def create_cluster_ssh_keypair(self) -> ClusterSSHKeyPair:
key = rsa.generate_private_key(
Expand Down

0 comments on commit 6830de1

Please sign in to comment.