From ce25b409ea2794f3e89ae4c10d9bf79b8e0e386b Mon Sep 17 00:00:00 2001
From: Ricky Xu
Date: Mon, 21 Aug 2023 22:26:20 -0700
Subject: [PATCH] [core][serve][autoscaler] Fix resource time rounding down
 makes idle reporting incorrect (#38685)

---------

Signed-off-by: rickyyx
---
 python/ray/autoscaler/v2/schema.py            |  15 +++
 python/ray/autoscaler/v2/tests/test_e2e.py    | 115 ++++++++++++++++++
 python/ray/cluster_utils.py                   |   5 +-
 .../scheduling/local_resource_manager.cc      |  11 +-
 .../scheduling/local_resource_manager_test.cc |  23 ++++
 5 files changed, 165 insertions(+), 4 deletions(-)

diff --git a/python/ray/autoscaler/v2/schema.py b/python/ray/autoscaler/v2/schema.py
index 86ace52582bf..b87102f0f78a 100644
--- a/python/ray/autoscaler/v2/schema.py
+++ b/python/ray/autoscaler/v2/schema.py
@@ -49,6 +49,21 @@ class NodeInfo:
     # Descriptive details.
     details: Optional[str] = None
 
+    def total_resources(self) -> Dict[str, float]:
+        if self.resource_usage is None:
+            return {}
+        return {r.resource_name: r.total for r in self.resource_usage.usage}
+
+    def available_resources(self) -> Dict[str, float]:
+        if self.resource_usage is None:
+            return {}
+        return {r.resource_name: r.total - r.used for r in self.resource_usage.usage}
+
+    def used_resources(self) -> Dict[str, float]:
+        if self.resource_usage is None:
+            return {}
+        return {r.resource_name: r.used for r in self.resource_usage.usage}
+
 
 @dataclass
 class LaunchRequest:
diff --git a/python/ray/autoscaler/v2/tests/test_e2e.py b/python/ray/autoscaler/v2/tests/test_e2e.py
index 582c4984fe87..2a8093c4a2d4 100644
--- a/python/ray/autoscaler/v2/tests/test_e2e.py
+++ b/python/ray/autoscaler/v2/tests/test_e2e.py
@@ -1,10 +1,12 @@
 import os
 import sys
 import time
+from typing import Dict
 
 import pytest
 
 import ray
+from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
 from ray._private.test_utils import run_string_as_driver_nonblocking, wait_for_condition
 from ray.autoscaler.v2.sdk import get_cluster_status
 from ray.cluster_utils import AutoscalingCluster
@@ -12,6 +14,12 @@
 from ray.util.state.api import list_placement_groups, list_tasks
 
 
+def is_head_node_from_resource_usage(usage: Dict[str, float]) -> bool:
+    if HEAD_NODE_RESOURCE_NAME in usage:
+        return True
+    return False
+
+
 # TODO(rickyx): We are NOT able to counter multi-node inconsistency yet. The problem is
 # right now, when node A (head node) has an infeasible task,
 # node B just finished running previous task.
@@ -200,6 +208,113 @@ def verify():
         cluster.shutdown()
 
 
+def test_object_store_memory_idle_node(shutdown_only):
+
+    ray.init()
+
+    obj = ray.put("hello")
+
+    def verify():
+        state = get_cluster_status()
+        for node in state.healthy_nodes:
+            assert node.node_status == "RUNNING"
+            assert node.used_resources()["object_store_memory"] > 0
+        return True
+
+    wait_for_condition(verify)
+
+    del obj
+
+    import time
+
+    time.sleep(1)
+
+    def verify():
+        state = get_cluster_status()
+        for node in state.healthy_nodes:
+            assert node.node_status == "IDLE"
+            assert node.used_resources()["object_store_memory"] == 0
+            assert node.resource_usage.idle_time_ms >= 1000
+        return True
+
+    wait_for_condition(verify)
+
+
+def test_serve_num_replica_idle_node():
+    # Test that nodes become idle after Serve scales down.
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 0},
+        worker_node_types={
+            "type-1": {
+                "resources": {"CPU": 4},
+                "node_config": {},
+                "min_workers": 0,
+                "max_workers": 30,
+            },
+        },
+        idle_timeout_minutes=999,
+    )
+
+    from ray import serve
+
+    @serve.deployment(ray_actor_options={"num_cpus": 2})
+    class Deployment:
+        def __call__(self):
+            return "hello"
+
+    try:
+        cluster.start(override_env={"RAY_SERVE_PROXY_MIN_DRAINING_PERIOD_S": "2"})
+        # 5 worker nodes should be busy, each with 2 (replicas) * 2 (CPUs per replica)
+        # = 4 CPUs used.
+        serve.run(Deployment.options(num_replicas=10).bind())
+
+        expected_num_workers = 5
+
+        def verify():
+            cluster_state = get_cluster_status()
+
+            # Verify that nodes are busy.
+            assert len((cluster_state.healthy_nodes)) == expected_num_workers + 1
+            for node in cluster_state.healthy_nodes:
+                assert node.node_status == "RUNNING"
+                if not is_head_node_from_resource_usage(node.total_resources()):
+                    available = node.available_resources()
+                    assert available["CPU"] == 0
+            return True
+
+        wait_for_condition(verify)
+
+        # Downscale to 1 replica; 4 worker nodes should become idle.
+        serve.run(Deployment.options(num_replicas=1).bind())
+
+        def verify():
+            cluster_state = get_cluster_status()
+            # We should only have 1 running worker for the 1 replica; the rest are idle.
+            expected_idle_workers = expected_num_workers - 1
+
+            assert len((cluster_state.healthy_nodes)) == expected_num_workers + 1
+            idle_nodes = []
+
+            for node in cluster_state.healthy_nodes:
+                if not is_head_node_from_resource_usage(node.total_resources()):
+                    available = node.available_resources()
+                    if node.node_status == "IDLE":
+                        assert available["CPU"] == 4
+                        idle_nodes.append(node)
+            from rich import print
+
+            print(cluster_state.healthy_nodes)
+            assert len(idle_nodes) == expected_idle_workers
+            return True
+
+        # A long sleep is needed for the Serve proxy to be removed.
+        wait_for_condition(verify, timeout=15, retry_interval_ms=1000)
+
+    finally:
+        ray.shutdown()
+        cluster.shutdown()
+
+
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py
index 15bec6c13913..464869e93523 100644
--- a/python/ray/cluster_utils.py
+++ b/python/ray/cluster_utils.py
@@ -5,6 +5,7 @@
 import subprocess
 import tempfile
 import time
+from typing import Dict, Optional
 
 import yaml
 
@@ -58,7 +59,7 @@ def _generate_config(self, head_resources, worker_node_types, **config_kwargs):
         custom_config.update(config_kwargs)
         return custom_config
 
-    def start(self, _system_config=None):
+    def start(self, _system_config=None, override_env: Optional[Dict] = None):
         """Start the cluster.
 
         After this call returns, you can connect to the cluster with
@@ -88,6 +89,8 @@ def start(self, _system_config=None):
         )
         env = os.environ.copy()
         env.update({"AUTOSCALER_UPDATE_INTERVAL_S": "1", "RAY_FAKE_CLUSTER": "1"})
+        if override_env:
+            env.update(override_env)
         subprocess.check_call(cmd, env=env)
 
     def shutdown(self):
diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc
index 79183194f59a..04193922ff70 100644
--- a/src/ray/raylet/scheduling/local_resource_manager.cc
+++ b/src/ray/raylet/scheduling/local_resource_manager.cc
@@ -429,9 +429,14 @@ std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(
     resources_data.set_object_pulls_queued(resources.object_pulls_queued);
   }
 
-  const auto now = absl::Now();
-  resources_data.set_idle_duration_ms(
-      absl::ToInt64Milliseconds(now - GetResourceIdleTime().value_or(now)));
+  auto idle_time = GetResourceIdleTime();
+  if (idle_time.has_value()) {
+    // Round the idle duration up to at least 1 millisecond so that idle
+    // reporting stays correct even when the node has been idle for less than 1 ms.
+    const auto now = absl::Now();
+    resources_data.set_idle_duration_ms(std::max(
+        static_cast<int64_t>(1), absl::ToInt64Milliseconds(now - idle_time.value())));
+  }
 
   resources_data.set_is_draining(IsLocalNodeDraining());
 
diff --git a/src/ray/raylet/scheduling/local_resource_manager_test.cc b/src/ray/raylet/scheduling/local_resource_manager_test.cc
index 23bc08e6744a..f099276c8a1d 100644
--- a/src/ray/raylet/scheduling/local_resource_manager_test.cc
+++ b/src/ray/raylet/scheduling/local_resource_manager_test.cc
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Unclear why a macro redefinition happens here, but it breaks the Windows
+// build.
 #include "ray/raylet/scheduling/local_resource_manager.h"
 
 #include "gtest/gtest.h"
@@ -44,6 +46,13 @@ class LocalResourceManagerTest : public ::testing::Test {
     }
   }
 
+  rpc::ResourcesData GetSyncMessageForResourceReport() {
+    auto msg = manager->CreateSyncMessage(0, syncer::MessageType::RESOURCE_VIEW);
+    rpc::ResourcesData resources_data;
+    resources_data.ParseFromString(msg->sync_message());
+    return resources_data;
+  }
+
   scheduling::NodeID local_node_id = scheduling::NodeID(0);
   std::unique_ptr<LocalResourceManager> manager;
 };
@@ -272,6 +281,9 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
   {
     auto idle_time = manager->GetResourceIdleTime();
     ASSERT_EQ(idle_time, absl::nullopt);
+
+    const auto &resources_data = GetSyncMessageForResourceReport();
+    ASSERT_EQ(resources_data.idle_duration_ms(), 0);
   }
 
   // Deallocate the resource
@@ -290,6 +302,10 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
       auto dur = absl::Now() - *idle_time;
       ASSERT_GE(dur, absl::ZeroDuration());
       ASSERT_LE(dur, absl::Seconds(1));
+
+      const auto &resources_data = GetSyncMessageForResourceReport();
+      ASSERT_GE(resources_data.idle_duration_ms(), 0);
+      ASSERT_LE(resources_data.idle_duration_ms(), 1 * 1000);
     }
   }
 
@@ -299,6 +315,9 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
     manager->UpdateAvailableObjectStoreMemResource();
     auto idle_time = manager->GetResourceIdleTime();
     ASSERT_EQ(idle_time, absl::nullopt);
+
+    const auto &resources_data = GetSyncMessageForResourceReport();
+    ASSERT_EQ(resources_data.idle_duration_ms(), 0);
   }
 
   // Free object store memory usage should make node resource idle.
@@ -309,6 +328,10 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
     ASSERT_TRUE(idle_time.has_value());
     auto dur = absl::Now() - *idle_time;
     ASSERT_GE(dur, absl::ZeroDuration());
+
+    // And syncer messages should be created correctly for resource reporting.
+    const auto &resources_data = GetSyncMessageForResourceReport();
+    ASSERT_GE(resources_data.idle_duration_ms(), 0);
   }
 }
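
The core of the fix is in CreateSyncMessage: absl::ToInt64Milliseconds truncates toward zero, so a node that had been idle for less than a full millisecond used to report idle_duration_ms == 0, which downstream consumers cannot tell apart from a node that is not idle at all. The patch therefore only sets the field when an idle timestamp actually exists and clamps the reported value to at least 1 ms. A minimal standalone sketch of that behavior (illustrative only, not taken from the patch; it assumes nothing beyond Abseil's time library and the C++ standard library):

// Sketch: why truncating to whole milliseconds loses short idle durations,
// and how clamping to >= 1 ms (as the patch does) preserves the signal.
#include <algorithm>
#include <cstdint>
#include <iostream>

#include "absl/time/time.h"

int main() {
  // A node that has been idle for half a millisecond.
  const absl::Duration idle = absl::Microseconds(500);

  // Old behavior: truncation toward zero reports 0 ms, indistinguishable
  // from "not idle at all".
  const int64_t truncated = absl::ToInt64Milliseconds(idle);

  // Patched behavior: clamp to at least 1 ms whenever an idle timestamp
  // exists, so any idle node reports a non-zero duration.
  const int64_t clamped =
      std::max(static_cast<int64_t>(1), absl::ToInt64Milliseconds(idle));

  std::cout << "truncated: " << truncated << " ms\n";  // prints 0
  std::cout << "clamped:   " << clamped << " ms\n";    // prints 1
  return 0;
}

With this clamp, idle_duration_ms == 0 now unambiguously means "not idle", which is what the updated C++ tests assert for the busy cases while expecting a bounded non-zero value once the node becomes idle.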