[pull] master from ray-project:master #2312

Merged · 8 commits · Aug 16, 2023
33 changes: 32 additions & 1 deletion dashboard/modules/node/node_head.py
@@ -12,6 +12,7 @@
from ray.autoscaler._private.util import (
LoadMetricsSummary,
get_per_node_breakdown_as_dict,
parse_usage,
)
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.optional_utils as dashboard_optional_utils
@@ -246,6 +247,35 @@ async def get_node_module_internal_state(self, req) -> aiohttp.web.Response:
)

async def get_nodes_logical_resources(self) -> dict:

from ray.autoscaler.v2.utils import is_autoscaler_v2

if is_autoscaler_v2():
from ray.autoscaler.v2.sdk import get_cluster_status

try:
cluster_status = get_cluster_status()
except Exception:
logger.exception("Error getting cluster status")
return {}

per_node_resources = {}
            # TODO(rickyx): we should just return structured data rather than strings.
for node in cluster_status.healthy_nodes:
if not node.resource_usage:
continue

usage_dict = {
r.resource_name: (r.used, r.total)
for r in node.resource_usage.usage
}
per_node_resources[node.node_id] = "\n".join(
parse_usage(usage_dict, verbose=True)
)

return per_node_resources

# Legacy autoscaler status code.
(status_string, error) = await asyncio.gather(
*[
self._gcs_aio_client.internal_kv_get(
@@ -257,7 +287,8 @@ async def get_nodes_logical_resources(self) -> dict:
]
]
)

if not status_string:
return {}
status_dict = json.loads(status_string)

lm_summary_dict = status_dict.get("load_metrics_report")
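For context, the new autoscaler-v2 branch above reduces each healthy node's resource usage to a display string. Below is a minimal self-contained sketch of that transformation, using a simplified stand-in for Ray's parse_usage helper and hypothetical sample data:

    from typing import Dict, List, Tuple

    def format_usage(usage: Dict[str, Tuple[float, float]]) -> List[str]:
        # Simplified stand-in for ray.autoscaler._private.util.parse_usage:
        # render one "used/total NAME" line per resource.
        return [f"{used}/{total} {name}" for name, (used, total) in sorted(usage.items())]

    # Hypothetical usage dict, shaped like the (r.used, r.total) tuples the
    # handler builds from node.resource_usage.usage.
    usage_dict = {"CPU": (0.0, 1.0), "memory": (0.0, 2.0 * 1024**3)}
    per_node_resources = {"node-1": "\n".join(format_usage(usage_dict))}
    print(per_node_resources["node-1"])  # "0.0/1.0 CPU" plus a memory line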
19 changes: 10 additions & 9 deletions dashboard/modules/serve/tests/test_serve_agent.py
@@ -714,26 +714,27 @@ def test_serve_namespace(ray_start_stop):
"""

config = {
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
"applications": [
{
"name": "my_app",
"import_path": "ray.serve.tests.test_config_files.world.DagNode",
}
],
}

print("Deploying config.")
deploy_and_check_config(config)
deploy_config_multi_app(config)
wait_for_condition(
lambda: requests.post("http://localhost:8000/").text == "wonderful world",
timeout=15,
)
print("Deployments are live and reachable over HTTP.\n")

ray.init(address="auto", namespace="serve")
client = serve.start()
print("Connected to Serve with Python API.")
serve_status = client.get_serve_status()
my_app_status = serve.status().applications["my_app"]
assert (
len(serve_status.deployment_statuses) == 2
and serve_status.get_deployment_status(
f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f"
)
len(my_app_status.deployments) == 2
and my_app_status.deployments[f"my_app{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f"]
is not None
)
print("Successfully retrieved deployment statuses with Python API.")
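The updated test reflects Serve's move from client.get_serve_status() to the per-application serve.status() API. A rough sketch of the new query pattern, assuming a running cluster with an application named my_app already deployed:

    import ray
    from ray import serve

    ray.init(address="auto", namespace="serve")

    # Applications are now keyed by name; each application's status carries
    # its own deployments dict instead of a flat deployment_statuses list.
    my_app_status = serve.status().applications["my_app"]
    for name, deployment in my_app_status.deployments.items():
        print(name, deployment.status)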
42 changes: 42 additions & 0 deletions dashboard/tests/test_dashboard.py
@@ -628,6 +628,48 @@ def get_cluster_status():
assert "loadMetricsReport" in response.json()["data"]["clusterStatus"]


@pytest.mark.skipif(
os.environ.get("RAY_MINIMAL") == "1",
reason="This test is not supposed to work for minimal installation.",
)
@pytest.mark.parametrize(
"call_ray_start",
[
"""ray start --no-monitor --head --num-cpus 1 \
--system-config={"enable_autoscaler_v2":true}""",
"""ray start --head --num-cpus 1""",
],
indirect=True,
)
def test_get_nodes_summary(call_ray_start):

    # The sleep is needed since a previous shutdown may not have finished
    # when the next test starts. Without it, the current test session could
    # connect to a leftover cluster from the previous test.
time.sleep(5)
address_info = ray.init(address=call_ray_start)
webui_url = address_info["webui_url"]
webui_url = format_web_url(webui_url)

def get_nodes_summary():
response = requests.get(f"{webui_url}/nodes?view=summary")
response.raise_for_status()
response = response.json()
print(response)

assert response["data"]["nodeLogicalResources"]
assert "0.0/1.0 CPU" in "".join(
response["data"]["nodeLogicalResources"].values()
)

assert wait_until_succeeded_without_exception(
get_nodes_summary,
(requests.RequestException,),
timeout_ms=10 * 1000,
retry_interval_ms=1000,
)
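The first parametrization enables autoscaler v2 through ray start --system-config. For local reproduction, a rough Python equivalent is sketched below; _system_config is an internal, unstable argument, so treat this as an assumption rather than a stable API:

    import ray

    # Internal knob mirroring the --system-config flag in the parametrized
    # `ray start` command above.
    info = ray.init(num_cpus=1, _system_config={"enable_autoscaler_v2": True})
    print(info["webui_url"])  # dashboard URL that serves /nodes?view=summary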


@pytest.mark.skipif(
os.environ.get("RAY_MINIMAL") == "1",
reason="This test is not supposed to work for minimal installation.",
4 changes: 2 additions & 2 deletions doc/source/data/batch_inference.rst
@@ -476,13 +476,13 @@ The rest of the logic looks the same as in the `Quickstart <#quickstart>`_.
import xgboost

from ray.train import Checkpoint
from ray.train.xgboost import XGBoostCheckpoint
from ray.train.xgboost import LegacyXGBoostCheckpoint

test_dataset = valid_dataset.drop_columns(["target"])

class XGBoostPredictor:
def __init__(self, checkpoint: Checkpoint):
xgboost_checkpoint = XGBoostCheckpoint.from_checkpoint(checkpoint)
xgboost_checkpoint = LegacyXGBoostCheckpoint.from_checkpoint(checkpoint)
self.model = xgboost_checkpoint.get_model()

def __call__(self, data: pd.DataFrame) -> Dict[str, np.ndarray]:
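This is the first of several identical renames in the docs below: the framework checkpoint classes gain a Legacy prefix (LegacyXGBoostCheckpoint, LegacyTorchCheckpoint, LegacyTensorflowCheckpoint, LegacyTransformersCheckpoint). A minimal before/after sketch for the XGBoost case, assuming a checkpoint produced by an XGBoost trainer run:

    # Before: from ray.train.xgboost import XGBoostCheckpoint
    from ray.train.xgboost import LegacyXGBoostCheckpoint

    def load_booster(checkpoint):
        # Wrap a generic ray.train.Checkpoint and recover the trained model.
        xgb_checkpoint = LegacyXGBoostCheckpoint.from_checkpoint(checkpoint)
        return xgb_checkpoint.get_model()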
12 changes: 6 additions & 6 deletions doc/source/ray-air/doc_code/computer_vision.py
@@ -185,7 +185,7 @@ def train_torch_model(dataset, preprocessor, per_epoch_preprocessor):

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch import LegacyTorchCheckpoint, TorchTrainer

def train_one_epoch(model, *, criterion, optimizer, batch_size, epoch):
dataset_shard = train.get_dataset_shard("train")
@@ -213,7 +213,7 @@ def train_one_epoch(model, *, criterion, optimizer, batch_size, epoch):
"batch": i,
"running_loss": running_loss / 2000,
},
checkpoint=TorchCheckpoint.from_model(model),
checkpoint=LegacyTorchCheckpoint.from_model(model),
)
running_loss = 0

@@ -305,10 +305,10 @@ def create_torch_checkpoint(preprocessor):
# __torch_checkpoint_start__
from torchvision import models

from ray.train.torch import TorchCheckpoint
from ray.train.torch import LegacyTorchCheckpoint

model = models.resnet50(pretrained=True)
checkpoint = TorchCheckpoint.from_model(model, preprocessor=preprocessor)
checkpoint = LegacyTorchCheckpoint.from_model(model, preprocessor=preprocessor)
# __torch_checkpoint_stop__
return checkpoint

@@ -317,10 +317,10 @@ def create_tensorflow_checkpoint(preprocessor):
# __tensorflow_checkpoint_start__
import tensorflow as tf

from ray.train.tensorflow import TensorflowCheckpoint
from ray.train.tensorflow import LegacyTensorflowCheckpoint

model = tf.keras.applications.resnet50.ResNet50()
checkpoint = TensorflowCheckpoint.from_model(model, preprocessor=preprocessor)
checkpoint = LegacyTensorflowCheckpoint.from_model(model, preprocessor=preprocessor)
# __tensorflow_checkpoint_stop__
return checkpoint

@@ -2146,9 +2146,9 @@
},
"outputs": [],
"source": [
"from ray.train.huggingface import TransformersCheckpoint\n",
"from ray.train.huggingface import LegacyTransformersCheckpoint\n",
"\n",
"checkpoint = TransformersCheckpoint.from_checkpoint(result.checkpoint)\n",
"checkpoint = LegacyTransformersCheckpoint.from_checkpoint(result.checkpoint)\n",
"hf_trainer = checkpoint.get_model(model=AutoModelForSequenceClassification)"
]
},
@@ -2237,7 +2237,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
"version": "3.8.9"
},
"orphan": true,
"vscode": {
4 changes: 2 additions & 2 deletions doc/source/ray-air/examples/pytorch_tabular_starter.py
@@ -33,7 +33,7 @@

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch import LegacyTorchCheckpoint, TorchTrainer


def create_model(input_features):
@@ -75,7 +75,7 @@ def train_loop_per_worker(config):
train_loss.backward()
optimizer.step()
loss = train_loss.item()
train.report({"loss": loss}, checkpoint=TorchCheckpoint.from_model(model))
train.report({"loss": loss}, checkpoint=LegacyTorchCheckpoint.from_model(model))


num_features = len(train_dataset.schema().names) - 1
4 changes: 2 additions & 2 deletions doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
@@ -689,7 +689,7 @@
"source": [
"from ray import train\n",
"from ray.train import Checkpoint\n",
"from ray.train.tensorflow import TensorflowCheckpoint\n",
"from ray.train.tensorflow import LegacyTensorflowCheckpoint\n",
"\n",
"def train_loop_per_worker():\n",
" dataset_shard = train.get_dataset_shard(\"train\")\n",
@@ -710,7 +710,7 @@
" # This saves checkpoint in a way that can be used by Ray Serve coherently.\n",
" train.report(\n",
" {},\n",
" checkpoint=TensorflowCheckpoint.from_model(model),\n",
" checkpoint=LegacyTensorflowCheckpoint.from_model(model),\n",
" )"
]
},
4 changes: 2 additions & 2 deletions doc/source/ray-overview/doc_test/ray_train.py
@@ -2,7 +2,7 @@

import ray.train as train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, TorchCheckpoint
from ray.train.torch import TorchTrainer, LegacyTorchCheckpoint


def train_func():
@@ -29,7 +29,7 @@ def train_func():
optimizer.step()
train.report({"loss": loss.item()})

train.report({}, checkpoint=TorchCheckpoint.from_model(model))
train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))


trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=4))
6 changes: 3 additions & 3 deletions doc/source/train/doc_code/dl_guide.py
@@ -7,7 +7,7 @@

import ray
from ray import train
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch import LegacyTorchCheckpoint, TorchTrainer


def get_datasets() -> Dict[str, ray.data.Dataset]:
@@ -18,7 +18,7 @@ def train_loop_per_worker(config: dict):
from torchvision.models import resnet18

# Checkpoint loading
checkpoint: Optional[TorchCheckpoint] = train.get_checkpoint()
checkpoint: Optional[LegacyTorchCheckpoint] = train.get_checkpoint()
model = checkpoint.get_model() if checkpoint else resnet18()
ray.train.torch.prepare_model(model)

@@ -30,7 +30,7 @@ def train_loop_per_worker(config: dict):
# Checkpoint saving
train.report(
{"epoch": epoch},
checkpoint=TorchCheckpoint.from_model(model),
checkpoint=LegacyTorchCheckpoint.from_model(model),
)

