Skip to content

Commit

Permalink
[Core] Add retry exception allowlist for user-defined filtering of re…
Browse files Browse the repository at this point in the history
…tryable application-level errors. (ray-project#25896)

This PR adds supported for specifying an exception allowlist (List[Exception]) as the retry_exceptions argument, such that an application-level exception will only be retried if it is in the allowlist.
  • Loading branch information
clarkzinzow authored Jul 2, 2022
1 parent 68b8933 commit 2a4d22f
Show file tree
Hide file tree
Showing 25 changed files with 369 additions and 93 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ venv
# Vim
.*.swp
*.swp
.*.swo
*.swo
tags
tags.lock
tags.temp
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ Status TaskExecutor::ExecuteTask(
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_application_level_error,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute) {
RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type)
Expand All @@ -147,6 +148,10 @@ Status TaskExecutor::ExecuteTask(
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
std::string func_name = typed_descriptor->FunctionName();
bool cross_lang = !typed_descriptor->Caller().empty();
// TODO(Clark): Support retrying application-level errors for C++.
// TODO(Clark): Support exception allowlist for retrying application-level
// errors for C++.
*is_retryable_error = false;

Status status{};
std::shared_ptr<msgpack::sbuffer> data = nullptr;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ class TaskExecutor {
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_application_level_error,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute);

Expand Down
91 changes: 91 additions & 0 deletions doc/source/ray-core/doc_code/tasks_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# flake8: noqa

# fmt: off
# __tasks_fault_tolerance_retries_begin__
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
time.sleep(0.2)
if np.random.random() < failure_probability:
os._exit(0)
return 0

for _ in range(3):
try:
# If this task crashes, Ray will retry it up to one additional
# time. If either of the attempts succeeds, the call to ray.get
# below will return normally. Otherwise, it will raise an
# exception.
ray.get(potentially_fail.remote(0.5))
print('SUCCESS')
except ray.exceptions.WorkerCrashedError:
print('FAILURE')
# __tasks_fault_tolerance_retries_end__
# fmt: on

# fmt: off
# __tasks_fault_tolerance_retries_exception_begin__
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

class RandomError(Exception):
pass

@ray.remote(max_retries=1, retry_exceptions=True)
def potentially_fail(failure_probability):
if failure_probability < 0 or failure_probability > 1:
raise ValueError(
"failure_probability must be between 0 and 1, but got: "
f"{failure_probability}"
)
time.sleep(0.2)
if np.random.random() < failure_probability:
raise RandomError("Failed!")
return 0

for _ in range(3):
try:
# If this task crashes, Ray will retry it up to one additional
# time. If either of the attempts succeeds, the call to ray.get
# below will return normally. Otherwise, it will raise an
# exception.
ray.get(potentially_fail.remote(0.5))
print('SUCCESS')
except RandomError:
print('FAILURE')

# Provide the exceptions that we want to retry as an allowlist.
retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError])
try:
# This will fail since we're passing in -1 for the failure_probability,
# which will raise a ValueError in the task and does not match the RandomError
# exception that we provided.
ray.get(retry_on_exception.remote(-1))
except ValueError:
print("FAILED AS EXPECTED")
else:
raise RuntimeError("An exception should be raised so this shouldn't be reached.")

# These will retry on the RandomError exception.
for _ in range(3):
try:
# If this task crashes, Ray will retry it up to one additional
# time. If either of the attempts succeeds, the call to ray.get
# below will return normally. Otherwise, it will raise an
# exception.
ray.get(retry_on_exception.remote(0.5))
print('SUCCESS')
except RandomError:
print('FAILURE AFTER RETRIES')
# __tasks_fault_tolerance_retries_exception_end__
# fmt: on
63 changes: 37 additions & 26 deletions doc/source/ray-core/tasks/fault-tolerance.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
.. _task-fault-tolerance:

===============
Fault Tolerance
===============

.. _task-retries:

Retries
=======

