Revert "[Core] Add retry exception allowlist for user-defined filteri… (
SongGuyang authored Jul 5, 2022
1 parent 8427928 commit cf7305a
Showing 25 changed files with 75 additions and 360 deletions.
2 changes: 0 additions & 2 deletions .gitignore
@@ -184,8 +184,6 @@ venv
# Vim
.*.swp
*.swp
-.*.swo
-*.swo
tags
tags.lock
tags.temp
7 changes: 1 addition & 6 deletions cpp/src/ray/runtime/task/task_executor.cc
@@ -127,10 +127,9 @@ Status TaskExecutor::ExecuteTask(
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
-const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
-bool *is_retryable_error,
+bool *is_application_level_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute) {
RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type)
@@ -142,10 +141,6 @@ Status TaskExecutor::ExecuteTask(
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
std::string func_name = typed_descriptor->FunctionName();
bool cross_lang = !typed_descriptor->Caller().empty();
-// TODO(Clark): Support retrying application-level errors for C++.
-// TODO(Clark): Support exception allowlist for retrying application-level
-// errors for C++.
-*is_retryable_error = false;

Status status{};
std::shared_ptr<msgpack::sbuffer> data = nullptr;
3 changes: 1 addition & 2 deletions cpp/src/ray/runtime/task/task_executor.h
@@ -83,10 +83,9 @@ class TaskExecutor {
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
-const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
-bool *is_retryable_error,
+bool *is_application_level_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute);

91 changes: 0 additions & 91 deletions doc/source/ray-core/doc_code/tasks_fault_tolerance.py

This file was deleted.

63 changes: 26 additions & 37 deletions doc/source/ray-core/tasks/fault-tolerance.rst
@@ -1,14 +1,8 @@
.. _task-fault-tolerance:

===============
Fault Tolerance
===============

-.. _task-retries:
-
-Retries
-=======
-
When a worker is executing a task, if the worker dies unexpectedly, either
because the process crashed or because the machine failed, Ray will rerun
the task until either the task succeeds or the maximum number of retries is
@@ -21,40 +15,35 @@ using :ref:`runtime environments<runtime-environments>`.

You can experiment with this behavior by running the following code.

-.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
-  :language: python
-  :start-after: __tasks_fault_tolerance_retries_begin__
-  :end-before: __tasks_fault_tolerance_retries_end__

-You can also control whether application-level errors are retried, and even **which**
-application-level errors are retried, via the ``retry_exceptions`` argument. This is
-``False`` by default, so if your application code within the Ray task raises an
-exception, this failure will **not** be retried. This is to ensure that Ray is not
-retrying non-idempotent tasks when they have partially executed.
-However, if your tasks are idempotent, then you can enable application-level error
-retries with ``retry_exceptions=True``, or even retry a specific set of
-application-level errors (such as a class of exception types that you know to be
-transient) by providing an allowlist of exceptions:

-.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
-  :language: python
-  :start-after: __tasks_fault_tolerance_retries_exception_begin__
-  :end-before: __tasks_fault_tolerance_retries_exception_end__

-The semantics for each of the potential ``retry_exceptions`` values are as follows:
-
-* ``retry_exceptions=False`` (default): Application-level errors are not retried.
-
-* ``retry_exceptions=True``: All application-level errors are retried.
-
-* ``retry_exceptions=[Exc1, Exc2]``: Application-level errors that are instances of
-  either ``Exc1`` or ``Exc2`` are retried.
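A minimal sketch of the allowlist form described in the list above (assuming the list-valued ``retry_exceptions`` support that this commit reverts, with ``TransientError`` and ``fetch`` as hypothetical names) would look roughly like this:

.. code-block:: python

  import ray

  ray.init(ignore_reinit_error=True)

  class TransientError(Exception):
      pass

  # Retry only TransientError (up to two extra attempts); any other
  # application-level exception fails immediately.
  @ray.remote(max_retries=2, retry_exceptions=[TransientError])
  def fetch(i):
      if i == 0:
          raise TransientError("temporary glitch, safe to retry")
      return i

  print(ray.get(fetch.remote(1)))  # succeeds on the first attempt
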
+.. code-block:: python
+
+  import numpy as np
+  import os
+  import ray
+  import time
+
+  ray.init(ignore_reinit_error=True)
+
+  @ray.remote(max_retries=1)
+  def potentially_fail(failure_probability):
+      time.sleep(0.2)
+      if np.random.random() < failure_probability:
+          os._exit(0)
+      return 0
+
+  for _ in range(3):
+      try:
+          # If this task crashes, Ray will retry it up to one additional
+          # time. If either of the attempts succeeds, the call to ray.get
+          # below will return normally. Otherwise, it will raise an
+          # exception.
+          ray.get(potentially_fail.remote(0.5))
+          print('SUCCESS')
+      except ray.exceptions.WorkerCrashedError:
+          print('FAILURE')

.. _object-reconstruction:

Lineage-based Object Reconstruction
===================================

Ray also implements *lineage reconstruction* to recover task outputs that are
lost from the distributed object store. This can occur during node failures.
Ray will first automatically attempt to recover the value by looking for copies
20 changes: 1 addition & 19 deletions python/ray/_private/ray_option_utils.py
@@ -102,13 +102,6 @@ def _resource_option(name: str, default_value: Any = None):
}


-def issubclass_safe(obj: Any, cls_: type) -> bool:
-    try:
-        return issubclass(obj, cls_)
-    except TypeError:
-        return False
-
-
_task_only_options = {
"max_calls": _counting_option("max_calls", False, default_value=0),
# Normal tasks may be retried on failure this many times.
@@ -124,18 +117,7 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
lambda x: x is None,
"Setting 'object_store_memory' is not implemented for tasks",
),
"retry_exceptions": Option(
(bool, list, tuple),
lambda x: (
isinstance(x, bool)
or (
isinstance(x, (list, tuple))
and all(issubclass_safe(x_, Exception) for x_ in x)
)
),
"retry_exceptions must be either a boolean or a list of exceptions",
default_value=False,
),
"retry_exceptions": Option(bool, default_value=False),
}

_actor_only_options = {
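The deleted ``issubclass_safe`` helper exists because a bare ``issubclass`` call raises ``TypeError`` when its first argument is not a class, which user-supplied option values (for example ``True``) need not be. A minimal sketch of that guarded check, with the values in the assertions assumed purely for illustration:

    def issubclass_safe(obj, cls_):
        # issubclass() raises TypeError if obj is not a class object,
        # so a plain call would fail on values like True or 5.
        try:
            return issubclass(obj, cls_)
        except TypeError:
            return False

    assert issubclass_safe(ValueError, Exception) is True
    assert issubclass_safe(True, Exception) is False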
6 changes: 3 additions & 3 deletions python/ray/_private/worker.py
@@ -2773,9 +2773,9 @@ def method(self):
this actor or task and its children. See
:ref:`runtime-environments` for detailed documentation. This API is
in beta and may change before becoming stable.
-retry_exceptions: Only for *remote functions*. This specifies whether
-application-level errors should be retried up to max_retries times.
-This can be a boolean or a list of exceptions that should be retried.
+retry_exceptions: Only for *remote functions*. This specifies
+whether application-level errors should be retried
+up to max_retries times.
scheduling_strategy: Strategy about how to
schedule a remote function or actor. Possible values are
None: ray will figure out the scheduling strategy to use, it
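With this revert, the ``retry_exceptions`` argument documented above is again a plain boolean. A minimal usage sketch (``flaky_sum`` is a hypothetical name, and the task is assumed idempotent):

    import ray

    # Application-level exceptions are retried up to max_retries times,
    # in addition to the automatic retries after worker or node crashes.
    @ray.remote(max_retries=3, retry_exceptions=True)
    def flaky_sum(xs):
        return sum(xs)

    print(ray.get(flaky_sum.remote([1, 2, 3])))  # -> 6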
