Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Stop scheduling task early when requests have been cancelled #47847

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,6 @@ async def select_from_candidate_replicas(

def _get_pending_request_matching_metadata(
self,
replica: ReplicaWrapper,
request_metadata: Optional[RequestMetadata] = None,
) -> Optional[PendingRequest]:
if request_metadata is None or not request_metadata.multiplexed_model_id:
Expand Down Expand Up @@ -676,7 +675,7 @@ def fulfill_next_pending_request(
# First try to match a pending request based on the request metadata (currently
# this only looks at the multiplexed model ID).
matched_pending_request = self._get_pending_request_matching_metadata(
replica, request_metadata
request_metadata
)
if matched_pending_request is not None:
matched_pending_request.future.set_result(replica)
Expand Down Expand Up @@ -744,6 +743,18 @@ async def fulfill_pending_requests(self):
)
logger.warning(warning_log)

# Clear out pending requests at the front of the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a minor optimization, but should this be at the top of the loop instead? The problematic situation could occur immediately after choose_two_replicas_with_backoff

# queue that have been cancelled, then reevaluate
# if we need to continue this scheduling task.
while (
len(self._pending_requests_to_fulfill) > 0
and self._pending_requests_to_fulfill[0].future.done()
):
self._pending_requests_to_fulfill.popleft()

if len(self._scheduling_tasks) > self.target_num_scheduling_tasks:
break

except Exception:
logger.exception("Unexpected error in fulfill_pending_requests.")
finally:
Expand Down
97 changes: 71 additions & 26 deletions python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def pow_2_scheduler(request) -> PowerOfTwoChoicesReplicaScheduler:
# In order to prevent issues like https://github.com/ray-project/ray/issues/40631,
# construct the scheduler on a different loop to mimic the deployment handle path.
async def construct_scheduler(loop: asyncio.AbstractEventLoop):
return PowerOfTwoChoicesReplicaScheduler(
scheduler = PowerOfTwoChoicesReplicaScheduler(
loop,
DeploymentID(name="TEST_DEPLOYMENT"),
handle_source=request.param.get(
Expand All @@ -151,6 +151,8 @@ async def construct_scheduler(loop: asyncio.AbstractEventLoop):
),
get_curr_time_s=TIMER.time,
)
scheduler.backoff_sequence_s = [0, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001]
return scheduler

s = asyncio.new_event_loop().run_until_complete(
construct_scheduler(get_or_create_event_loop())
Expand Down Expand Up @@ -220,7 +222,7 @@ async def test_no_replicas_available_then_one_available(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
Expand Down Expand Up @@ -250,14 +252,14 @@ async def test_replica_does_not_accept_then_accepts(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1.set_queue_len_response(0)
Expand All @@ -284,14 +286,14 @@ async def test_no_replicas_accept_then_new_one_accepts(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r2 = FakeReplicaWrapper("r2")
Expand Down Expand Up @@ -325,11 +327,11 @@ async def test_one_replica_available_then_none_then_one(pow_2_scheduler):
s.update_replicas([r1])

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

s.update_replicas([])
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1.set_queue_len_response(0)
Expand Down Expand Up @@ -490,7 +492,7 @@ async def test_tasks_scheduled_fifo(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Only a single request will be accepted at a time due to
Expand Down Expand Up @@ -539,7 +541,7 @@ async def test_retried_tasks_scheduled_fifo(pow_2_scheduler):
)
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Only a single request will be accepted at a time due to
Expand Down Expand Up @@ -587,7 +589,7 @@ async def test_cancellation(pow_2_scheduler):
task1 = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
task2 = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task1, task2], timeout=0.1)
done, _ = await asyncio.wait([task1, task2], timeout=0.01)
assert len(done) == 0

task1.cancel()
Expand All @@ -603,6 +605,49 @@ async def test_cancellation(pow_2_scheduler):
assert s.num_pending_requests == 0


@pytest.mark.asyncio
@pytest.mark.parametrize(
"pow_2_scheduler",
[
{"prefer_local_node": True, "prefer_local_az": True},
{"prefer_local_node": True, "prefer_local_az": False},
{"prefer_local_node": False, "prefer_local_az": True},
{"prefer_local_node": False, "prefer_local_az": False},
],
indirect=True,
)
async def test_cancellation_when_replicas_maxed(pow_2_scheduler):
"""
If a pending assignment is cancelled, it shouldn't get fulfilled and the next
request in the queue should be.
"""
s = pow_2_scheduler
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

# There is only one replica that is maxed out on requests
r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS)
s.update_replicas([r1])
# So one scheduling task should have been started to try to schedule
# the request to a replica, but it should be blocked because the
# replica doesn't have capacity to accept new requests
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

# Cancel while the scheduling task is repeatedly trying to find an
# available replica
task.cancel()

# Verify that the scheduling tasks exit and there are no assignments left.
await async_wait_for_condition(
lambda: s.curr_num_scheduling_tasks == 0, retry_interval_ms=1
)
assert s.num_pending_requests == 0


@pytest.mark.asyncio
@pytest.mark.parametrize(
"pow_2_scheduler",
Expand All @@ -624,7 +669,7 @@ async def test_only_task_cancelled(pow_2_scheduler):

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

task.cancel()
Expand Down Expand Up @@ -669,7 +714,7 @@ async def test_scheduling_task_cap(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# There should be zero scheduling tasks while there are no replicas.
Expand All @@ -679,7 +724,7 @@ async def test_scheduling_task_cap(pow_2_scheduler):
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now that there is at least one replica available, there should be nonzero
Expand Down Expand Up @@ -734,7 +779,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# There should be zero scheduling tasks while there are no replicas.
Expand All @@ -744,7 +789,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler):
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now that there is at least one replica available, there should be nonzero
Expand Down Expand Up @@ -797,7 +842,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler):
# Start the scheduling task, which will hang waiting for the queue length response.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

Expand All @@ -808,7 +853,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler):
r1.set_queue_len_response(0)

# The original replica should *not* be scheduled.
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

Expand Down Expand Up @@ -1208,7 +1253,7 @@ async def test_multiple_queries_with_different_model_ids(self, pow_2_scheduler):
),
]

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == len(tasks)

assert all(
Expand Down Expand Up @@ -1241,7 +1286,7 @@ async def test_no_replicas_available_then_choose_one_with_id(self, pow_2_schedul
]

# Scheduling tasks should be in backoff.
done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now add two more replicas, one of which has the model ID.
Expand Down Expand Up @@ -1279,7 +1324,7 @@ async def test_tasks_scheduled_fifo_among_model_ids(self, pow_2_scheduler):
)
)

done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.1)
done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1", model_ids={"m1"}, reset_after_response=True)
Expand Down Expand Up @@ -1359,7 +1404,7 @@ async def test_queue_len_response_deadline_backoff(pow_2_scheduler):
# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.2)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

# Verify first ping
Expand Down Expand Up @@ -1404,7 +1449,7 @@ async def test_max_queue_len_response_deadline(pow_2_scheduler):
# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.2)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

assert all(
Expand Down Expand Up @@ -1555,15 +1600,15 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, DEFAULT_MAX_ONGOING_REQUESTS)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
# 1 probe from scheduling requests
# + 1 probe from when the replica set was updated with replica r1
assert len(r1.queue_len_deadline_history) - 1 == 1

# Now let the replica respond and accept the request, it should be scheduled.
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1)
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1

Expand Down Expand Up @@ -1591,7 +1636,7 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, 0)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1
# 0 probes from scheduling requests
Expand Down