When a worker is executing a task, if the worker dies unexpectedly, either
because the process crashed or because the machine failed, Ray will rerun
the task until either the task succeeds or the maximum number of retries is
Expand All @@ -15,35 +21,40 @@ using :ref:`runtime environments<runtime-environments>`.

You can experiment with this behavior by running the following code.

.. code-block:: python
import numpy as np
import os
import ray
import time
ray.init(ignore_reinit_error=True)
@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
time.sleep(0.2)
if np.random.random() < failure_probability:
os._exit(0)
return 0
for _ in range(3):
try:
# If this task crashes, Ray will retry it up to one additional
# time. If either of the attempts succeeds, the call to ray.get
# below will return normally. Otherwise, it will raise an
# exception.
ray.get(potentially_fail.remote(0.5))
print('SUCCESS')
except ray.exceptions.WorkerCrashedError:
print('FAILURE')
.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
:language: python
:start-after: __tasks_fault_tolerance_retries_begin__
:end-before: __tasks_fault_tolerance_retries_end__

You can also control whether application-level errors are retried, and even **which**
application-level errors are retried, via the ``retry_exceptions`` argument. This is
``False`` by default, so if your application code within the Ray task raises an
exception, this failure will **not** be retried. This is to ensure that Ray is not
retrying non-idempotent tasks when they have partially executed.
However, if your tasks are idempotent, then you can enable application-level error
retries with ``retry_exceptions=True``, or even retry a specific set of
application-level errors (such as a class of exception types that you know to be
transient) by providing an allowlist of exceptions:

.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
:language: python
:start-after: __tasks_fault_tolerance_retries_exception_begin__
:end-before: __tasks_fault_tolerance_retries_exception_end__

The semantics for each of the potential ``retry_exceptions`` values are as follows:

* ``retry_exceptions=False`` (default): Application-level errors are not retried.

* ``retry_exceptions=True``: All application-level errors are retried.

* ``retry_exceptions=[Exc1, Exc2]``: Application-level errors that are instances of
either ``Exc1`` or ``Exc2`` are retried.

.. _object-reconstruction:

Lineage-based Object Reconstruction
===================================

Ray also implements *lineage reconstruction* to recover task outputs that are
lost from the distributed object store. This can occur during node failures.
Ray will first automatically attempt to recover the value by looking for copies
Expand Down
20 changes: 19 additions & 1 deletion python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ def _resource_option(name: str, default_value: Any = None):
}


def issubclass_safe(obj: Any, cls_: type) -> bool:
try:
return issubclass(obj, cls_)
except TypeError:
return False


_task_only_options = {
"max_calls": _counting_option("max_calls", False, default_value=0),
# Normal tasks may be retried on failure this many times.
Expand All @@ -117,7 +124,18 @@ def _resource_option(name: str, default_value: Any = None):
lambda x: x is None,
"Setting 'object_store_memory' is not implemented for tasks",
),
"retry_exceptions": Option(bool, default_value=False),
"retry_exceptions": Option(
(bool, list, tuple),
lambda x: (
isinstance(x, bool)
or (
isinstance(x, (list, tuple))
and all(issubclass_safe(x_, Exception) for x_ in x)
)
),
"retry_exceptions must be either a boolean or a list of exceptions",
default_value=False,
),
}

_actor_only_options = {
Expand Down
6 changes: 3 additions & 3 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2773,9 +2773,9 @@ def method(self):
this actor or task and its children. See
:ref:`runtime-environments` for detailed documentation. This API is
in beta and may change before becoming stable.
retry_exceptions: Only for *remote functions*. This specifies
whether application-level errors should be retried
up to max_retries times.
retry_exceptions: Only for *remote functions*. This specifies whether
application-level errors should be retried up to max_retries times.
This can be a boolean or a list of exceptions that should be retried.
scheduling_strategy: Strategy about how to
schedule a remote function or actor. Possible values are
None: ray will figure out the scheduling strategy to use, it
Expand Down
Loading

0 comments on commit 2a4d22f

Please sign in to comment.