Skip to content

Commit

Permalink
[serve] Optimize cold start time (ray-project#38351)
Browse files Browse the repository at this point in the history
Small optimization that makes handles immediately send reports to the controller if the current state is "idle":
- handle has no queued requests
- there are 0 replicas for the deployment

This makes it so that the first request sent to a scaled-to-zero deployment doesn't have to wait for the every-10-second metric push.

Ran some tests below for comparison. The load test is ramp 0 -> 10 users -> 50 users -> 100 users. You can see this greatly helps the first ramp up from 0 -> 10 users, and doesn't really affect anything else.
  • Loading branch information
zcin authored Aug 17, 2023
1 parent e1208a7 commit e79c914
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
28 changes: 27 additions & 1 deletion python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ async def assign_replica(
def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
pass

@property
def curr_replicas(self) -> Dict[str, ReplicaWrapper]:
pass


@dataclass
class PendingRequest:
Expand Down Expand Up @@ -752,6 +756,12 @@ def __init__(self, event_loop: asyncio.AbstractEventLoop):
str, List[RunningReplicaInfo]
] = defaultdict(list)

@property
def curr_replicas(self) -> Dict[str, ReplicaWrapper]:
return {
r.replica_tag: ActorReplicaWrapper(r) for r in self.in_flight_queries.keys()
}

def _reset_replica_iterator(self):
"""Reset the iterator used to load balance replicas.
Expand Down Expand Up @@ -1032,14 +1042,20 @@ def __init__(
deployment_info = DeploymentInfo.from_proto(deployment_route.deployment_info)
self.metrics_pusher = None
if deployment_info.deployment_config.autoscaling_config:
self.autoscaling_enabled = True
self.push_metrics_to_controller = (
controller_handle.record_handle_metrics.remote
)
self.metrics_pusher = MetricsPusher()
self.metrics_pusher.register_task(
self._collect_handle_queue_metrics,
HANDLE_METRIC_PUSH_INTERVAL_S,
controller_handle.record_handle_metrics.remote,
self.push_metrics_to_controller,
)

self.metrics_pusher.start()
else:
self.autoscaling_enabled = False

def _collect_handle_queue_metrics(self) -> Dict[str, int]:
return {str(self.deployment_id): self.num_queued_queries}
Expand All @@ -1063,6 +1079,16 @@ async def assign_request(
},
)

# Optimization: if there are currently zero replicas for a deployment,
# push handle metric to controller to allow for fast cold start time.
# Only do it for the first query to arrive on the router.
if (
self.autoscaling_enabled
and len(self._replica_scheduler.curr_replicas) == 0
and self.num_queued_queries == 1
):
self.push_metrics_to_controller({self.deployment_name: 1}, time.time())

try:
query = Query(
args=list(request_args),
Expand Down
33 changes: 33 additions & 0 deletions python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,39 @@ def __call__(self):
assert len(get_running_replicas(controller, "A")) == 2


def test_cold_start_time(serve_instance):
"""Send 100 requests and check that we autoscale up, and then back down."""

@serve.deployment(
autoscaling_config={
"min_replicas": 0,
"max_replicas": 1,
"look_back_period_s": 0.2,
},
)
class A:
def __call__(self):
return "hello"

handle = serve.run(A.bind())

def check_running():
assert serve.status().applications["default"].status == "RUNNING"
return True

wait_for_condition(check_running)

start = time.time()
result = ray.get(handle.remote())
cold_start_time = time.time() - start
assert cold_start_time < 2
print(
"Time taken for deployment at 0 replicas to serve first request:",
cold_start_time,
)
assert result == "hello"


def test_smoothing_factor_scale_up_from_0_replicas():
"""Test that the smoothing factor is respected when scaling up from 0 replicas."""

Expand Down

0 comments on commit e79c914

Please sign in to comment.