[core][serve][autoscaler] Fix resource time rounding down making idle reporting incorrect (ray-project#38685)


---------

Signed-off-by: rickyyx <[email protected]>
rickyyx authored Aug 22, 2023
1 parent 543e096 commit ce25b40
Showing 5 changed files with 165 additions and 4 deletions.
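
The gist of the change: the raylet reports a node's idle time in whole milliseconds, and plain truncation turns an idle duration shorter than 1 ms into 0, so consumers such as the autoscaler and Serve treat the node as busy. The fix clamps the reported value to at least 1 ms whenever the node is idle. The snippet below is a minimal Python illustration of that failure mode and the clamp, not the actual raylet code (the real change is in local_resource_manager.cc further down); the function names are made up for the example.

def reported_idle_ms_old(idle_seconds: float) -> int:
    # Old behavior: truncate to whole milliseconds. A node idle for 0.4 ms
    # reports 0 ms, which readers interpret as "not idle".
    return int(idle_seconds * 1000)


def reported_idle_ms_fixed(idle_seconds: float) -> int:
    # Fixed behavior: clamp to at least 1 ms whenever the node is idle at all.
    return max(1, int(idle_seconds * 1000))


assert reported_idle_ms_old(0.0004) == 0     # bug: the idle node looks busy
assert reported_idle_ms_fixed(0.0004) == 1   # fix: the node is reported as idle
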
15 changes: 15 additions & 0 deletions python/ray/autoscaler/v2/schema.py
@@ -49,6 +49,21 @@ class NodeInfo:
    # Descriptive details.
    details: Optional[str] = None

    def total_resources(self) -> Dict[str, float]:
        if self.resource_usage is None:
            return {}
        return {r.resource_name: r.total for r in self.resource_usage.usage}

    def available_resources(self) -> Dict[str, float]:
        if self.resource_usage is None:
            return {}
        return {r.resource_name: r.total - r.used for r in self.resource_usage.usage}

    def used_resources(self) -> Dict[str, float]:
        if self.resource_usage is None:
            return {}
        return {r.resource_name: r.used for r in self.resource_usage.usage}


@dataclass
class LaunchRequest:
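
As a quick illustration of what the new NodeInfo helpers compute, here is a hedged sketch. The ResourceUsage stand-in below mirrors only the fields the helpers read (resource_name, total, used) and is an assumption made to keep the example self-contained, not the real ray.autoscaler.v2 class.

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ResourceUsage:
    # Assumed minimal shape: only the fields read by the NodeInfo helpers.
    resource_name: str
    total: float
    used: float


usage: List[ResourceUsage] = [
    ResourceUsage("CPU", total=4.0, used=1.0),
    ResourceUsage("object_store_memory", total=100.0, used=0.0),
]

# The three helpers reduce the per-resource usage list into dicts.
total: Dict[str, float] = {r.resource_name: r.total for r in usage}
available: Dict[str, float] = {r.resource_name: r.total - r.used for r in usage}
used: Dict[str, float] = {r.resource_name: r.used for r in usage}

assert total["CPU"] == 4.0
assert available["CPU"] == 3.0
assert used["object_store_memory"] == 0.0
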
115 changes: 115 additions & 0 deletions python/ray/autoscaler/v2/tests/test_e2e.py
@@ -1,17 +1,25 @@
import os
import sys
import time
from typing import Dict

import pytest

import ray
from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
from ray._private.test_utils import run_string_as_driver_nonblocking, wait_for_condition
from ray.autoscaler.v2.sdk import get_cluster_status
from ray.cluster_utils import AutoscalingCluster
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.state.api import list_placement_groups, list_tasks


def is_head_node_from_resource_usage(usage: Dict[str, float]) -> bool:
    if HEAD_NODE_RESOURCE_NAME in usage:
        return True
    return False


# TODO(rickyx): We are NOT able to counter multi-node inconsistency yet. The problem is
# right now, when node A (head node) has an infeasible task,
# node B just finished running previous task.
@@ -200,6 +208,113 @@ def verify():
        cluster.shutdown()


def test_object_store_memory_idle_node(shutdown_only):
    ray.init()

    obj = ray.put("hello")

    def verify():
        state = get_cluster_status()
        for node in state.healthy_nodes:
            assert node.node_status == "RUNNING"
            assert node.used_resources()["object_store_memory"] > 0
        return True

    wait_for_condition(verify)

    del obj

    # Give the node time to accrue at least 1 second of idle time.
    time.sleep(1)

    def verify():
        state = get_cluster_status()
        for node in state.healthy_nodes:
            assert node.node_status == "IDLE"
            assert node.used_resources()["object_store_memory"] == 0
            assert node.resource_usage.idle_time_ms >= 1000
        return True

    wait_for_condition(verify)


def test_serve_num_replica_idle_node():
    # Test that worker nodes become idle after Serve scales down its replicas.
    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types={
            "type-1": {
                "resources": {"CPU": 4},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 30,
            },
        },
        idle_timeout_minutes=999,
    )

    from ray import serve

    @serve.deployment(ray_actor_options={"num_cpus": 2})
    class Deployment:
        def __call__(self):
            return "hello"

    try:
        cluster.start(override_env={"RAY_SERVE_PROXY_MIN_DRAINING_PERIOD_S": "2"})
        # 5 worker nodes should be busy, each using 2 (replicas) * 2 (CPUs per
        # replica) = 4 CPUs.
        serve.run(Deployment.options(num_replicas=10).bind())

        expected_num_workers = 5

        def verify():
            cluster_state = get_cluster_status()

            # Verify that nodes are busy.
            assert len(cluster_state.healthy_nodes) == expected_num_workers + 1
            for node in cluster_state.healthy_nodes:
                assert node.node_status == "RUNNING"
                if not is_head_node_from_resource_usage(node.total_resources()):
                    available = node.available_resources()
                    assert available["CPU"] == 0
            return True

        wait_for_condition(verify)

        # Downscale to 1 replica; 4 worker nodes should become idle.
        serve.run(Deployment.options(num_replicas=1).bind())

        def verify():
            cluster_state = get_cluster_status()
            # We should only have 1 running worker for the 1 replica; the rest
            # should be idle.
            expected_idle_workers = expected_num_workers - 1

            assert len(cluster_state.healthy_nodes) == expected_num_workers + 1
            idle_nodes = []

            for node in cluster_state.healthy_nodes:
                if not is_head_node_from_resource_usage(node.total_resources()):
                    available = node.available_resources()
                    if node.node_status == "IDLE":
                        assert available["CPU"] == 4
                        idle_nodes.append(node)
            assert len(idle_nodes) == expected_idle_workers
            return True

        # A long timeout is needed for the Serve proxy to be removed.
        wait_for_condition(verify, timeout=15, retry_interval_ms=1000)

    finally:
        ray.shutdown()
        cluster.shutdown()


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
5 changes: 4 additions & 1 deletion python/ray/cluster_utils.py
@@ -5,6 +5,7 @@
import subprocess
import tempfile
import time
from typing import Dict, Optional

import yaml

@@ -58,7 +59,7 @@ def _generate_config(self, head_resources, worker_node_types, **config_kwargs):
        custom_config.update(config_kwargs)
        return custom_config

-    def start(self, _system_config=None):
+    def start(self, _system_config=None, override_env: Optional[Dict] = None):
        """Start the cluster.

        After this call returns, you can connect to the cluster with
@@ -88,6 +89,8 @@ def start(self, _system_config=None):
        )
        env = os.environ.copy()
        env.update({"AUTOSCALER_UPDATE_INTERVAL_S": "1", "RAY_FAKE_CLUSTER": "1"})
        if override_env:
            env.update(override_env)
        subprocess.check_call(cmd, env=env)

def shutdown(self):
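
For context, a short usage sketch of the new override_env parameter, mirroring how the Serve test above launches the fake cluster. The extra variables are merged on top of the defaults that start() already sets (AUTOSCALER_UPDATE_INTERVAL_S=1 and RAY_FAKE_CLUSTER=1) before the launch command runs.

from ray.cluster_utils import AutoscalingCluster

cluster = AutoscalingCluster(
    head_resources={"CPU": 0},
    worker_node_types={
        "type-1": {
            "resources": {"CPU": 4},
            "node_config": {},
            "min_workers": 0,
            "max_workers": 30,
        },
    },
)

# Inject extra environment variables into the fake-cluster launch; they are
# applied on top of the defaults set by start() itself.
cluster.start(override_env={"RAY_SERVE_PROXY_MIN_DRAINING_PERIOD_S": "2"})
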
11 changes: 8 additions & 3 deletions src/ray/raylet/scheduling/local_resource_manager.cc
@@ -429,9 +429,14 @@ std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(
    resources_data.set_object_pulls_queued(resources.object_pulls_queued);
  }

-  const auto now = absl::Now();
-  resources_data.set_idle_duration_ms(
-      absl::ToInt64Milliseconds(now - GetResourceIdleTime().value_or(now)));
+  auto idle_time = GetResourceIdleTime();
+  if (idle_time.has_value()) {
+    // Clamp the reported idle duration to at least 1 ms so that a node idle for
+    // less than a millisecond is still reported as idle.
+    const auto now = absl::Now();
+    resources_data.set_idle_duration_ms(std::max(
+        static_cast<int64_t>(1), absl::ToInt64Milliseconds(now - idle_time.value())));
+  }

  resources_data.set_is_draining(IsLocalNodeDraining());
23 changes: 23 additions & 0 deletions src/ray/raylet/scheduling/local_resource_manager_test.cc
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// It is unclear why the macro redefinition happens, but it breaks the Windows
// build.
#include "ray/raylet/scheduling/local_resource_manager.h"

#include "gtest/gtest.h"
@@ -44,6 +46,13 @@ class LocalResourceManagerTest : public ::testing::Test {
    }
  }

  rpc::ResourcesData GetSyncMessageForResourceReport() {
    auto msg = manager->CreateSyncMessage(0, syncer::MessageType::RESOURCE_VIEW);
    rpc::ResourcesData resources_data;
    resources_data.ParseFromString(msg->sync_message());
    return resources_data;
  }

  scheduling::NodeID local_node_id = scheduling::NodeID(0);
  std::unique_ptr<LocalResourceManager> manager;
};
@@ -272,6 +281,9 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
  {
    auto idle_time = manager->GetResourceIdleTime();
    ASSERT_EQ(idle_time, absl::nullopt);

    const auto &resources_data = GetSyncMessageForResourceReport();
    ASSERT_EQ(resources_data.idle_duration_ms(), 0);
  }

  // Deallocate the resource
@@ -290,6 +302,10 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
      auto dur = absl::Now() - *idle_time;
      ASSERT_GE(dur, absl::ZeroDuration());
      ASSERT_LE(dur, absl::Seconds(1));

      const auto &resources_data = GetSyncMessageForResourceReport();
      ASSERT_GE(resources_data.idle_duration_ms(), 0);
      ASSERT_LE(resources_data.idle_duration_ms(), 1 * 1000);
    }
  }

@@ -299,6 +315,9 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
    manager->UpdateAvailableObjectStoreMemResource();
    auto idle_time = manager->GetResourceIdleTime();
    ASSERT_EQ(idle_time, absl::nullopt);

    const auto &resources_data = GetSyncMessageForResourceReport();
    ASSERT_EQ(resources_data.idle_duration_ms(), 0);
  }

  // Free object store memory usage should make node resource idle.
@@ -309,6 +328,10 @@ TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) {
    ASSERT_TRUE(idle_time.has_value());
    auto dur = absl::Now() - *idle_time;
    ASSERT_GE(dur, absl::ZeroDuration());

    // And syncer messages should be created correctly for resource reporting.
    const auto &resources_data = GetSyncMessageForResourceReport();
    ASSERT_GE(resources_data.idle_duration_ms(), 0);
  }
}

