Skip to content

Commit

Permalink
feat: Apply 'check-and-pull' mechanism to session creation lifecycle
Browse files (browse the repository at this point in the history)
  • Loading branch information
fregataa committed Aug 31, 2024
1 parent 0927a15 commit 2ad51d8
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 110 deletions.
12 changes: 10 additions & 2 deletions src/ai/backend/common/events.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -93,8 +93,12 @@ class DoScheduleEvent(EmptyEventArgs, AbstractEvent):
name = "do_schedule"


class DoPrepareEvent(EmptyEventArgs, AbstractEvent):
name = "do_prepare"
class DoCheckReadinessEvent(EmptyEventArgs, AbstractEvent):
name = "do_check_readiness"


class DoStartSessionEvent(EmptyEventArgs, AbstractEvent):
name = "do_start_session"


class DoScaleEvent(EmptyEventArgs, AbstractEvent):
Expand Down Expand Up @@ -438,6 +442,10 @@ class SessionScheduledEvent(SessionCreationEventArgs, AbstractEvent):
name = "session_scheduled"


class SessionCheckingReadinessEvent(SessionCreationEventArgs, AbstractEvent):
name = "session_checking_readiness"


class SessionPreparingEvent(SessionCreationEventArgs, AbstractEvent):
name = "session_preparing"

Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/manager/api/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -11,4 +11,6 @@ class ManagerStatus(enum.StrEnum):
# String-valued names of the scheduler events; the scheduler_trigger handler
# shown in this commit matches its "event" parameter against these members.
class SchedulerEvent(enum.StrEnum):
SCHEDULE = "schedule"
# In the scheduler_trigger handler of this commit, PREPARE is matched in the
# same case as CHECK_READINESS and produces the same DoCheckReadinessEvent.
PREPARE = "prepare"
CHECK_READINESS = "check_readiness"
# Handled by producing DoStartSessionEvent in scheduler_trigger.
START_SESSION = "start_session"
SCALE_SERVICES = "scale_services"
13 changes: 10 additions & 3 deletions src/ai/backend/manager/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

from ai.backend.common import redis_helper
from ai.backend.common import validators as tx
from ai.backend.common.events import DoPrepareEvent, DoScaleEvent, DoScheduleEvent
from ai.backend.common.events import (
DoCheckReadinessEvent,
DoScaleEvent,
DoScheduleEvent,
DoStartSessionEvent,
)
from ai.backend.logging import BraceStyleAdapter

from .. import __version__
Expand Down Expand Up @@ -249,8 +254,10 @@ async def scheduler_trigger(request: web.Request, params: Any) -> web.Response:
match params["event"]:
case SchedulerEvent.SCHEDULE:
await root_ctx.event_producer.produce_event(DoScheduleEvent())
case SchedulerEvent.PREPARE:
await root_ctx.event_producer.produce_event(DoPrepareEvent())
case SchedulerEvent.PREPARE | SchedulerEvent.CHECK_READINESS:
await root_ctx.event_producer.produce_event(DoCheckReadinessEvent())
case SchedulerEvent.START_SESSION:
await root_ctx.event_producer.produce_event(DoStartSessionEvent())
case SchedulerEvent.SCALE_SERVICES:
await root_ctx.event_producer.produce_event(DoScaleEvent())
return web.Response(status=204)
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/manager/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class LockID(enum.IntEnum):
LOCKID_PREPARE = 92
LOCKID_SCHEDULE_TIMER = 191
LOCKID_PREPARE_TIMER = 192
LOCKID_START_TIMER = 198
LOCKID_SCALE_TIMER = 193
LOCKID_LOG_CLEANUP_TIMER = 195
LOCKID_IDLE_CHECK_TIMER = 196
Expand Down
13 changes: 13 additions & 0 deletions src/ai/backend/manager/models/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,19 @@ async def get_session_id_by_kernel(
async with db.begin_readonly_session() as db_session:
return await db_session.scalar(query)

# Select every SessionRow whose `status` column equals the given `status`,
# loading each row's `kernels` relationship via selectinload.
# The query runs on the caller-supplied `db_session`; no session or
# transaction is opened here.
@classmethod
async def get_sessions_by_status(
cls, db_session: SASession, status: SessionStatus
) -> list[SessionRow]:
stmt = (
sa.select(SessionRow)
.where(SessionRow.status == status)
.options(
# Load the related kernels along with the matched sessions.
selectinload(SessionRow.kernels),
)
)
# NOTE(review): the annotation promises a list, but `.all()` on a scalar
# result may be typed as a Sequence depending on the SQLAlchemy version —
# confirm against the pinned version.
return (await db_session.scalars(stmt)).all()

@classmethod
async def get_session_to_determine_status(
cls, db_session: SASession, session_id: SessionId
Expand Down
21 changes: 4 additions & 17 deletions src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2223,25 +2223,13 @@ async def _destroy(db_session: AsyncSession) -> SessionRow:
raise SessionNotFound

match target_session.status:
case SessionStatus.PENDING:
case SessionStatus.PENDING | SessionStatus.PULLING:
await SessionRow.set_session_status(
self.db, session_id, SessionStatus.CANCELLED
)
case SessionStatus.PULLING:
# Exceptionally allow superadmins to destroy PULLING sessions.
# Clients should be informed that they have to handle the containers destroyed here.
# TODO: detach image-pull process from kernel-start process and allow all users to destroy PULLING sessions.
if forced and user_role == UserRole.SUPERADMIN:
log.warning(
"force-terminating session (s:{}, status:{})",
session_id,
target_session.status,
)
await _force_destroy_for_suadmin(SessionStatus.CANCELLED)
return {}
raise GenericForbidden("Cannot destroy sessions in pulling status")
case (
SessionStatus.SCHEDULED
| SessionStatus.READY_TO_START
| SessionStatus.PREPARING
| SessionStatus.TERMINATING
| SessionStatus.ERROR
Expand Down Expand Up @@ -2300,7 +2288,7 @@ async def _destroy(db_session: AsyncSession) -> SessionRow:
kernel: KernelRow
for kernel in grouped_kernels:
match kernel.status:
case KernelStatus.PENDING:
case KernelStatus.PENDING | KernelStatus.PULLING:
await KernelRow.set_kernel_status(
self.db,
kernel.id,
Expand All @@ -2327,10 +2315,9 @@ async def _destroy(db_session: AsyncSession) -> SessionRow:
reason,
),
)
case KernelStatus.PULLING:
raise GenericForbidden("Cannot destroy kernels in pulling status")
case (
KernelStatus.SCHEDULED
| KernelStatus.READY_TO_START
| KernelStatus.PREPARING
| KernelStatus.TERMINATING
| KernelStatus.ERROR
Expand Down
Loading

0 comments on commit 2ad51d8

Please sign in to comment.