Skip to content

Commit

Permalink
[serve] Stop scheduling task early when requests have been cancelled (r…
Browse files Browse the repository at this point in the history
…ay-project#47847)

In `fulfill_pending_requests`, there are two nested loops:
- the outer loop greedily fulfills more requests so that if backoff
doesn't occur, it's not necessary for new asyncio tasks to be started to
fulfill each request
- the inner loop handles backoff if replicas can't be found to fulfill
the next request

The outer loop will be stopped if there are enough tasks to handle all
pending requests. However if all replicas are at max capacity, it's
possible for the inner loop to continue to loop even when the task is no
longer needed (e.g. when a request has been cancelled), because the
inner loop simply continues to try to find an available replica without
checking if the current task is even necessary.

This PR makes sure that at the end of each iteration of the inner loop,
it clears out requests in `pending_requests_to_fulfill` that have been
cancelled, and then breaks out of the loop if there are enough tasks to
handle the remaining requests.

Tests:
- Added a test that tests for the scenario where a request is cancelled
while it's trying to find an available replica
- Also modified the tests in `test_pow_2_scheduler.py` so that the
backoff sequence is small values (1ms), and the timeouts in the tests
are also low `10ms`, so that the unit tests run much faster (~5s now
compared to ~30s before).

## Related issue number

related: ray-project#47585

---------

Signed-off-by: Cindy Zhang <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
zcin authored and ujjawal-khare committed Oct 15, 2024
1 parent f63a571 commit 3e61970
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ async def test_multiple_queries_with_different_model_ids(self, pow_2_scheduler):
),
]

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == len(tasks)

assert all(
Expand Down Expand Up @@ -1600,15 +1600,15 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, DEFAULT_MAX_ONGOING_REQUESTS)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
# 1 probe from scheduling requests
# + 1 probe from when the replica set was updated with replica r1
assert len(r1.queue_len_deadline_history) - 1 == 1

# Now let the replica respond and accept the request, it should be scheduled.
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1)
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1

Expand Down Expand Up @@ -1636,7 +1636,7 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, 0)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1
# 0 probes from scheduling requests
Expand Down

0 comments on commit 3e61970

Please sign in to comment.