diff --git a/src/ai/backend/manager/scheduler/dispatcher.py b/src/ai/backend/manager/scheduler/dispatcher.py
index 2fa8eab76fb..f98428579cc 100644
--- a/src/ai/backend/manager/scheduler/dispatcher.py
+++ b/src/ai/backend/manager/scheduler/dispatcher.py
@@ -301,15 +301,12 @@ def _pipeline(r: Redis) -> RedisPipeline:
                 )
                 result = await db_sess.execute(query)
                 schedulable_scaling_groups = [row.scaling_group for row in result.fetchall()]
-            produce_do_prepare = False
             for sgroup_name in schedulable_scaling_groups:
                 try:
                     kernel_agent_bindings = await self._schedule_in_sgroup(
                         sched_ctx,
                         sgroup_name,
                     )
-                    if kernel_agent_bindings:
-                        produce_do_prepare = True
                     await redis_helper.execute(
                         self.redis_live,
                         lambda r: r.hset(
@@ -357,8 +354,6 @@ def _pipeline(r: Redis) -> RedisPipeline:
                         datetime.now(tzutc()).isoformat(),
                     ),
                 )
-            if produce_do_prepare:
-                await self.event_producer.produce_event(DoPrepareEvent())
         except DBAPIError as e:
             if getattr(e.orig, "pgcode", None) == "55P03":
                 log.info(
@@ -463,7 +458,9 @@ async def _update():
                 len(cancelled_sessions),
             )
         zero = ResourceSlot()
+        num_scheduled = 0
         kernel_agent_bindings_in_sgroup: list[KernelAgentBinding] = []
+
         while len(pending_sessions) > 0:
             async with self.db.begin_readonly_session() as db_sess:
                 candidate_agents = await list_schedulable_agents_by_sgroup(db_sess, sgroup_name)
@@ -734,6 +731,9 @@ async def _update_session_status_data() -> None:
                 continue
             else:
                 kernel_agent_bindings_in_sgroup.extend(kernel_agent_bindings)
+                num_scheduled += 1
+        if num_scheduled > 0:
+            await self.event_producer.produce_event(DoPrepareEvent())
         return kernel_agent_bindings_in_sgroup

     async def _filter_agent_by_container_limit(