From 8749f2dfd6438f9985229a8da8e166cd8fd1914f Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 27 Sep 2024 15:53:11 -0700 Subject: [PATCH 1/2] stop scheduling task when all prs have been cancelled Signed-off-by: Cindy Zhang --- .../replica_scheduler/pow_2_scheduler.py | 15 ++- .../unit/test_pow_2_replica_scheduler.py | 97 ++++++++++++++----- 2 files changed, 84 insertions(+), 28 deletions(-) diff --git a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py index ff163a523b4c..97a6c4ae5de0 100644 --- a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py +++ b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py @@ -647,7 +647,6 @@ async def select_from_candidate_replicas( def _get_pending_request_matching_metadata( self, - replica: ReplicaWrapper, request_metadata: Optional[RequestMetadata] = None, ) -> Optional[PendingRequest]: if request_metadata is None or not request_metadata.multiplexed_model_id: @@ -676,7 +675,7 @@ def fulfill_next_pending_request( # First try to match a pending request based on the request metadata (currently # this only looks at the multiplexed model ID). matched_pending_request = self._get_pending_request_matching_metadata( - replica, request_metadata + request_metadata ) if matched_pending_request is not None: matched_pending_request.future.set_result(replica) @@ -744,6 +743,18 @@ async def fulfill_pending_requests(self): ) logger.warning(warning_log) + # Clear out pending requests at the front of the + # queue that have been cancelled, then reevaluate + # if we need to continue this scheduling task. + while ( + len(self._pending_requests_to_fulfill) > 0 + and self._pending_requests_to_fulfill[0].future.done() + ): + self._pending_requests_to_fulfill.popleft() + + if len(self._scheduling_tasks) > self.target_num_scheduling_tasks: + break + except Exception: logger.exception("Unexpected error in fulfill_pending_requests.") finally: diff --git a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py index 4c3cf5d9f9fb..e078885486c5 100644 --- a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py +++ b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py @@ -134,7 +134,7 @@ def pow_2_scheduler(request) -> PowerOfTwoChoicesReplicaScheduler: # In order to prevent issues like https://github.com/ray-project/ray/issues/40631, # construct the scheduler on a different loop to mimic the deployment handle path. 
async def construct_scheduler(loop: asyncio.AbstractEventLoop): - return PowerOfTwoChoicesReplicaScheduler( + scheduler = PowerOfTwoChoicesReplicaScheduler( loop, DeploymentID(name="TEST_DEPLOYMENT"), handle_source=request.param.get( @@ -151,6 +151,8 @@ async def construct_scheduler(loop: asyncio.AbstractEventLoop): ), get_curr_time_s=TIMER.time, ) + scheduler.backoff_sequence_s = [0, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001] + return scheduler s = asyncio.new_event_loop().run_until_complete( construct_scheduler(get_or_create_event_loop()) @@ -220,7 +222,7 @@ async def test_no_replicas_available_then_one_available(pow_2_scheduler): loop = get_or_create_event_loop() task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r1 = FakeReplicaWrapper("r1") @@ -250,14 +252,14 @@ async def test_replica_does_not_accept_then_accepts(pow_2_scheduler): loop = get_or_create_event_loop() task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r1 = FakeReplicaWrapper("r1") r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1) s.update_replicas([r1]) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r1.set_queue_len_response(0) @@ -284,14 +286,14 @@ async def test_no_replicas_accept_then_new_one_accepts(pow_2_scheduler): loop = get_or_create_event_loop() task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r1 = FakeReplicaWrapper("r1") r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1) s.update_replicas([r1]) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r2 = FakeReplicaWrapper("r2") @@ -325,11 +327,11 @@ async def test_one_replica_available_then_none_then_one(pow_2_scheduler): s.update_replicas([r1]) task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 s.update_replicas([]) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 r1.set_queue_len_response(0) @@ -490,7 +492,7 @@ async def test_tasks_scheduled_fifo(pow_2_scheduler): loop.create_task(s.choose_replica_for_request(fake_pending_request())) ) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # Only a single request will be accepted at a time due to @@ -539,7 +541,7 @@ async def test_retried_tasks_scheduled_fifo(pow_2_scheduler): ) ) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # Only a single request will be accepted at a time due to @@ -587,7 +589,7 @@ async def test_cancellation(pow_2_scheduler): task1 = loop.create_task(s.choose_replica_for_request(fake_pending_request())) task2 = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task1, task2], timeout=0.1) + done, _ = await asyncio.wait([task1, task2], 
timeout=0.01) assert len(done) == 0 task1.cancel() @@ -603,6 +605,49 @@ async def test_cancellation(pow_2_scheduler): assert s.num_pending_requests == 0 +@pytest.mark.asyncio +@pytest.mark.parametrize( + "pow_2_scheduler", + [ + {"prefer_local_node": True, "prefer_local_az": True}, + {"prefer_local_node": True, "prefer_local_az": False}, + {"prefer_local_node": False, "prefer_local_az": True}, + {"prefer_local_node": False, "prefer_local_az": False}, + ], + indirect=True, +) +async def test_cancellation_when_replicas_maxed(pow_2_scheduler): + """ + If a pending assignment is cancelled, it shouldn't get fulfilled and the next + request in the queue should be. + """ + s = pow_2_scheduler + loop = get_or_create_event_loop() + + task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) + + # There is only one replica that is maxed out on requests + r1 = FakeReplicaWrapper("r1") + r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS) + s.update_replicas([r1]) + # So one scheduling task should have been started to try to schedule + # the request to a replica, but it should be blocked because the + # replica doesn't have capacity to accept new requests + done, _ = await asyncio.wait([task], timeout=0.01) + assert len(done) == 0 + assert s.curr_num_scheduling_tasks == 1 + + # Cancel while the scheduling task is repeatedly trying to find an + # available replica + task.cancel() + + # Verify that the scheduling tasks exit and there are no assignments left. + await async_wait_for_condition( + lambda: s.curr_num_scheduling_tasks == 0, retry_interval_ms=1 + ) + assert s.num_pending_requests == 0 + + @pytest.mark.asyncio @pytest.mark.parametrize( "pow_2_scheduler", @@ -624,7 +669,7 @@ async def test_only_task_cancelled(pow_2_scheduler): task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 task.cancel() @@ -669,7 +714,7 @@ async def test_scheduling_task_cap(pow_2_scheduler): loop.create_task(s.choose_replica_for_request(fake_pending_request())) ) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # There should be zero scheduling tasks while there are no replicas. @@ -679,7 +724,7 @@ async def test_scheduling_task_cap(pow_2_scheduler): r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1) s.update_replicas([r1]) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # Now that there is at least one replica available, there should be nonzero @@ -734,7 +779,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler): loop.create_task(s.choose_replica_for_request(fake_pending_request())) ) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # There should be zero scheduling tasks while there are no replicas. 
@@ -744,7 +789,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler): r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1) s.update_replicas([r1]) - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # Now that there is at least one replica available, there should be nonzero @@ -797,7 +842,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler): # Start the scheduling task, which will hang waiting for the queue length response. task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 assert s.curr_num_scheduling_tasks == 1 @@ -808,7 +853,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler): r1.set_queue_len_response(0) # The original replica should *not* be scheduled. - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 assert s.curr_num_scheduling_tasks == 1 @@ -1208,7 +1253,7 @@ async def test_multiple_queries_with_different_model_ids(self, pow_2_scheduler): ), ] - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == len(tasks) assert all( @@ -1241,7 +1286,7 @@ async def test_no_replicas_available_then_choose_one_with_id(self, pow_2_schedul ] # Scheduling tasks should be in backoff. - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == 0 # Now add two more replicas, one of which has the model ID. @@ -1279,7 +1324,7 @@ async def test_tasks_scheduled_fifo_among_model_ids(self, pow_2_scheduler): ) ) - done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.1) + done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.01) assert len(done) == 0 r1 = FakeReplicaWrapper("r1", model_ids={"m1"}, reset_after_response=True) @@ -1359,7 +1404,7 @@ async def test_queue_len_response_deadline_backoff(pow_2_scheduler): # Attempt to schedule; the replica will be attempted and a timeout will occur # due to the short timeout set above. task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.2) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 # Verify first ping @@ -1404,7 +1449,7 @@ async def test_max_queue_len_response_deadline(pow_2_scheduler): # Attempt to schedule; the replica will be attempted and a timeout will occur # due to the short timeout set above. task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.2) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 assert all( @@ -1555,7 +1600,7 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler): s.replica_queue_len_cache.update(r1.replica_id, DEFAULT_MAX_ONGOING_REQUESTS) task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 # 1 probe from scheduling requests # + 1 probe from when the replica set was updated with replica r1 @@ -1563,7 +1608,7 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler): # Now let the replica respond and accept the request, it should be scheduled. 
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 1 assert (await task) == r1 @@ -1591,7 +1636,7 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler): s.replica_queue_len_cache.update(r1.replica_id, 0) task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 1 assert (await task) == r1 # 0 probes from scheduling requests From feb3dbd20038914425567a8920f2be75e43cdd39 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Tue, 8 Oct 2024 11:05:03 -0700 Subject: [PATCH 2/2] move to top of loop Signed-off-by: Cindy Zhang --- .../replica_scheduler/pow_2_scheduler.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py index 97a6c4ae5de0..24e22a6a5cbb 100644 --- a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py +++ b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py @@ -717,6 +717,18 @@ async def fulfill_pending_requests(self): async for candidates in self.choose_two_replicas_with_backoff( request_metadata ): + # Clear out pending requests at the front of the + # queue that have been cancelled, then reevaluate + # if we need to continue this scheduling task. + while ( + len(self._pending_requests_to_fulfill) > 0 + and self._pending_requests_to_fulfill[0].future.done() + ): + self._pending_requests_to_fulfill.popleft() + + if len(self._scheduling_tasks) > self.target_num_scheduling_tasks: + break + replica = await self.select_from_candidate_replicas( candidates, backoff_index ) @@ -743,18 +755,6 @@ async def fulfill_pending_requests(self): ) logger.warning(warning_log) - # Clear out pending requests at the front of the - # queue that have been cancelled, then reevaluate - # if we need to continue this scheduling task. - while ( - len(self._pending_requests_to_fulfill) > 0 - and self._pending_requests_to_fulfill[0].future.done() - ): - self._pending_requests_to_fulfill.popleft() - - if len(self._scheduling_tasks) > self.target_num_scheduling_tasks: - break - except Exception: logger.exception("Unexpected error in fulfill_pending_requests.") finally:
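
Note (illustration only, not part of the patches above): the following is a minimal, self-contained sketch of the pattern both patches converge on, namely draining cancelled pending requests from the front of the queue at the top of the scheduling loop and then re-checking whether the current scheduling task is still needed before doing any replica probing. The Scheduler class, its attribute names, and the demo in main() are hypothetical stand-ins; the real scheduler tracks PendingRequest objects (checking pending_request.future.done()), caps the number of scheduling tasks, and probes replica queue lengths with backoff between iterations.

# Hypothetical sketch of the drain-then-recheck pattern; not the Ray Serve code.
import asyncio
from collections import deque


class Scheduler:
    def __init__(self):
        self.pending = deque()          # futures awaiting a replica assignment
        self.scheduling_tasks = set()   # running fulfill_pending_requests tasks

    @property
    def target_num_scheduling_tasks(self) -> int:
        # The real scheduler also caps this with a hard limit; here it simply
        # tracks the number of live pending requests.
        return len(self.pending)

    async def fulfill_pending_requests(self):
        try:
            while True:
                # Drop requests at the front of the queue whose futures are
                # already done (e.g. cancelled by the caller) so a replica is
                # never assigned to a caller that already gave up.
                while self.pending and self.pending[0].done():
                    self.pending.popleft()

                # With the queue drained, re-evaluate whether this scheduling
                # task is still needed; if not, exit instead of probing replicas.
                if len(self.scheduling_tasks) > self.target_num_scheduling_tasks:
                    break

                # Stand-in for choosing candidate replicas with backoff and
                # fulfilling the next pending request.
                await asyncio.sleep(0.01)
        finally:
            self.scheduling_tasks.discard(asyncio.current_task())


async def main():
    s = Scheduler()
    fut = asyncio.get_running_loop().create_future()
    s.pending.append(fut)

    task = asyncio.create_task(s.fulfill_pending_requests())
    s.scheduling_tasks.add(task)

    fut.cancel()   # caller cancels the only pending request
    await task     # the scheduling task drains the queue and exits early

    assert len(s.pending) == 0
    assert len(s.scheduling_tasks) == 0


asyncio.run(main())

The demo mirrors the new unit test: with a single pending request that gets cancelled, the scheduling task removes it from the queue, finds it is no longer needed, and exits, leaving no pending requests and no scheduling tasks behind.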