From b8fc7c8bfebd96e879d61cd59895ec026909cc21 Mon Sep 17 00:00:00 2001 From: Yunxuan Xiao Date: Tue, 22 Aug 2023 18:06:31 -0700 Subject: [PATCH 1/5] init (#38756) --- doc/source/ray-overview/examples.rst | 30 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/doc/source/ray-overview/examples.rst b/doc/source/ray-overview/examples.rst index 59e51f096cca..e068af3d03e0 100644 --- a/doc/source/ray-overview/examples.rst +++ b/doc/source/ray-overview/examples.rst @@ -43,7 +43,7 @@ Ray Examples Training 175B Parameter Language Models at 1000 GPU scale with Alpa and Ray .. grid-item-card:: :bdg-primary:`Blog` - :class-item: gallery-item gen-ai + :class-item: gallery-item gen-ai cv training :link: https://www.anyscale.com/blog/faster-stable-diffusion-fine-tuning-with-ray-air Faster stable diffusion fine-tuning with Ray AIR @@ -61,7 +61,7 @@ Ray Examples How OpenAI Uses Ray to Train Tools like ChatGPT .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item llm gen-ai huggingface + :class-item: gallery-item llm gen-ai huggingface training nlp :link: /ray-air/examples/gptj_deepspeed_fine_tuning :link-type: doc @@ -74,7 +74,7 @@ Ray Examples Aviary toolkit serving live traffic for LLMs .. grid-item-card:: :bdg-success:`Tutorial` - :class-item: gallery-item pytorch + :class-item: gallery-item pytorch training :link: /ray-air/examples/convert_existing_pytorch_code_to_ray_air :link-type: doc @@ -102,7 +102,7 @@ Ray Examples Perform batch tuning on NYC Taxi Dataset with Ray AIR .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item + :class-item: gallery-item llm nlp gen-ai :link: /ray-air/examples/gptj_batch_prediction :link-type: doc @@ -214,14 +214,14 @@ Ray Examples Java tutorial for Ray Serve .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item serving + :class-item: gallery-item serving cv :link: /serve/tutorials/stable-diffusion :link-type: doc Serving a Stable Diffusion Model .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item serving + :class-item: gallery-item serving nlp :link: /serve/tutorials/text-classification :link-type: doc @@ -456,7 +456,7 @@ Ray Examples Simple Distributed Hyperparameter Optimization .. grid-item-card:: :bdg-primary:`Blog` - :class-item: gallery-item tuning + :class-item: gallery-item tuning nlp huggingface :link: https://www.anyscale.com/blog/hyperparameter-search-hugging-face-transformers-ray-tune Hyperparameter Search with 🤗 Transformers @@ -518,7 +518,7 @@ Ray Examples A Guide To Tuning Horovod Parameters With Tune .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item tuning huggingface tune serve + :class-item: gallery-item tuning huggingface tune serve nlp :link: tune-huggingface-example :link-type: ref @@ -544,28 +544,28 @@ Ray Examples Getting Started with Ray Train .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item training huggingface + :class-item: gallery-item training huggingface nlp :link: /ray-air/examples/huggingface_text_classification :link-type: doc Fine-tune a 🤗 Transformers model .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item pytorch training train + :class-item: gallery-item pytorch training train cv :link: torch_fashion_mnist_ex :link-type: ref PyTorch Fashion MNIST Training Example ..
grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item pytorch training train + :class-item: gallery-item pytorch training train nlp :link: train_transformers_example :link-type: ref Transformers with PyTorch Training Example .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item tensorflow training train + :class-item: gallery-item tensorflow training train cv :link: tensorflow_mnist_example :link-type: ref @@ -579,21 +579,21 @@ Ray Examples End-to-end Horovod Training Example .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item pytorch training train + :class-item: gallery-item pytorch training cv :link: lightning_mnist_example :link-type: ref End-to-end PyTorch Lightning Training Example .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item data-processing train + :class-item: gallery-item data-processing training nlp :link: lightning_advanced_example :link-type: ref Use LightningTrainer with Ray Data and Batch Predictor .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item tensorflow + :class-item: gallery-item tensorflow tuning :link: tune_train_tf_example :link-type: ref From 23ed18f29fcb76274273489b0f2b5b98cc2d2175 Mon Sep 17 00:00:00 2001 From: Yunxuan Xiao Date: Tue, 22 Aug 2023 18:34:40 -0700 Subject: [PATCH 2/5] [Train] Split all Ray Datasets by default (#38694) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, we only sharded the "train" Ray Dataset by default. Users who wanted to shard other datasets had to specify this explicitly with a `DataConfig`, e.g. `DataConfig(datasets_to_split=["train", "eval"])`. We now shard all datasets by default, for the following reasons: - Efficiency: We want users to get the most out of Ray Data. The best way to optimize training time is to leverage the fact that Ray Data can efficiently shard all datasets across workers. Training frameworks (e.g. Lightning) provide ways to aggregate results across workers, so we should recommend that users shard their validation datasets as well. - Consistency: It is conceptually easier for users to understand a single default behavior that applies to all datasets, together with options to configure it. - Explicitness: The behavior of the magic "train" key is not explicit, and users will not discover it until they read through the documentation. Relying on untyped keywords is not ideal.
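To make the aggregation point above concrete, here is an illustrative sketch of a per-worker validation loop (an editor's example, not part of this patch). It assumes `torchmetrics` is installed, that the loop runs inside a `TorchTrainer` worker (so a torch distributed process group exists for metric syncing), and hypothetical feature/label columns `x` and `y`:

```python
import torchmetrics
from ray import train


def validate_per_worker():
    # Under the new default, the "val" dataset is sharded too: each worker
    # sees only a 1/num_workers slice, so per-worker metric state must be
    # aggregated to get a global result.
    val_shard = train.get_dataset_shard("val")
    accuracy = torchmetrics.Accuracy(task="binary")
    for batch in val_shard.iter_torch_batches(batch_size=128):
        preds = (batch["x"] > 0.5).int()  # stand-in for a model forward pass
        accuracy.update(preds, batch["y"].int())
    # compute() syncs metric state across the process group that Ray Train
    # initializes for each worker, yielding accuracy over the full dataset.
    return accuracy.compute()
```

Lightning users get roughly the same behavior from `self.log(..., sync_dist=True)`.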
### API - Shard all datasets (default): ```python TorchTrainer( datasets={"a": ds_1, "b": ds_2, "c": ds_3}, # data_config=DataConfig(datasets_to_split="all") ) ``` - Shard a subset of datasets: ```python TorchTrainer( datasets={"a": ds_1, "b": ds_2, "c": ds_3}, data_config=DataConfig(datasets_to_split=["a", "b"]) ) ``` Signed-off-by: woshiyyya Signed-off-by: Yunxuan Xiao Co-authored-by: matthewdeng Co-authored-by: Eric Liang Co-authored-by: Cheng Su --- .../data-loading-preprocessing.rst | 29 ++++++---- .../ray/air/tests/test_new_dataset_config.py | 57 ++++++++++++++++++- python/ray/train/_internal/data_config.py | 41 +++++++++---- .../pytorch/torch_regression_example.py | 3 +- .../pytorch/tune_torch_regression_example.py | 3 +- .../train/tests/test_data_parallel_trainer.py | 2 +- python/ray/train/torch/torch_trainer.py | 6 +- 7 files changed, 112 insertions(+), 29 deletions(-) diff --git a/doc/source/train/user-guides/data-loading-preprocessing.rst b/doc/source/train/user-guides/data-loading-preprocessing.rst index 89160cda93e3..7a4ac4862350 100644 --- a/doc/source/train/user-guides/data-loading-preprocessing.rst +++ b/doc/source/train/user-guides/data-loading-preprocessing.rst @@ -209,10 +209,13 @@ Your preprocessed datasets can be passed into a Ray Train Trainer (e.g. :class:` The datasets passed into the Trainer's ``datasets`` can be accessed inside of the ``train_loop_per_worker`` run on each distributed training worker by calling :meth:`ray.train.get_dataset_shard`. -The default splitting behavior is as follows: +All datasets are split (i.e. sharded) across the training workers by default. :meth:`~ray.train.get_dataset_shard` will return ``1/n`` of the dataset, where ``n`` is the number of training workers. -- The ``"train"`` dataset is split (i.e. sharded) across the training workers. :meth:`~ray.train.get_dataset_shard` will return ``1/n`` of the dataset, where ``n`` is the number of training workers. -- All other dataset are not split. :meth:`~ray.train.get_dataset_shard` will return the full dataset. +.. note:: + + Because the evaluation dataset is now split as well, users have to aggregate the evaluation results across workers. + You might consider using `TorchMetrics `_ (:ref:`example `) or the + equivalent aggregation utilities available in other frameworks. This behavior can be overwritten by passing in the ``dataset_config`` argument. For more information on configuring splitting logic, see :ref:`Splitting datasets `. @@ -298,11 +301,11 @@ For more details, see the following sections for each framework. Splitting datasets ------------------ -By default, Ray Train splits the ``"train"`` dataset across workers using :meth:`Dataset.streaming_split `. Each worker sees a disjoint subset of the data, instead of iterating over the entire dataset. Unless randomly shuffled, the same splits are used for each iteration of the dataset. +By default, Ray Train splits all datasets across workers using :meth:`Dataset.streaming_split `. Each worker sees a disjoint subset of the data, instead of iterating over the entire dataset. Unless randomly shuffled, the same splits are used for each iteration of the dataset. -For all other datasets, Ray Train passes the entire dataset to each worker. +If you want to customize which datasets are split, pass in a :class:`DataConfig ` to the Trainer constructor. -To customize this, pass in a :class:`DataConfig ` to the Trainer constructor.
For example, to split both the training and validation datasets, do the following: +For example, to split only the training dataset, do the following: .. testcode:: @@ -317,18 +320,24 @@ To customize this, pass in a :class:`DataConfig ` to the T train_ds, val_ds = ds.train_test_split(0.3) def train_loop_per_worker(): - # Get an iterator to the dataset we passed in below. - it = train.get_dataset_shard("train") + # Get the sharded training dataset + train_ds = train.get_dataset_shard("train") for _ in range(2): - for batch in it.iter_batches(batch_size=128): + for batch in train_ds.iter_batches(batch_size=128): print("Do some training on batch", batch) + + # Get the unsharded full validation dataset + val_ds = train.get_dataset_shard("val") + for _ in range(2): + for batch in val_ds.iter_batches(batch_size=128): + print("Do some evaluation on batch", batch) my_trainer = TorchTrainer( train_loop_per_worker, scaling_config=ScalingConfig(num_workers=2), datasets={"train": train_ds, "val": val_ds}, dataset_config=ray.train.DataConfig( - datasets_to_split=["train", "val"], + datasets_to_split=["train"], ), ) my_trainer.fit() diff --git a/python/ray/air/tests/test_new_dataset_config.py b/python/ray/air/tests/test_new_dataset_config.py index 3814d679977e..bbeea5eb9abf 100644 --- a/python/ray/air/tests/test_new_dataset_config.py +++ b/python/ray/air/tests/test_new_dataset_config.py @@ -59,9 +59,9 @@ def test_basic(ray_start_4_cpus): test = TestBasic(1, True, {"train": 10, "test": -1}, datasets={"train": ds}) test.fit() - # Two workers, train split. + # Two workers, train and test split. test = TestBasic( - 2, True, {"train": 5, "test": 10}, datasets={"train": ds, "test": ds} + 2, True, {"train": 5, "test": 5}, datasets={"train": ds, "test": ds} ) test.fit() @@ -78,6 +78,59 @@ def test_basic(ray_start_4_cpus): test.fit() +def test_split(ray_start_4_cpus): + ds = ray.data.range(10) + + # Split all by default + test = TestBasic( + 2, + True, + {"train": 5, "test": 5, "val": 5}, + datasets={"train": ds, "test": ds, "val": ds}, + ) + test.fit() + + # Test flag "all" + test = TestBasic( + 2, + True, + {"train": 5, "test": 5}, + datasets={"train": ds, "test": ds}, + dataset_config=DataConfig(datasets_to_split="all"), + ) + + # Test split train only. 
+ test = TestBasic( + 2, + True, + {"train": 5, "test": 10}, + datasets={"train": ds, "test": ds}, + dataset_config=DataConfig(datasets_to_split=["train"]), + ) + test.fit() + + # Test invalid arguments + for datasets_to_split in ["train", ("train"), {}]: + with pytest.raises(TypeError, match="`datasets_to_split` should be.*"): + test = TestBasic( + 2, + True, + {"train": 5, "test": 10}, + datasets={"train": ds, "test": ds}, + dataset_config=DataConfig(datasets_to_split=datasets_to_split), + ) + + # Test empty `datasets_to_split` list + test = TestBasic( + 2, + True, + {"train": 10, "test": 10}, + datasets={"train": ds, "test": ds}, + dataset_config=DataConfig(datasets_to_split=[]), + ) + test.fit() + + @pytest.mark.skip( reason="Incomplete implementation of _validate_dag causes other errors, so we " "remove DAG validation for now; see https://github.com/ray-project/ray/pull/37829" diff --git a/python/ray/train/_internal/data_config.py b/python/ray/train/_internal/data_config.py index d3a1d07b7246..e8f82dbdc204 100644 --- a/python/ray/train/_internal/data_config.py +++ b/python/ray/train/_internal/data_config.py @@ -1,8 +1,10 @@ -from typing import Optional, Dict, List +from typing import Optional, Union, Dict, List import ray from ray.actor import ActorHandle -from ray.train.constants import TRAIN_DATASET_KEY + +# TODO(justinvyu): Fix the circular import error +from ray.train.constants import TRAIN_DATASET_KEY # noqa from ray.train._internal.dataset_spec import DataParallelIngestSpec from ray.util.annotations import PublicAPI, DeveloperAPI from ray.air.config import DatasetConfig @@ -15,6 +17,13 @@ ) from ray.data.preprocessor import Preprocessor +import sys + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal + @PublicAPI class DataConfig: @@ -26,21 +35,28 @@ class DataConfig: def __init__( self, - datasets_to_split: Optional[List[str]] = None, + datasets_to_split: Union[Literal["all"], List[str]] = "all", execution_options: Optional[ExecutionOptions] = None, ): """Construct a DataConfig. Args: - datasets_to_split: The list of dataset names to split between workers. - By default, only the "train" dataset will be split. + datasets_to_split: Specifies which datasets should be split among workers. + Can be set to "all" or a list of dataset names. Defaults to "all", + i.e. split all datasets. execution_options: The execution options to pass to Ray Data. By default, the options will be optimized for data ingest. When overriding this, base your options off of `DataConfig.default_ingest_options()`. """ - self._datasets_to_split: List[str] = ( - datasets_to_split if datasets_to_split is not None else [TRAIN_DATASET_KEY] - ) + if isinstance(datasets_to_split, list) or datasets_to_split == "all": + self._datasets_to_split = datasets_to_split + else: + raise TypeError( + "`datasets_to_split` should be 'all' or a list of " + "dataset names. Received " + f"{type(datasets_to_split).__name__} with value {datasets_to_split}." + ) + self._execution_options: ExecutionOptions = ( execution_options or DataConfig.default_ingest_options() ) @@ -68,12 +84,17 @@ def configure( equal to `world_size`. Each element of the list contains the assigned `DataIterator` instances by name for the worker.
""" - output = [{} for i in range(world_size)] + output = [{} for _ in range(world_size)] + + if self._datasets_to_split == "all": + datasets_to_split = set(datasets.keys()) + else: + datasets_to_split = set(self._datasets_to_split) for name, ds in datasets.items(): ds = ds.copy(ds) ds.context.execution_options = self._execution_options - if name in self._datasets_to_split: + if name in datasets_to_split: for i, split in enumerate( ds.streaming_split( world_size, equal=True, locality_hints=worker_node_ids diff --git a/python/ray/train/examples/pytorch/torch_regression_example.py b/python/ray/train/examples/pytorch/torch_regression_example.py index 99f886570726..25c020cc30f8 100644 --- a/python/ray/train/examples/pytorch/torch_regression_example.py +++ b/python/ray/train/examples/pytorch/torch_regression_example.py @@ -11,7 +11,7 @@ import ray import ray.train as train -from ray.train import ScalingConfig +from ray.train import ScalingConfig, DataConfig from ray.data import Dataset from ray.train.torch import TorchTrainer @@ -120,6 +120,7 @@ def train_regression(num_workers=2, use_gpu=False): train_loop_config=config, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu), datasets={"train": train_dataset, "validation": val_dataset}, + dataset_config=DataConfig(datasets_to_split=["train"]), ) result = trainer.fit() diff --git a/python/ray/train/examples/pytorch/tune_torch_regression_example.py b/python/ray/train/examples/pytorch/tune_torch_regression_example.py index ebc4c08787d0..b195ef2eb09d 100644 --- a/python/ray/train/examples/pytorch/tune_torch_regression_example.py +++ b/python/ray/train/examples/pytorch/tune_torch_regression_example.py @@ -3,7 +3,7 @@ import ray from ray import tune from ray.train.torch import TorchTrainer -from ray.train import ScalingConfig +from ray.train import ScalingConfig, DataConfig from ray.tune.tune_config import TuneConfig from ray.tune.tuner import Tuner @@ -20,6 +20,7 @@ def tune_linear(num_workers, num_samples, use_gpu): train_loop_config=config, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu), datasets={"train": train_dataset, "validation": val_dataset}, + dataset_config=DataConfig(datasets_to_split=["train"]), ) tuner = Tuner( diff --git a/python/ray/train/tests/test_data_parallel_trainer.py b/python/ray/train/tests/test_data_parallel_trainer.py index b90c6dfb4cf6..27ab5bfbc10e 100644 --- a/python/ray/train/tests/test_data_parallel_trainer.py +++ b/python/ray/train/tests/test_data_parallel_trainer.py @@ -137,7 +137,7 @@ def get_dataset(): # All other datasets should not be sharded. val_dataset = train.get_dataset_shard("val") val_ds_count = len(list(val_dataset.iter_rows())) - assert val_ds_count == num_val_data + assert val_ds_count == num_val_data / scale_config.num_workers trainer = DataParallelTrainer( train_loop_per_worker=get_dataset, diff --git a/python/ray/train/torch/torch_trainer.py b/python/ray/train/torch/torch_trainer.py index a072e403a1fe..afe1500c89a5 100644 --- a/python/ray/train/torch/torch_trainer.py +++ b/python/ray/train/torch/torch_trainer.py @@ -164,10 +164,8 @@ def train_loop_per_worker(config): Sharding and additional configuration can be done by passing in a ``dataset_config``. dataset_config: The configuration for ingesting the input ``datasets``. - By default: - - - The ``"train"`` Dataset is split equally across workers. - - All other Datasets are **not** split. + By default, all the Ray Dataset are split equally across workers. + See :class:`~ray.train.DataConfig` for more details. 
resume_from_checkpoint: A checkpoint to resume training from. This checkpoint can be accessed from within ``train_loop_per_worker`` by calling ``ray.train.get_checkpoint()``. From dec4bd8f4d40a61d965256e6e5cc3b9c3b8972aa Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 22 Aug 2023 21:23:27 -0700 Subject: [PATCH 3/5] [serve] Unify usage tracking code under `ServeUsageTag` (#38658) We have a mixture of raw strings and Ray `TagKey`s being used throughout the code. Unifying them under a single interface. Also unified the report parsing code used in tests. --- .../ray/serve/_private/application_state.py | 14 +- python/ray/serve/_private/deployment_state.py | 23 +-- python/ray/serve/_private/usage.py | 40 ++++ python/ray/serve/_private/utils.py | 48 ----- python/ray/serve/api.py | 13 +- python/ray/serve/controller.py | 4 +- python/ray/serve/deployment.py | 17 +- python/ray/serve/drivers.py | 9 +- python/ray/serve/handle.py | 13 +- python/ray/serve/multiplex.py | 5 +- python/ray/serve/tests/test_telemetry.py | 189 +++++++++--------- 11 files changed, 176 insertions(+), 199 deletions(-) create mode 100644 python/ray/serve/_private/usage.py diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index a0c46fed50ea..59f2129004a0 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -9,7 +9,6 @@ import ray from ray import cloudpickle from ray.exceptions import RuntimeEnvSetupError -from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag from ray._private.utils import import_attr from ray.serve.config import DeploymentConfig from ray.serve.exceptions import RayServeException @@ -35,6 +34,7 @@ from ray.serve._private.deployment_state import DeploymentStateManager from ray.serve._private.endpoint_state import EndpointState from ray.serve._private.storage.kv_store import KVStoreBase +from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( check_obj_ref_ready_nowait, override_runtime_envs_except_env_vars, @@ -747,9 +747,7 @@ def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None: self._endpoint_state, self._save_checkpoint_func, ) - record_extra_usage_tag( - TagKey.SERVE_NUM_APPS, str(len(self._application_states)) - ) + ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) deployment_infos = { params["deployment_name"]: deploy_args_to_deployment_info( @@ -774,9 +772,7 @@ def deploy_config( endpoint_state=self._endpoint_state, save_checkpoint_func=self._save_checkpoint_func, ) - record_extra_usage_tag( - TagKey.SERVE_NUM_APPS, str(len(self._application_states)) - ) + ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) self._application_states[name].deploy_config( app_config, deployment_time, @@ -850,9 +846,7 @@ def update(self): if len(apps_to_be_deleted) > 0: for app_name in apps_to_be_deleted: del self._application_states[app_name] - record_extra_usage_tag( - TagKey.SERVE_NUM_APPS, str(len(self._application_states)) - ) + ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) def shutdown(self) -> None: for app_state in self._application_states.values(): diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 77cc7ac0d8ff..f2d3aa10d78c 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -16,10 +16,6 @@ from ray.actor import ActorHandle from ray.exceptions 
import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError from ray.util.placement_group import PlacementGroup -from ray._private.usage.usage_lib import ( - TagKey, - record_extra_usage_tag, -) from ray.serve._private.autoscaling_metrics import InMemoryMetricsStore from ray.serve._private.common import ( @@ -51,6 +47,7 @@ from ray.serve.generated.serve_pb2 import DeploymentLanguage from ray.serve._private.long_poll import LongPollHost, LongPollNamespace from ray.serve._private.storage.kv_store import KVStoreBase +from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( JavaActorHandleProxy, format_actor_name, @@ -1358,16 +1355,12 @@ def _set_target_state(self, target_info: DeploymentInfo) -> None: self._target_state.version.deployment_config.autoscaling_config != target_state.version.deployment_config.autoscaling_config ): - record_extra_usage_tag( - TagKey.SERVE_AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED, "True" - ) + ServeUsageTag.AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED.record("True") elif ( self._target_state.version.deployment_config.num_replicas != target_state.version.deployment_config.num_replicas ): - record_extra_usage_tag( - TagKey.SERVE_NUM_REPLICAS_LIGHTWEIGHT_UPDATED, "True" - ) + ServeUsageTag.NUM_REPLICAS_LIGHTWEIGHT_UPDATED.record("True") self._target_state = target_state self._curr_status_info = DeploymentStatusInfo( @@ -1552,7 +1545,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> boo "with outdated deployment configs." ) # Record user config lightweight update - record_extra_usage_tag(TagKey.SERVE_USER_CONFIG_LIGHTWEIGHT_UPDATED, "True") + ServeUsageTag.USER_CONFIG_LIGHTWEIGHT_UPDATED.record("True") return replicas_changed @@ -2675,9 +2668,7 @@ def update(self) -> bool: return any_recovering def _record_deployment_usage(self): - record_extra_usage_tag( - TagKey.SERVE_NUM_DEPLOYMENTS, str(len(self._deployment_states)) - ) + ServeUsageTag.NUM_DEPLOYMENTS.record(str(len(self._deployment_states))) num_gpu_deployments = 0 for deployment_state in self._deployment_states.values(): @@ -2694,9 +2685,7 @@ def _record_deployment_usage(self): ) ): num_gpu_deployments += 1 - record_extra_usage_tag( - TagKey.SERVE_NUM_GPU_DEPLOYMENTS, str(num_gpu_deployments) - ) + ServeUsageTag.NUM_GPU_DEPLOYMENTS.record(str(num_gpu_deployments)) def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): """ diff --git a/python/ray/serve/_private/usage.py b/python/ray/serve/_private/usage.py new file mode 100644 index 000000000000..2d04977470db --- /dev/null +++ b/python/ray/serve/_private/usage.py @@ -0,0 +1,40 @@ +from enum import Enum +from typing import Dict, Optional + +from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag + + +class ServeUsageTag(Enum): + API_VERSION = TagKey.SERVE_API_VERSION + NUM_DEPLOYMENTS = TagKey.SERVE_NUM_DEPLOYMENTS + GCS_STORAGE = TagKey.GCS_STORAGE + NUM_GPU_DEPLOYMENTS = TagKey.SERVE_NUM_GPU_DEPLOYMENTS + FASTAPI_USED = TagKey.SERVE_FASTAPI_USED + DAG_DRIVER_USED = TagKey.SERVE_DAG_DRIVER_USED + HTTP_ADAPTER_USED = TagKey.SERVE_HTTP_ADAPTER_USED + GRPC_INGRESS_USED = TagKey.SERVE_GRPC_INGRESS_USED + REST_API_VERSION = TagKey.SERVE_REST_API_VERSION + NUM_APPS = TagKey.SERVE_NUM_APPS + NUM_REPLICAS_LIGHTWEIGHT_UPDATED = TagKey.SERVE_NUM_REPLICAS_LIGHTWEIGHT_UPDATED + USER_CONFIG_LIGHTWEIGHT_UPDATED = TagKey.SERVE_USER_CONFIG_LIGHTWEIGHT_UPDATED + AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED = ( + TagKey.SERVE_AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED + ) + 
RAY_SERVE_HANDLE_API_USED = TagKey.SERVE_RAY_SERVE_HANDLE_API_USED + RAY_SERVE_SYNC_HANDLE_API_USED = TagKey.SERVE_RAY_SERVE_SYNC_HANDLE_API_USED + DEPLOYMENT_HANDLE_API_USED = TagKey.SERVE_DEPLOYMENT_HANDLE_API_USED + DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED = ( + TagKey.SERVE_DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED + ) + MULTIPLEXED_API_USED = TagKey.SERVE_MULTIPLEXED_API_USED + + def record(self, value: str): + """Record telemetry value.""" + record_extra_usage_tag(self.value, value) + + def get_value_from_report(self, report: Dict) -> Optional[str]: + """Returns `None` if the tag isn't in the report.""" + if "extra_usage_tags" not in report: + return None + + return report["extra_usage_tags"].get(TagKey.Name(self.value).lower(), None) diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 016b31a116cc..c4269572ecf9 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -41,7 +41,6 @@ from ray.util.serialization import StandaloneSerializationContext from ray._raylet import MessagePackSerializer from ray._private.utils import import_attr -from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME import __main__ @@ -493,53 +492,6 @@ def check_obj_ref_ready_nowait(obj_ref: ObjectRef) -> bool: return len(finished) == 1 -serve_telemetry_tag_map = { - "SERVE_API_VERSION": TagKey.SERVE_API_VERSION, - "SERVE_NUM_DEPLOYMENTS": TagKey.SERVE_NUM_DEPLOYMENTS, - "GCS_STORAGE": TagKey.GCS_STORAGE, - "SERVE_NUM_GPU_DEPLOYMENTS": TagKey.SERVE_NUM_GPU_DEPLOYMENTS, - "SERVE_FASTAPI_USED": TagKey.SERVE_FASTAPI_USED, - "SERVE_DAG_DRIVER_USED": TagKey.SERVE_DAG_DRIVER_USED, - "SERVE_HTTP_ADAPTER_USED": TagKey.SERVE_HTTP_ADAPTER_USED, - "SERVE_GRPC_INGRESS_USED": TagKey.SERVE_GRPC_INGRESS_USED, - "SERVE_REST_API_VERSION": TagKey.SERVE_REST_API_VERSION, - "SERVE_NUM_APPS": TagKey.SERVE_NUM_APPS, - "SERVE_NUM_REPLICAS_LIGHTWEIGHT_UPDATED": ( - TagKey.SERVE_NUM_REPLICAS_LIGHTWEIGHT_UPDATED - ), - "SERVE_USER_CONFIG_LIGHTWEIGHT_UPDATED": ( - TagKey.SERVE_USER_CONFIG_LIGHTWEIGHT_UPDATED - ), - "SERVE_AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED": ( - TagKey.SERVE_AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED - ), - "SERVE_RAY_SERVE_HANDLE_API_USED": TagKey.SERVE_RAY_SERVE_HANDLE_API_USED, - "SERVE_RAY_SERVE_SYNC_HANDLE_API_USED": TagKey.SERVE_RAY_SERVE_SYNC_HANDLE_API_USED, - "SERVE_DEPLOYMENT_HANDLE_API_USED": TagKey.SERVE_DEPLOYMENT_HANDLE_API_USED, - "SERVE_DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED": ( - TagKey.SERVE_DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED - ), - "SERVE_MULTIPLEXED_API_USED": TagKey.SERVE_MULTIPLEXED_API_USED, -} - - -def record_serve_tag(key: str, value: str): - """Record telemetry. - - TagKey objects cannot be pickled, so deployments can't directly record - telemetry using record_extra_usage_tag. They can instead call this function - which records telemetry for them. - """ - - if key not in serve_telemetry_tag_map: - raise ValueError( - f'The TagKey "{key}" does not exist. Expected a key from: ' - f"{list(serve_telemetry_tag_map.keys())}." - ) - - record_extra_usage_tag(serve_telemetry_tag_map[key], value) - - def extract_self_if_method_call(args: List[Any], func: Callable) -> Optional[object]: """Check if this is a method rather than a function. 
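As a quick illustration of the unified interface (an editor's sketch, not part of the diff): recording and test-side parsing now go through the same enum member, replacing both the raw-string `record_serve_tag` calls and the `serve_telemetry_tag_map` indirection removed above. The `report` dict below mimics the shape that `get_value_from_report` expects:

```python
from ray.serve._private.usage import ServeUsageTag

# Recording: replaces record_extra_usage_tag(TagKey.SERVE_NUM_APPS, "2")
# and record_serve_tag("SERVE_NUM_APPS", "2").
ServeUsageTag.NUM_APPS.record("2")

# Parsing in tests: the same enum member reads the value back out of a
# telemetry report, returning None if the tag was never recorded.
report = {"extra_usage_tags": {"serve_num_apps": "2"}}
assert ServeUsageTag.NUM_APPS.get_value_from_report(report) == "2"
assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None
```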
diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 08fb56c99f3b..c6b62abae981 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -5,7 +5,6 @@ from functools import wraps from fastapi import APIRouter, FastAPI -from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag import ray from ray import cloudpickle @@ -44,6 +43,7 @@ ASGIAppReplicaWrapper, make_fastapi_class_based_view, ) +from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( DEFAULT, Default, @@ -51,7 +51,6 @@ in_interactive_shell, install_serve_encoders_to_fastapi, guarded_deprecation_warning, - record_serve_tag, get_random_letters, extract_self_if_method_call, ) @@ -112,7 +111,7 @@ def start( client = _private_api.serve_start(detached, http_options, dedicated_cpu, **kwargs) # Record after Ray has been started. - record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1") + ServeUsageTag.API_VERSION.record("v1") return client @@ -224,7 +223,7 @@ def __init__(self, *args, **kwargs): # Call user-defined constructor. cls.__init__(self, *args, **kwargs) - record_serve_tag("SERVE_FASTAPI_USED", "1") + ServeUsageTag.FASTAPI_USED.record("1") install_serve_encoders_to_fastapi() ASGIAppReplicaWrapper.__init__(self, frozen_app) @@ -427,7 +426,7 @@ def get_deployment(name: str) -> Deployment: Returns: Deployment """ - record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1") + ServeUsageTag.API_VERSION.record("v1") return _private_api.get_deployment(name) @@ -438,7 +437,7 @@ def list_deployments() -> Dict[str, Deployment]: Dictionary maps deployment name to Deployment objects. """ - record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1") + ServeUsageTag.API_VERSION.record("v1") return _private_api.list_deployments() @@ -482,7 +481,7 @@ def run( ) # Record after Ray has been started. - record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v2") + ServeUsageTag.API_VERSION.record("v2") if isinstance(target, Application): deployments = pipeline_build(target._get_internal_dag_node(), name) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index e820a0648bb0..6a787048916e 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -56,11 +56,11 @@ ServeActorDetails, ) from ray.serve._private.storage.kv_store import RayInternalKVStore +from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( call_function_from_import_path, get_all_live_placement_group_names, get_head_node_id, - record_serve_tag, ) from ray.serve._private.application_state import ApplicationStateManager from ray.serve._private.default_impl import ( @@ -661,7 +661,7 @@ def deploy_apps( deployment_time: set deployment_timestamp. If not provided, time.time() is used to indicate the deployment time. """ - record_serve_tag("SERVE_API_VERSION", "v2") + ServeUsageTag.API_VERSION.record("v2") # TODO (zcin): We should still support single-app mode, i.e. # ServeApplicationSchema. Eventually, after migration is complete, we should # deprecate such usage. 
diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index fd914e0420ca..7660f8adbe0f 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -10,25 +10,30 @@ Tuple, Union, ) -from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag -from ray.serve.context import get_global_client from ray.dag.dag_node import DAGNodeBase from ray.dag.class_node import ClassNode from ray.dag.function_node import FunctionNode +from ray.util.annotations import Deprecated, PublicAPI + from ray.serve.config import ( AutoscalingConfig, DeploymentConfig, ReplicaConfig, ) -from ray.serve._private.constants import SERVE_LOGGER_NAME, MIGRATION_MESSAGE +from ray.serve.context import get_global_client from ray.serve.handle import RayServeHandle, RayServeSyncHandle -from ray.serve._private.utils import DEFAULT, Default, guarded_deprecation_warning -from ray.util.annotations import Deprecated, PublicAPI from ray.serve.schema import ( RayActorOptionsSchema, DeploymentSchema, ) +from ray.serve._private.constants import SERVE_LOGGER_NAME, MIGRATION_MESSAGE +from ray.serve._private.usage import ServeUsageTag +from ray.serve._private.utils import ( + DEFAULT, + Default, + guarded_deprecation_warning, +) logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -286,7 +291,7 @@ def deploy(self, *init_args, _blocking=True, **init_kwargs): init_kwargs: kwargs to pass to the class __init__ method. Not valid if this deployment wraps a function. """ - record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1") + ServeUsageTag.API_VERSION.record("v1") self._deploy(*init_args, _blocking=_blocking, **init_kwargs) # TODO(Sihan) Promote the _deploy to deploy after we fully deprecate the API diff --git a/python/ray/serve/drivers.py b/python/ray/serve/drivers.py index 898f06692ee1..e9cef0ed2fbd 100644 --- a/python/ray/serve/drivers.py +++ b/python/ray/serve/drivers.py @@ -21,7 +21,8 @@ from ray.serve.handle import RayServeHandle from ray.serve._private.constants import DEFAULT_GRPC_PORT, SERVE_LOGGER_NAME from ray.serve._private.http_util import ASGIAppReplicaWrapper -from ray.serve._private.utils import install_serve_encoders_to_fastapi, record_serve_tag +from ray.serve._private.usage import ServeUsageTag +from ray.serve._private.utils import install_serve_encoders_to_fastapi logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -46,9 +47,9 @@ def __init__( HTTP requests to Ray Serve input. 
""" - record_serve_tag("SERVE_DAG_DRIVER_USED", "1") + ServeUsageTag.DAG_DRIVER_USED.record("1") if http_adapter is not None: - record_serve_tag("SERVE_HTTP_ADAPTER_USED", "1") + ServeUsageTag.HTTP_ADAPTER_USED.record("1") install_serve_encoders_to_fastapi() http_adapter = load_http_adapter(http_adapter) @@ -141,7 +142,7 @@ def __init__(self, port: int = DEFAULT_GRPC_PORT): self.setup_complete = asyncio.Event() self.running_task = get_or_create_event_loop().create_task(self.run()) - record_serve_tag("SERVE_GRPC_INGRESS_USED", "1") + ServeUsageTag.GRPC_INGRESS_USED.record("1") async def run(self): """Start gRPC Server""" diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index 228325e3eb0b..3c36f6b5c23f 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -13,9 +13,9 @@ from ray.serve._private.constants import ( RAY_SERVE_ENABLE_NEW_ROUTING, ) +from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( get_random_letters, - record_serve_tag, DEFAULT, ) from ray.serve._private.router import Router, RequestMetadata @@ -122,13 +122,12 @@ def _record_telemetry_if_needed(self): and self.handle_options._request_protocol == RequestProtocol.UNDEFINED ): if self.__class__ == DeploymentHandle: - tag = "SERVE_DEPLOYMENT_HANDLE_API_USED" + ServeUsageTag.DEPLOYMENT_HANDLE_API_USED.record("1") elif self.__class__ == RayServeHandle: - tag = "SERVE_RAY_SERVE_HANDLE_API_USED" + ServeUsageTag.RAY_SERVE_HANDLE_API_USED.record("1") else: - tag = "SERVE_RAY_SERVE_SYNC_HANDLE_API_USED" + ServeUsageTag.RAY_SERVE_SYNC_HANDLE_API_USED.record("1") - record_serve_tag(tag, "1") self._recorded_telemetry = True def _set_request_protocol(self, request_protocol: RequestProtocol): @@ -456,7 +455,7 @@ async def _to_object_ref_or_gen( # `_record_telemetry` is used to filter other API calls that go through # this path as well as calls from the proxy. if _record_telemetry: - record_serve_tag("SERVE_DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED", "1") + ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1") return await self._assign_request_task def _to_object_ref_or_gen_sync( @@ -470,7 +469,7 @@ def _to_object_ref_or_gen_sync( ) if _record_telemetry: - record_serve_tag("SERVE_DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED", "1") + ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1") return self._object_ref_future.result() diff --git a/python/ray/serve/multiplex.py b/python/ray/serve/multiplex.py index 18c17eb86154..ea45fa565a88 100644 --- a/python/ray/serve/multiplex.py +++ b/python/ray/serve/multiplex.py @@ -16,7 +16,8 @@ get_internal_replica_context, ) from ray.serve._private.common import MultiplexedReplicaInfo -from ray.serve._private.utils import MetricsPusher, record_serve_tag +from ray.serve._private.usage import ServeUsageTag +from ray.serve._private.utils import MetricsPusher from ray.serve import metrics @@ -53,7 +54,7 @@ def __init__( per replica. 
""" - record_serve_tag("SERVE_MULTIPLEXED_API_USED", "1") + ServeUsageTag.MULTIPLEXED_API_USED.record("1") self.models = OrderedDict() self._func: Callable = model_load_func diff --git a/python/ray/serve/tests/test_telemetry.py b/python/ray/serve/tests/test_telemetry.py index 893d31fd97bc..1514115ff04c 100644 --- a/python/ray/serve/tests/test_telemetry.py +++ b/python/ray/serve/tests/test_telemetry.py @@ -8,21 +8,22 @@ from starlette.requests import Request import ray +from ray.dag.input_node import InputNode from ray._private.test_utils import wait_for_condition from ray._private.usage import usage_lib from ray import serve -from ray.dag.input_node import InputNode +from ray.serve.context import get_global_client from ray.serve.drivers import DefaultgRPCDriver, DAGDriver from ray.serve.http_adapters import json_request +from ray.serve.schema import ServeDeploySchema +from ray.serve._private.common import ApplicationStatus from ray.serve._private.constants import ( - SERVE_NAMESPACE, SERVE_DEFAULT_APP_NAME, SERVE_MULTIPLEXED_MODEL_ID, + SERVE_NAMESPACE, ) -from ray.serve.context import get_global_client -from ray.serve._private.common import ApplicationStatus -from ray.serve.schema import ServeDeploySchema +from ray.serve._private.usage import ServeUsageTag TELEMETRY_ROUTE_PREFIX = "/telemetry" @@ -154,18 +155,18 @@ async def app2(self): ) report = ray.get(storage_handle.get_report.remote()) - # Check all telemetry relevant to the Serve apps on this cluster - assert int(report["extra_usage_tags"]["serve_fastapi_used"]) == 1 - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_gpu_deployments"]) == 0 + # Check all telemetry relevant to the Serve apps on this cluster. + assert int(ServeUsageTag.FASTAPI_USED.get_value_from_report(report)) == 1 + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 2 + assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0 - # Check that Serve telemetry not relevant to the running apps is omitted - assert "serve_dag_driver_used" not in report - assert "serve_http_adapter_used" not in report - assert "serve_grpc_ingress_used" not in report - assert "serve_rest_api_version" not in report + # Check that Serve telemetry not relevant to the running apps is omitted. 
+ assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) is None + assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None + assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) is None + assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None def test_grpc_detected(manage_ray): @@ -200,17 +201,17 @@ def greeter(inputs: Dict[str, bytes]): report = ray.get(storage_handle.get_report.remote()) # Check all telemetry relevant to the Serve apps on this cluster - assert int(report["extra_usage_tags"]["serve_grpc_ingress_used"]) == 1 - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 3 - assert int(report["extra_usage_tags"]["serve_num_gpu_deployments"]) == 0 + assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) == "1" + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 3 + assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0 # Check that Serve telemetry not relevant to the running apps is omitted - assert "serve_dag_driver_used" not in report - assert "serve_http_adapter_used" not in report - assert "serve_fastapi_used" not in report - assert "serve_rest_api_version" not in report + assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) is None + assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None + assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None + assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None @pytest.mark.parametrize("use_adapter", [True, False]) @@ -250,20 +251,20 @@ def greeter(input): report = ray.get(storage_handle.get_report.remote()) # Check all telemetry relevant to the Serve apps on this cluster - assert int(report["extra_usage_tags"]["serve_dag_driver_used"]) == 1 - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 3 - assert int(report["extra_usage_tags"]["serve_num_gpu_deployments"]) == 0 + assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) == "1" + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 3 + assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0 if use_adapter: - assert int(report["extra_usage_tags"]["serve_http_adapter_used"]) == 1 + assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) == "1" # Check that Serve telemetry not relevant to the running apps is omitted - assert "serve_fastapi_used" not in report - assert "serve_grpc_ingress_used" not in report - assert "serve_rest_api_version" not in report + assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None + assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) is None + assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) is None if not use_adapter: - assert "serve_http_adapter_used" not in report + assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None @serve.deployment @@ 
-335,27 +336,23 @@ def test_rest_api(manage_ray, tmp_dir, version): report = ray.get(storage.get_report.remote()) # Check all telemetry relevant to the Serve apps on this cluster - assert report["extra_usage_tags"]["serve_rest_api_version"] == version - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - assert int(report["extra_usage_tags"]["serve_num_gpu_deployments"]) == 0 + assert ServeUsageTag.REST_API_VERSION.get_value_from_report(report) == version + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + assert int(ServeUsageTag.NUM_GPU_DEPLOYMENTS.get_value_from_report(report)) == 0 if version == "v1": - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 1 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 1 + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 1 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 1 elif version == "v2": - # Assert num of deployments from controller + # Assert num of deployments from controller. assert len(client.get_all_deployment_statuses()) == 2 - result = usage_lib.get_extra_usage_tags_to_report( - ray.experimental.internal_kv.internal_kv_get_gcs_client() - ) - assert int(result["serve_num_deployments"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 2 + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 2 # Check that Serve telemetry not relevant to the running apps is omitted - assert "serve_fastapi_used" not in report - assert "serve_grpc_ingress_used" not in report - assert "serve_http_adapter_used" not in report - assert "serve_dag_driver_used" not in report + assert ServeUsageTag.FASTAPI_USED.get_value_from_report(report) is None + assert ServeUsageTag.GRPC_INGRESS_USED.get_value_from_report(report) is None + assert ServeUsageTag.HTTP_ADAPTER_USED.get_value_from_report(report) is None + assert ServeUsageTag.DAG_DRIVER_USED.get_value_from_report(report) is None # Check that app deletions are tracked in v2 if version == "v2": @@ -376,16 +373,16 @@ def test_rest_api(manage_ray, tmp_dir, version): wait_for_condition( lambda: int( - ray.get(storage.get_report.remote())["extra_usage_tags"][ - "serve_num_apps" - ] + ServeUsageTag.NUM_APPS.get_value_from_report( + ray.get(storage.get_report.remote()) + ) ) == 1, timeout=15, ) report = ray.get(storage.get_report.remote()) - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 1 - assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 1 + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 1 + assert int(ServeUsageTag.NUM_DEPLOYMENTS.get_value_from_report(report)) == 1 @serve.deployment(ray_actor_options={"num_cpus": 0}) @@ -413,11 +410,11 @@ def test_lightweight_config_options(manage_ray, lightweight_option, value): Check that lightweight config options are detected by telemetry. 
""" - lightweight_tagkeys = [ - "serve_num_replicas_lightweight_updated", - "serve_user_config_lightweight_updated", - "serve_autoscaling_config_lightweight_updated", - ] + lightweight_tagkeys = { + "num_replicas": ServeUsageTag.NUM_REPLICAS_LIGHTWEIGHT_UPDATED, + "user_config": ServeUsageTag.USER_CONFIG_LIGHTWEIGHT_UPDATED, + "autoscaling_config": ServeUsageTag.AUTOSCALING_CONFIG_LIGHTWEIGHT_UPDATED, + } subprocess.check_output(["ray", "start", "--head"]) wait_for_condition(check_ray_started, timeout=5) @@ -457,10 +454,10 @@ def test_lightweight_config_options(manage_ray, lightweight_option, value): report = ray.get(storage.get_report.remote()) # Check - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - for tagkey in lightweight_tagkeys: - assert tagkey not in report["extra_usage_tags"] + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + for tagkey in lightweight_tagkeys.values(): + assert tagkey.get_value_from_report(report) is None # Change config and deploy again config["applications"][1]["deployments"][0][lightweight_option] = value @@ -478,18 +475,18 @@ def test_lightweight_config_options(manage_ray, lightweight_option, value): # Check again wait_for_condition( - lambda: ray.get(storage.get_report.remote())["extra_usage_tags"][ - f"serve_{lightweight_option}_lightweight_updated" - ] + lambda: lightweight_tagkeys[lightweight_option].get_value_from_report( + ray.get(storage.get_report.remote()) + ) == "True", timeout=5, ) report = ray.get(storage.get_report.remote()) - assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2 - assert report["extra_usage_tags"]["serve_api_version"] == "v2" - for tagkey in lightweight_tagkeys: - if not tagkey == f"serve_{lightweight_option}_lightweight_updated": - assert tagkey not in report["extra_usage_tags"] + assert int(ServeUsageTag.NUM_APPS.get_value_from_report(report)) == 2 + assert ServeUsageTag.API_VERSION.get_value_from_report(report) == "v2" + for tagkey in lightweight_tagkeys.values(): + if tagkey != lightweight_tagkeys[lightweight_option]: + assert tagkey.get_value_from_report(report) is None @pytest.mark.parametrize("use_new_handle_api", [False, True]) @@ -507,9 +504,14 @@ def test_handle_apis_detected(manage_ray, use_new_handle_api, call_in_deployment report = ray.get(storage_handle.get_report.remote()) print(report["extra_usage_tags"]) - assert "serve_deployment_handle_api_used" not in report["extra_usage_tags"] - assert "serve_ray_serve_handle_api_used" not in report["extra_usage_tags"] - assert "serve_ray_serve_sync_handle_api_used" not in report["extra_usage_tags"] + assert ( + ServeUsageTag.DEPLOYMENT_HANDLE_API_USED.get_value_from_report(report) is None + ) + assert ServeUsageTag.RAY_SERVE_HANDLE_API_USED.get_value_from_report(report) is None + assert ( + ServeUsageTag.RAY_SERVE_SYNC_HANDLE_API_USED.get_value_from_report(report) + is None + ) @serve.deployment class Downstream: @@ -546,18 +548,18 @@ def check_telemetry(): print(report["extra_usage_tags"]) if use_new_handle_api: assert ( - report["extra_usage_tags"].get("serve_deployment_handle_api_used", "0") + ServeUsageTag.DEPLOYMENT_HANDLE_API_USED.get_value_from_report(report) == "1" ) elif call_in_deployment: assert ( - report["extra_usage_tags"].get("serve_ray_serve_handle_api_used", "0") + ServeUsageTag.RAY_SERVE_HANDLE_API_USED.get_value_from_report(report) == "1" ) else: assert ( - 
report["extra_usage_tags"].get( - "serve_ray_serve_sync_handle_api_used", "0" + ServeUsageTag.RAY_SERVE_SYNC_HANDLE_API_USED.get_value_from_report( + report ) == "1" ) @@ -581,8 +583,10 @@ def test_deployment_handle_to_obj_ref_detected(manage_ray, mode): report = ray.get(storage_handle.get_report.remote()) print(report["extra_usage_tags"]) assert ( - "serve_deployment_handle_to_object_ref_api_used" - not in report["extra_usage_tags"] + ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.get_value_from_report( + report + ) + is None ) @serve.deployment @@ -622,18 +626,11 @@ async def __call__(self, call_downstream=False): def check_telemetry(tag_should_be_set: bool): report = ray.get(storage_handle.get_report.remote()) print(report["extra_usage_tags"]) + tag = ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED if tag_should_be_set: - assert ( - report["extra_usage_tags"].get( - "serve_deployment_handle_to_object_ref_api_used", "0" - ) - == "1" - ) + assert tag.get_value_from_report(report) == "1" else: - assert ( - "serve_deployment_handle_to_object_ref_api_used" - not in report["extra_usage_tags"] - ) + assert tag.get_value_from_report(report) is None return True @@ -668,7 +665,7 @@ async def __call__(self, request): lambda: ray.get(storage_handle.get_reports_received.remote()) > 0, timeout=5 ) report = ray.get(storage_handle.get_report.remote()) - assert "serve_multiplexed_api_used" not in report["extra_usage_tags"] + assert ServeUsageTag.MULTIPLEXED_API_USED.get_value_from_report(report) is None client = get_global_client() wait_for_condition( @@ -687,9 +684,9 @@ async def __call__(self, request): wait_for_condition( lambda: int( - ray.get(storage_handle.get_report.remote())["extra_usage_tags"][ - "serve_multiplexed_api_used" - ] + ServeUsageTag.MULTIPLEXED_API_USED.get_value_from_report( + ray.get(storage_handle.get_report.remote()) + ) ) == 1, timeout=5, From bde33fa6422c8f2ec095bf6c6860d9d1b6e8cc3d Mon Sep 17 00:00:00 2001 From: Yunxuan Xiao Date: Wed, 23 Aug 2023 01:02:21 -0700 Subject: [PATCH 4/5] [Ray 2.7 Examples][3/n] Delete Obsolete Transformers GLUE Example (#38682) We clean up this outdated example which uses AccelerateTrainer API. Signed-off-by: woshiyyya --- .buildkite/pipeline.ml.yml | 23 +- doc/source/_toc.yml | 2 - doc/source/ray-overview/examples.rst | 7 - doc/source/train/examples.rst | 8 - .../transformers/transformers_example.rst | 8 - python/ray/train/BUILD | 24 - .../train/examples/transformers/README.rst | 57 -- .../train/examples/transformers/__init__.py | 0 .../train/examples/transformers/cluster.yaml | 58 -- .../transformers/transformers_example.py | 629 ------------------ 10 files changed, 12 insertions(+), 804 deletions(-) delete mode 100644 doc/source/train/examples/transformers/transformers_example.rst delete mode 100644 python/ray/train/examples/transformers/README.rst delete mode 100644 python/ray/train/examples/transformers/__init__.py delete mode 100644 python/ray/train/examples/transformers/cluster.yaml delete mode 100644 python/ray/train/examples/transformers/transformers_example.py diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index 908c0de30808..3eed205e90ce 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -356,17 +356,18 @@ --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=1 python/ray/train/... 
-- label: ":steam_locomotive: :octopus: :floppy_disk: New persistence mode: Train + Tune tests and examples" - conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TRAIN_AFFECTED"] - instance_size: medium - commands: - - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - - TRAIN_TESTING=1 TUNE_TESTING=1 ./ci/env/install-dependencies.sh - - ./ci/env/env_info.sh - - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only - --test_tag_filters=tune,-gpu_only,-ray_air,-gpu,-doctest,-no_new_storage - --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=1 - python/ray/train/... +# TODO(krfricke): Add new test for this suite +# - label: ":steam_locomotive: :octopus: :floppy_disk: New persistence mode: Train + Tune tests and examples" +# conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TRAIN_AFFECTED"] +# instance_size: medium +# commands: +# - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT +# - TRAIN_TESTING=1 TUNE_TESTING=1 ./ci/env/install-dependencies.sh +# - ./ci/env/env_info.sh +# - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only +# --test_tag_filters=tune,-gpu_only,-ray_air,-gpu,-doctest,-no_new_storage +# --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=1 +# python/ray/train/... - label: ":octopus: :floppy_disk: New persistence mode: Tune tests and examples (small)" diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index 606cbbbf9674..324fc9eb9242 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -93,8 +93,6 @@ parts: title: "PyTorch Lightning Advanced Example" - file: train/examples/lightning/lightning_exp_tracking title: "PyTorch Lightning with Experiment Tracking Tools" - - file: train/examples/transformers/transformers_example - title: "HF Transformers Example" - file: train/examples/tf/tensorflow_mnist_example title: "TensorFlow MNIST Example" - file: train/examples/horovod/horovod_example diff --git a/doc/source/ray-overview/examples.rst b/doc/source/ray-overview/examples.rst index e068af3d03e0..7038821f5a64 100644 --- a/doc/source/ray-overview/examples.rst +++ b/doc/source/ray-overview/examples.rst @@ -557,13 +557,6 @@ Ray Examples PyTorch Fashion MNIST Training Example - .. grid-item-card:: :bdg-secondary:`Code example` - :class-item: gallery-item pytorch training train nlp - :link: train_transformers_example - :link-type: ref - - Transformers with PyTorch Training Example - .. grid-item-card:: :bdg-secondary:`Code example` :class-item: gallery-item tensorflow training train cv :link: tensorflow_mnist_example diff --git a/doc/source/train/examples.rst b/doc/source/train/examples.rst index 901a3b6f87ee..bcc21e64e7b9 100644 --- a/doc/source/train/examples.rst +++ b/doc/source/train/examples.rst @@ -25,14 +25,6 @@ Distributed Training Examples using Ray Train PyTorch Fashion MNIST Training Example - .. grid-item-card:: - :img-top: /images/hugging.png - :class-img-top: pt-2 w-75 d-block mx-auto fixed-height-img - - .. button-ref:: train_transformers_example - - Transformers with PyTorch Training Example - .. 
grid-item-card:: :img-top: /images/tf_logo.png :class-img-top: pt-2 w-75 d-block mx-auto fixed-height-img diff --git a/doc/source/train/examples/transformers/transformers_example.rst b/doc/source/train/examples/transformers/transformers_example.rst deleted file mode 100644 index 7f7eeb4547fc..000000000000 --- a/doc/source/train/examples/transformers/transformers_example.rst +++ /dev/null @@ -1,8 +0,0 @@ -:orphan: - -.. _train_transformers_example : - -Ray Train Example for HuggingFace Transformers with PyTorch -=========================================================== - -.. literalinclude:: /../../python/ray/train/examples/transformers/transformers_example.py diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index eea6c71589be..a2cb7dcb0d3d 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -73,30 +73,6 @@ py_test( deps = [":train_lib"] ) -py_test( - name = "transformers_example_gpu", - size = "medium", - main = "examples/transformers/transformers_example.py", - srcs = ["examples/transformers/transformers_example.py"], - tags = ["team:ml", "exclusive", "tune", "gpu_only"], - deps = [":train_lib"], - args = ["--model_name_or_path=bert-base-cased", "--task_name=mrpc", - "--max_length=32", "--per_device_train_batch_size=64", - "--max_train_steps=2", "--start_local", "--num_workers=2", "--use_gpu"] -) - -py_test( - name = "transformers_example_cpu", - size = "medium", - main = "examples/transformers/transformers_example.py", - srcs = ["examples/transformers/transformers_example.py"], - tags = ["team:ml", "exclusive", "tune"], - deps = [":train_lib"], - args = ["--model_name_or_path=bert-base-cased", "--task_name=mrpc", - "--max_length=32", "--per_device_train_batch_size=64", - "--max_train_steps=2", "--start_local", "--num_workers=2"] -) - py_test( name = "tune_cifar_torch_pbt_example", size = "medium", diff --git a/python/ray/train/examples/transformers/README.rst b/python/ray/train/examples/transformers/README.rst deleted file mode 100644 index f0435e4f4474..000000000000 --- a/python/ray/train/examples/transformers/README.rst +++ /dev/null @@ -1,57 +0,0 @@ -HuggingFace Transformers Glue Fine-tuning Example -================================================= - -We've ported the ``huggingface/transformers/examples/pytorch/text-classification/run_glue_no_trainer.py`` example to -Ray Train. This example enables fine-tuning the library models for sequence classification on the GLUE benchmark: General Language Understanding Evaluation. - -This script can fine-tune the following models: - CoLA, SST-2, MRPC, STS-B, QQP, MNLI, QNLI, RTE, WNLI. - -Additional information can be found at the `HuggingFace Repository -`_. - -Local process training ----------------------- - -To run an example tuning MRPC locally, without Ray: - -.. code-block:: bash - - export TASK_NAME=mrpc - - python transformers_example.py \ - --model_name_or_path bert-base-cased \ - --task_name $TASK_NAME \ - --max_length 128 \ - --per_device_train_batch_size 32 \ - --learning_rate 2e-5 \ - --num_train_epochs 3 \ - --output_dir /tmp/$TASK_NAME/ - -This is the same as running `run_glue_no_trainer.py `_. - -Distributed multi-node GPU training ------------------------------------ - -To run an example tuning MRPC on AWS with 8 GPUs across multiple nodes: - -.. 
code-block:: bash - - export TASK_NAME=mrpc - - ray up cluster.yaml - # (Optional) ray monitor cluster.yaml - ray submit cluster.yaml transformers_example.py \ - --model_name_or_path bert-base-cased \ - --task_name $TASK_NAME \ - --max_length 128 \ - --per_device_train_batch_size 32 \ - --learning_rate 2e-5 \ - --num_train_epochs 3 \ - --output_dir /tmp/$TASK_NAME/ \ - --address auto \ - --num_workers 8 \ - --use_gpu - -The example can also be run using :ref:`Ray Job Submission `, which is in beta starting with Ray 1.12. \ No newline at end of file diff --git a/python/ray/train/examples/transformers/__init__.py b/python/ray/train/examples/transformers/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/python/ray/train/examples/transformers/cluster.yaml b/python/ray/train/examples/transformers/cluster.yaml deleted file mode 100644 index 72e8676e0198..000000000000 --- a/python/ray/train/examples/transformers/cluster.yaml +++ /dev/null @@ -1,58 +0,0 @@ -# An unique identifier for the head node and workers of this cluster. -cluster_name: transformer-cluster - -# The maximum number of workers nodes to launch in addition to the head -# node. This takes precedence over min_workers. min_workers default to 0. -min_workers: 3 -max_workers: 3 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu - -available_node_types: - ray.head.default: - min_workers: 0 - max_workers: 0 - resources: {} - node_config: - InstanceType: g3.8xlarge - ImageId: latest_dlami - InstanceMarketOptions: - MarketType: spot - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 300 - - - ray.worker.default: - min_workers: 3 - max_workers: 3 - resources: {} - node_config: - InstanceType: g3.8xlarge - ImageId: latest_dlami - InstanceMarketOptions: - MarketType: spot - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 300 - - -setup_commands: - # This replaces the standard anaconda Ray installation - - pip install ray[tune] - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl - - # Install Transformers - - git clone https://github.com/huggingface/transformers || true - - cd transformers && - pip install -U . && - pip install -r ./examples/pytorch/text-classification/requirements.txt diff --git a/python/ray/train/examples/transformers/transformers_example.py b/python/ray/train/examples/transformers/transformers_example.py deleted file mode 100644 index 48d2e3ca8a9e..000000000000 --- a/python/ray/train/examples/transformers/transformers_example.py +++ /dev/null @@ -1,629 +0,0 @@ -# coding=utf-8 -# This is a modified example originally from The HuggingFace Inc. team. -# Modified by Matthew Deng. -# Copyright 2021 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-""" Finetuning a πŸ€— Transformers model for sequence classification on GLUE.""" -import argparse -import logging -import math -import os -import random -from typing import Any, Dict - -import datasets -import transformers -from accelerate import Accelerator -from datasets import load_dataset, load_metric -from torch.utils.data.dataloader import DataLoader -from tqdm.auto import tqdm -from transformers import ( - AdamW, - AutoConfig, - AutoModelForSequenceClassification, - AutoTokenizer, - DataCollatorWithPadding, - PretrainedConfig, - SchedulerType, - default_data_collator, - get_scheduler, - set_seed, -) -from transformers.utils.versions import require_version - -import ray -from ray.train.huggingface import AccelerateTrainer -from ray.train import ScalingConfig - -logger = logging.getLogger(__name__) - -require_version( - "datasets>=1.8.0", - "To fix: pip install -r examples/pytorch/text-classification/requirements.txt", -) - -task_to_keys = { - "cola": ("sentence", None), - "mnli": ("premise", "hypothesis"), - "mrpc": ("sentence1", "sentence2"), - "qnli": ("question", "sentence"), - "qqp": ("question1", "question2"), - "rte": ("sentence1", "sentence2"), - "sst2": ("sentence", None), - "stsb": ("sentence1", "sentence2"), - "wnli": ("sentence1", "sentence2"), -} - - -def parse_args(): - parser = argparse.ArgumentParser( - description="Finetune a transformers model on a text classification task" - ) - parser.add_argument( - "--task_name", - type=str, - default=None, - help="The name of the glue task to train on.", - choices=list(task_to_keys.keys()), - ) - parser.add_argument( - "--train_file", - type=str, - default=None, - help="A csv or a json file containing the training data.", - ) - parser.add_argument( - "--validation_file", - type=str, - default=None, - help="A csv or a json file containing the validation data.", - ) - parser.add_argument( - "--max_length", - type=int, - default=128, - help=( - "The maximum total input sequence length after tokenization. " - "Sequences longer than this will be truncated, sequences shorter " - "will be padded if `--pad_to_max_lengh` is passed." - ), - ) - parser.add_argument( - "--pad_to_max_length", - action="store_true", - help="If passed, pad all samples to `max_length`. Otherwise, dynamic " - "padding is used.", - ) - parser.add_argument( - "--model_name_or_path", - type=str, - help="Path to pretrained model or model identifier from " - "huggingface.co/models.", - required=True, - ) - parser.add_argument( - "--use_slow_tokenizer", - action="store_true", - help="If passed, will use a slow tokenizer (not backed by the πŸ€— " - "Tokenizers library).", - ) - parser.add_argument( - "--per_device_train_batch_size", - type=int, - default=8, - help="Batch size (per device) for the training dataloader.", - ) - parser.add_argument( - "--per_device_eval_batch_size", - type=int, - default=8, - help="Batch size (per device) for the evaluation dataloader.", - ) - parser.add_argument( - "--learning_rate", - type=float, - default=5e-5, - help="Initial learning rate (after the potential warmup period) to use.", - ) - parser.add_argument( - "--weight_decay", type=float, default=0.0, help="Weight decay to use." - ) - parser.add_argument( - "--num_train_epochs", - type=int, - default=3, - help="Total number of training epochs to perform.", - ) - parser.add_argument( - "--max_train_steps", - type=int, - default=None, - help="Total number of training steps to perform. 
If provided, " - "overrides num_train_epochs.", - ) - parser.add_argument( - "--gradient_accumulation_steps", - type=int, - default=1, - help="Number of updates steps to accumulate before performing a " - "backward/update pass.", - ) - parser.add_argument( - "--lr_scheduler_type", - type=SchedulerType, - default="linear", - help="The scheduler type to use.", - choices=[ - "linear", - "cosine", - "cosine_with_restarts", - "polynomial", - "constant", - "constant_with_warmup", - ], - ) - parser.add_argument( - "--num_warmup_steps", - type=int, - default=0, - help="Number of steps for the warmup in the lr scheduler.", - ) - parser.add_argument( - "--output_dir", type=str, default=None, help="Where to store the final model." - ) - parser.add_argument( - "--seed", type=int, default=None, help="A seed for reproducible training." - ) - - # Ray arguments. - parser.add_argument( - "--start_local", action="store_true", help="Starts Ray on local machine." - ) - parser.add_argument( - "--address", type=str, default=None, help="Ray address to connect to." - ) - parser.add_argument( - "--num_workers", type=int, default=1, help="Number of workers to use." - ) - parser.add_argument( - "--use_gpu", action="store_true", help="If training should be done on GPUs." - ) - - args = parser.parse_args() - - # Sanity checks - if ( - args.task_name is None - and args.train_file is None - and args.validation_file is None - ): - raise ValueError("Need either a task name or a training/validation file.") - else: - if args.train_file is not None: - extension = args.train_file.split(".")[-1] - assert extension in [ - "csv", - "json", - ], "`train_file` should be a csv or a json file." - if args.validation_file is not None: - extension = args.validation_file.split(".")[-1] - assert extension in [ - "csv", - "json", - ], "`validation_file` should be a csv or a json file." - - if args.output_dir is not None: - os.makedirs(args.output_dir, exist_ok=True) - - return args - - -def train_func(config: Dict[str, Any]): - args = config["args"] - # Initialize the accelerator. We will let the accelerator handle device - # placement for us in this example. - accelerator = Accelerator(cpu=not args.use_gpu) - # Make one log on every process with the configuration for debugging. - logging.basicConfig( - format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", - datefmt="%m/%d/%Y %H:%M:%S", - level=logging.INFO, - ) - logger.info(accelerator.state) - - # Setup logging, we only want one process per machine to log things on - # the screen. accelerator.is_local_main_process is only True for one - # process per machine. - logger.setLevel( - logging.INFO if accelerator.is_local_main_process else logging.ERROR - ) - if accelerator.is_local_main_process: - datasets.utils.logging.set_verbosity_warning() - transformers.utils.logging.set_verbosity_info() - else: - datasets.utils.logging.set_verbosity_error() - transformers.utils.logging.set_verbosity_error() - - # If passed along, set the training seed now. - if args.seed is not None: - set_seed(args.seed) - - # Get the datasets: you can either provide your own CSV/JSON training and - # evaluation files (see below) or specify a GLUE benchmark task (the - # dataset will be downloaded automatically from the datasets Hub). 
- - # For CSV/JSON files, this script will use as labels the column called - # 'label' and as pair of sentences the sentences in columns called - # 'sentence1' and 'sentence2' if such column exists or the first two - # columns not named label if at least two columns are provided. - - # If the CSVs/JSONs contain only one non-label column, the script does - # single sentence classification on this single column. You can easily - # tweak this behavior (see below) - - # In distributed training, the load_dataset function guarantee that only - # one local process can concurrently download the dataset. - if args.task_name is not None: - # Downloading and loading a dataset from the hub. - raw_datasets = load_dataset("glue", args.task_name) - else: - # Loading the dataset from local csv or json file. - data_files = {} - if args.train_file is not None: - data_files["train"] = args.train_file - if args.validation_file is not None: - data_files["validation"] = args.validation_file - extension = ( - args.train_file if args.train_file is not None else args.valid_file - ).split(".")[-1] - raw_datasets = load_dataset(extension, data_files=data_files) - # See more about loading any type of standard or custom dataset at - # https://huggingface.co/docs/datasets/loading_datasets.html. - - # Labels - if args.task_name is not None: - is_regression = args.task_name == "stsb" - if not is_regression: - label_list = raw_datasets["train"].features["label"].names - num_labels = len(label_list) - else: - num_labels = 1 - else: - # Trying to have good defaults here, don't hesitate to tweak to your - # needs. - is_regression = raw_datasets["train"].features["label"].dtype in [ - "float32", - "float64", - ] - if is_regression: - num_labels = 1 - else: - # A useful fast method: - # https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.unique # noqa:E501 - label_list = raw_datasets["train"].unique("label") - label_list.sort() # Let's sort it for determinism - num_labels = len(label_list) - - # Load pretrained model and tokenizer - # - # In distributed training, the .from_pretrained methods guarantee that - # only one local process can concurrently download model & vocab. - config = AutoConfig.from_pretrained( - args.model_name_or_path, num_labels=num_labels, finetuning_task=args.task_name - ) - tokenizer = AutoTokenizer.from_pretrained( - args.model_name_or_path, use_fast=not args.use_slow_tokenizer - ) - model = AutoModelForSequenceClassification.from_pretrained( - args.model_name_or_path, - from_tf=bool(".ckpt" in args.model_name_or_path), - config=config, - ) - - # Preprocessing the datasets - if args.task_name is not None: - sentence1_key, sentence2_key = task_to_keys[args.task_name] - else: - # Again, we try to have some nice defaults but don't hesitate to - # tweak to your use case. - non_label_column_names = [ - name for name in raw_datasets["train"].column_names if name != "label" - ] - if ( - "sentence1" in non_label_column_names - and "sentence2" in non_label_column_names - ): - sentence1_key, sentence2_key = "sentence1", "sentence2" - else: - if len(non_label_column_names) >= 2: - sentence1_key, sentence2_key = non_label_column_names[:2] - else: - sentence1_key, sentence2_key = non_label_column_names[0], None - - # Some models have set the order of the labels to use, - # so let's make sure we do use it. 
- label_to_id = None - if ( - model.config.label2id != PretrainedConfig(num_labels=num_labels).label2id - and args.task_name is not None - and not is_regression - ): - # Some have all caps in their config, some don't. - label_name_to_id = {k.lower(): v for k, v in model.config.label2id.items()} - if list(sorted(label_name_to_id.keys())) == list( # noqa:C413 - sorted(label_list) - ): # noqa:C413 - logger.info( - f"The configuration of the model provided the following label " - f"correspondence: {label_name_to_id}. Using it!" - ) - label_to_id = { - i: label_name_to_id[label_list[i]] for i in range(num_labels) - } - else: - logger.warning( - "Your model seems to have been trained with labels, " - "but they don't match the dataset: ", - f"model labels: {list(sorted(label_name_to_id.keys()))}, " # noqa:C413,E501 - f"dataset labels: {list(sorted(label_list))}." # noqa:C413 - "\nIgnoring the model labels as a result.", - ) - elif args.task_name is None: - label_to_id = {v: i for i, v in enumerate(label_list)} - - if label_to_id is not None: - model.config.label2id = label_to_id - model.config.id2label = {id: label for label, id in config.label2id.items()} - - padding = "max_length" if args.pad_to_max_length else False - - def preprocess_function(examples): - # Tokenize the texts - texts = ( - (examples[sentence1_key],) - if sentence2_key is None - else (examples[sentence1_key], examples[sentence2_key]) - ) - result = tokenizer( - *texts, padding=padding, max_length=args.max_length, truncation=True - ) - - if "label" in examples: - if label_to_id is not None: - # Map labels to IDs (not necessary for GLUE tasks) - result["labels"] = [ - label_to_id[l] for l in examples["label"] # noqa:E741 - ] - else: - # In all cases, rename the column to labels because the model - # will expect that. - result["labels"] = examples["label"] - return result - - processed_datasets = raw_datasets.map( - preprocess_function, - batched=True, - remove_columns=raw_datasets["train"].column_names, - desc="Running tokenizer on dataset", - ) - - train_dataset = processed_datasets["train"] - eval_dataset = processed_datasets[ - "validation_matched" if args.task_name == "mnli" else "validation" - ] - - # Log a few random samples from the training set: - for index in random.sample(range(len(train_dataset)), 3): - logger.info(f"Sample {index} of the training set: {train_dataset[index]}.") - - # DataLoaders creation: - if args.pad_to_max_length: - # If padding was already done ot max length, we use the default data - # collator that will just convert everything to tensors. - data_collator = default_data_collator - else: - # Otherwise, `DataCollatorWithPadding` will apply dynamic padding for - # us (by padding to the maximum length of the samples passed). When - # using mixed precision, we add `pad_to_multiple_of=8` to pad all - # tensors to multiple of 8s, which will enable the use of Tensor - # Cores on NVIDIA hardware with compute capability >= 7.5 (Volta). - data_collator = DataCollatorWithPadding( - tokenizer, pad_to_multiple_of=(8 if accelerator.use_fp16 else None) - ) - - train_dataloader = DataLoader( - train_dataset, - shuffle=True, - collate_fn=data_collator, - batch_size=args.per_device_train_batch_size, - ) - eval_dataloader = DataLoader( - eval_dataset, - collate_fn=data_collator, - batch_size=args.per_device_eval_batch_size, - ) - - # Optimizer - # Split weights in two groups, one with weight decay and the other not. 
- no_decay = ["bias", "LayerNorm.weight"] - optimizer_grouped_parameters = [ - { - "params": [ - p - for n, p in model.named_parameters() - if not any(nd in n for nd in no_decay) - ], - "weight_decay": args.weight_decay, - }, - { - "params": [ - p - for n, p in model.named_parameters() - if any(nd in n for nd in no_decay) - ], - "weight_decay": 0.0, - }, - ] - optimizer = AdamW(optimizer_grouped_parameters, lr=args.learning_rate) - - # Prepare everything with our `accelerator`. - model, optimizer, train_dataloader, eval_dataloader = accelerator.prepare( - model, optimizer, train_dataloader, eval_dataloader - ) - - # Note -> the training dataloader needs to be prepared before we grab - # his length below (cause its length will be shorter in multiprocess) - - # Scheduler and math around the number of training steps. - num_update_steps_per_epoch = math.ceil( - len(train_dataloader) / args.gradient_accumulation_steps - ) - if args.max_train_steps is None: - args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch - else: - args.num_train_epochs = math.ceil( - args.max_train_steps / num_update_steps_per_epoch - ) - - lr_scheduler = get_scheduler( - name=args.lr_scheduler_type, - optimizer=optimizer, - num_warmup_steps=args.num_warmup_steps, - num_training_steps=args.max_train_steps, - ) - - # Get the metric function - if args.task_name is not None: - metric = load_metric("glue", args.task_name) - else: - metric = load_metric("accuracy") - - # Train! - total_batch_size = ( - args.per_device_train_batch_size - * accelerator.num_processes - * args.gradient_accumulation_steps - ) - - logger.info("***** Running training *****") - logger.info(f" Num examples = {len(train_dataset)}") - logger.info(f" Num Epochs = {args.num_train_epochs}") - logger.info( - f" Instantaneous batch size per device =" - f" {args.per_device_train_batch_size}" - ) - logger.info( - f" Total train batch size (w. parallel, distributed & accumulation) " - f"= {total_batch_size}" - ) - logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}") - logger.info(f" Total optimization steps = {args.max_train_steps}") - # Only show the progress bar once on each machine. 
- progress_bar = tqdm( - range(args.max_train_steps), disable=not accelerator.is_local_main_process - ) - completed_steps = 0 - - for epoch in range(args.num_train_epochs): - model.train() - for step, batch in enumerate(train_dataloader): - outputs = model(**batch) - loss = outputs.loss - loss = loss / args.gradient_accumulation_steps - accelerator.backward(loss) - if ( - step % args.gradient_accumulation_steps == 0 - or step == len(train_dataloader) - 1 - ): - optimizer.step() - lr_scheduler.step() - optimizer.zero_grad() - progress_bar.update(1) - completed_steps += 1 - - if completed_steps >= args.max_train_steps: - break - - model.eval() - for step, batch in enumerate(eval_dataloader): - outputs = model(**batch) - predictions = ( - outputs.logits.argmax(dim=-1) - if not is_regression - else outputs.logits.squeeze() - ) - metric.add_batch( - predictions=accelerator.gather(predictions), - references=accelerator.gather(batch["labels"]), - ) - - eval_metric = metric.compute() - logger.info(f"epoch {epoch}: {eval_metric}") - - if args.output_dir is not None: - accelerator.wait_for_everyone() - unwrapped_model = accelerator.unwrap_model(model) - unwrapped_model.save_pretrained(args.output_dir, save_function=accelerator.save) - - if args.task_name == "mnli": - # Final evaluation on mismatched validation set - eval_dataset = processed_datasets["validation_mismatched"] - eval_dataloader = DataLoader( - eval_dataset, - collate_fn=data_collator, - batch_size=args.per_device_eval_batch_size, - ) - eval_dataloader = accelerator.prepare(eval_dataloader) - - model.eval() - for step, batch in enumerate(eval_dataloader): - outputs = model(**batch) - predictions = outputs.logits.argmax(dim=-1) - metric.add_batch( - predictions=accelerator.gather(predictions), - references=accelerator.gather(batch["labels"]), - ) - - eval_metric = metric.compute() - logger.info(f"mnli-mm: {eval_metric}") - - -def main(): - args = parse_args() - config = {"args": args} - - if args.start_local or args.address or args.num_workers > 1 or args.use_gpu: - if args.start_local: - # Start a local Ray runtime. - ray.init(num_cpus=args.num_workers + 2) - else: - # Connect to a Ray cluster for distributed training. - ray.init(address=args.address) - trainer = AccelerateTrainer( - train_func, - train_loop_config=config, - accelerate_config={}, - scaling_config=ScalingConfig( - num_workers=args.num_workers, use_gpu=args.use_gpu - ), - ) - results = trainer.fit() - print(results.metrics) - else: - # Run training locally. - train_func(config) - - -if __name__ == "__main__": - main() From 548f81066174b022de8c07f1a89f53c605526327 Mon Sep 17 00:00:00 2001 From: Noah Jackson Date: Wed, 23 Aug 2023 02:32:17 -0700 Subject: [PATCH 5/5] use ProfilingLink instead of tags (#37840) In addition to removing unnecessary code duplication, these changes fix the links to the stack trace and flame graph pages when behind a reverse proxy. Previously the links used in NodeRow.tsx included a leading / that is not included in other links and does not work when hosting the Ray dashboard behind a reverse proxy. 
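To make the failure mode concrete, here is a minimal TypeScript sketch (the dashboard route, query parameter, and proxy prefix below are illustrative assumptions, not Ray's actual values): an absolute href escapes whatever prefix the reverse proxy mounts the dashboard under, while a relative href resolves beneath it.

```ts
// Sketch, not Ray code: how the browser resolves the two href styles.
// Suppose a reverse proxy serves the dashboard under /proxy/dashboard/.
const page = "https://example.com/proxy/dashboard/nodes";

// Leading slash: the prefix is dropped, so the proxy never sees the request.
console.log(new URL("/worker/traceback?pid=42", page).href);
// https://example.com/worker/traceback?pid=42

// Relative path: the prefix is preserved, as the proxy expects.
console.log(new URL("worker/traceback?pid=42", page).href);
// https://example.com/proxy/dashboard/worker/traceback?pid=42
```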
--- dashboard/client/src/pages/node/NodeRow.tsx | 23 ++++++--------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/dashboard/client/src/pages/node/NodeRow.tsx b/dashboard/client/src/pages/node/NodeRow.tsx index 1f9239925e92..3f5540bbc005 100644 --- a/dashboard/client/src/pages/node/NodeRow.tsx +++ b/dashboard/client/src/pages/node/NodeRow.tsx @@ -15,6 +15,10 @@ import useSWR from "swr"; import { CodeDialogButtonWithPreview } from "../../common/CodeDialogButton"; import { API_REFRESH_INTERVAL_MS } from "../../common/constants"; import { NodeLink } from "../../common/links"; +import { + CpuProfilingLink, + CpuStackTraceLink, +} from "../../common/ProfilingLink"; import rowStyles from "../../common/RowStyles"; import PercentageBar from "../../components/PercentageBar"; import { StatusChip } from "../../components/StatusChip"; @@ -254,23 +258,8 @@ export const WorkerRow = ({ node, worker }: WorkerRowProps) => { Logs
-          …
-            Stack Trace
-          …
-          …
-            CPU Flame Graph
-          …
+          <CpuStackTraceLink … />
+          <CpuProfilingLink … />
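For orientation, a hedged sketch of what such shared link components can look like. Everything below — the prop names, query parameters, and routes — is an assumption for illustration; the authoritative implementation is the ProfilingLink module the diff imports from.

```tsx
// Hedged sketch only: the real components live in
// dashboard/client/src/common/ProfilingLink.tsx and their props may differ.
// It illustrates the deduplication: one component per profiling action,
// each building a *relative* URL (no leading slash).
import React from "react";

type ProfilingLinkProps = {
  pid: number | string; // worker process id (assumed prop name)
  ip: string;           // IP of the node hosting the worker (assumed prop name)
};

export const CpuStackTraceLink: React.FC<ProfilingLinkProps> = ({ pid, ip }) => (
  // Relative path keeps the link under any reverse-proxy prefix.
  <a
    href={`worker/traceback?pid=${pid}&ip=${ip}&native=0`}
    target="_blank"
    rel="noreferrer"
  >
    Stack Trace
  </a>
);

export const CpuProfilingLink: React.FC<ProfilingLinkProps> = ({ pid, ip }) => (
  <a
    href={`worker/cpu_profile?pid=${pid}&ip=${ip}&duration=5&native=0`}
    target="_blank"
    rel="noreferrer"
  >
    CPU Flame Graph
  </a>
);
```

A call site such as WorkerRow can then render `<CpuStackTraceLink pid={worker.pid} ip={node.ip} />` instead of hand-building anchor markup in every row, which is the deduplication the commit message describes.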