diff --git a/cubed/runtime/backup.py b/cubed/runtime/backup.py index 4e830b59..98a184ed 100644 --- a/cubed/runtime/backup.py +++ b/cubed/runtime/backup.py @@ -1,9 +1,24 @@ import math -from typing import Dict, TypeVar +from typing import Dict, Optional, TypeVar + +from cubed.spec import Spec +from cubed.utils import is_cloud_storage_path T = TypeVar("T") +def use_backups_default(spec: Optional[Spec]) -> bool: + """The default setting for ``use_backups``. + + Backup tasks are only enabled on cloud object stores since they provide atomic writes. + """ + return ( + spec is not None + and spec.work_dir is not None + and is_cloud_storage_path(spec.work_dir) + ) + + def should_launch_backup( task: T, now: float, diff --git a/cubed/runtime/executors/asyncio.py b/cubed/runtime/executors/asyncio.py index 5eed1a08..6e4f7a2c 100644 --- a/cubed/runtime/executors/asyncio.py +++ b/cubed/runtime/executors/asyncio.py @@ -11,7 +11,7 @@ async def async_map_unordered( create_futures_func: Callable[..., List[Tuple[Any, Future]]], input: Iterable[Any], - use_backups: bool = True, + use_backups: bool = False, create_backup_futures_func: Optional[ Callable[..., List[Tuple[Any, Future]]] ] = None, diff --git a/cubed/runtime/executors/dask.py b/cubed/runtime/executors/dask.py index a343389c..4c6e6471 100644 --- a/cubed/runtime/executors/dask.py +++ b/cubed/runtime/executors/dask.py @@ -17,6 +17,7 @@ from dask.distributed import Client from networkx import MultiDiGraph +from cubed.runtime.backup import use_backups_default from cubed.runtime.executors.asyncio import async_map_unordered from cubed.runtime.pipeline import visit_node_generations, visit_nodes from cubed.runtime.types import Callback, CubedPipeline, DagExecutor @@ -41,7 +42,7 @@ async def map_unordered( map_function: Callable[..., Any], map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]], retries: int = 2, - use_backups: bool = True, + use_backups: bool = False, batch_size: Optional[int] = None, return_stats: bool = False, name: Optional[str] = None, @@ -125,6 +126,8 @@ async def async_execute_dag( async with Client(asynchronous=True, **compute_kwargs) as client: if spec is not None: check_runtime_memory(spec, client) + if "use_backups" not in kwargs and use_backups_default(spec): + kwargs["use_backups"] = True if not compute_arrays_in_parallel: # run one pipeline at a time for name, node in visit_nodes(dag, resume=resume): diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py index a4d4528a..870f1c90 100644 --- a/cubed/runtime/executors/lithops.py +++ b/cubed/runtime/executors/lithops.py @@ -20,7 +20,7 @@ from lithops.wait import ALWAYS, ANY_COMPLETED from networkx import MultiDiGraph -from cubed.runtime.backup import should_launch_backup +from cubed.runtime.backup import should_launch_backup, use_backups_default from cubed.runtime.executors.lithops_retries import ( RetryingFunctionExecutor, RetryingFuture, @@ -53,7 +53,7 @@ def map_unordered( include_modules: List[str] = [], timeout: Optional[int] = None, retries: int = 2, - use_backups: bool = True, + use_backups: bool = False, return_stats: bool = False, wait_dur_sec: Optional[int] = 1, **kwargs, @@ -174,7 +174,7 @@ def execute_dag( compute_arrays_in_parallel: Optional[bool] = None, **kwargs, ) -> None: - use_backups = kwargs.pop("use_backups", True) + use_backups = kwargs.pop("use_backups", use_backups_default(spec)) wait_dur_sec = kwargs.pop("wait_dur_sec", None) compute_id = kwargs.pop("compute_id") allowed_mem = spec.allowed_mem if spec is not None else None diff --git a/cubed/runtime/executors/local.py b/cubed/runtime/executors/local.py index 97d1a1ab..a62c07a4 100644 --- a/cubed/runtime/executors/local.py +++ b/cubed/runtime/executors/local.py @@ -12,6 +12,7 @@ from networkx import MultiDiGraph from tenacity import Retrying, stop_after_attempt +from cubed.runtime.backup import use_backups_default from cubed.runtime.executors.asyncio import async_map_unordered from cubed.runtime.pipeline import visit_node_generations, visit_nodes from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent @@ -80,7 +81,7 @@ async def map_unordered( function: Callable[..., Any], input: Iterable[Any], retries: int = 2, - use_backups: bool = True, + use_backups: bool = False, batch_size: Optional[int] = None, return_stats: bool = False, name: Optional[str] = None, @@ -180,6 +181,8 @@ async def async_execute_dag( use_processes = kwargs.pop("use_processes", False) if spec is not None: check_runtime_memory(spec, max_workers) + if "use_backups" not in kwargs and use_backups_default(spec): + kwargs["use_backups"] = True if use_processes: max_tasks_per_child = kwargs.pop("max_tasks_per_child", None) if isinstance(use_processes, str): diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py index 68a5331b..e1c441d8 100644 --- a/cubed/runtime/executors/modal.py +++ b/cubed/runtime/executors/modal.py @@ -11,6 +11,7 @@ from networkx import MultiDiGraph from tenacity import retry, retry_if_exception_type, stop_after_attempt +from cubed.runtime.backup import use_backups_default from cubed.runtime.executors.asyncio import async_map_unordered from cubed.runtime.pipeline import visit_node_generations, visit_nodes from cubed.runtime.types import Callback, DagExecutor @@ -119,7 +120,7 @@ def run_remotely(self, input, func=None, config=None, name=None, compute_id=None async def map_unordered( app_function: Function, input: Iterable[Any], - use_backups: bool = True, + use_backups: bool = False, backup_function: Optional[Function] = None, batch_size: Optional[int] = None, return_stats: bool = False, @@ -209,6 +210,8 @@ async def async_execute_dag( ) -> None: if spec is not None: check_runtime_memory(spec) + if "use_backups" not in kwargs and use_backups_default(spec): + kwargs["use_backups"] = True async with app.run(show_progress=False): cloud = cloud or "aws" if cloud == "aws": diff --git a/cubed/tests/test_utils.py b/cubed/tests/test_utils.py index ac38a1df..d9f70e9f 100644 --- a/cubed/tests/test_utils.py +++ b/cubed/tests/test_utils.py @@ -12,6 +12,7 @@ block_id_to_offset, broadcast_trick, extract_stack_summaries, + is_cloud_storage_path, is_local_path, join_path, map_nested, @@ -77,6 +78,17 @@ def test_is_local_path(): assert is_local_path("file://absolute_path/path") assert is_local_path("file:///absolute_path/path") assert not is_local_path("s3://host/path") + assert not is_local_path("gs://host/path") + + +def test_is_cloud_storage_path(): + assert not is_cloud_storage_path("relative_path/path") + assert not is_cloud_storage_path("/absolute_path/path") + assert not is_cloud_storage_path("file:relative_path/path") + assert not is_cloud_storage_path("file://absolute_path/path") + assert not is_cloud_storage_path("file:///absolute_path/path") + assert is_cloud_storage_path("s3://host/path") + assert is_cloud_storage_path("gs://host/path") def test_memory_repr(): diff --git a/cubed/utils.py b/cubed/utils.py index 3263800b..7c2b1a38 100644 --- a/cubed/utils.py +++ b/cubed/utils.py @@ -80,9 +80,14 @@ def join_path(dir_url: PathType, child_path: str) -> str: return urlunsplit(split_parts) -def is_local_path(path: str): +def is_local_path(path: PathType): """Determine if a path string is for the local filesystem.""" - return urlsplit(path).scheme in ("", "file") + return urlsplit(str(path)).scheme in ("", "file") + + +def is_cloud_storage_path(path: PathType): + """Determine if a path string is for cloud storage.""" + return urlsplit(str(path)).scheme in ("gs", "s3") def memory_repr(num: int) -> str: diff --git a/docs/configuration.md b/docs/configuration.md index 6241fdf7..7b4087d2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -118,7 +118,7 @@ since it is deliberately designed not to have anything except the most basic fea | Property | Default | Description | |------------------------------|---------|----------------------------------------------------------------------------------------------------| | `retries` | 2 | The number of times to retry a task if it fails. | -| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. | +| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). | | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. | | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. | | `max_workers` | `None` | The maximum number of workers to use in the `ThreadPoolExecutor`. Defaults to number of CPU cores. | @@ -128,7 +128,7 @@ since it is deliberately designed not to have anything except the most basic fea | Property | Default | Description | |------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------| -| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. | +| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). | | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. `None` means don't batch. | | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. | | `max_workers` | `None` | The maximum number of workers to use in the `ProcessPoolExecutor`. Defaults to number of CPU cores. | @@ -154,7 +154,7 @@ Note that there is currently no way to set retries or a timeout for the Coiled e | Property | Default | Description | |------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------| | `retries` | 2 | The number of times to retry a task if it fails. | -| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. | +| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). | | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. | | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. | | `compute_kwargs` | `None` | Keyword arguments to pass to Dask's [`distributed.Client`](https://distributed.dask.org/en/latest/api.html#client) constructor. | @@ -167,7 +167,7 @@ Note that there is currently no way to set a timeout for the Dask executor. |------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `retries` | 2 | The number of times to retry a task if it fails. | | `timeout` | `None` | Tasks that take longer than the timeout will be automatically killed and retried. Defaults to the timeout specified when [deploying the lithops runtime image](https://lithops-cloud.github.io/docs/source/cli.html#lithops-runtime-deploy-runtime-name). This is 180 seconds in the [examples](https://github.com/cubed-dev/cubed/blob/main/examples/README.md). | -| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. | +| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). | | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. | | Other properties | N/A | Other properties will be passed as keyword arguments to the [`lithops.executors.FunctionExecutor`](https://lithops-cloud.github.io/docs/source/api_futures.html#lithops.executors.FunctionExecutor) constructor. | diff --git a/docs/user-guide/reliability.md b/docs/user-guide/reliability.md index 7030d27a..292b000a 100644 --- a/docs/user-guide/reliability.md +++ b/docs/user-guide/reliability.md @@ -23,4 +23,4 @@ A few slow running tasks (called stragglers) can disproportionately slow down th When a backup task is launched the original task is not cancelled, so it is to be expected that both tasks will complete and write their (identical) output. This only works since tasks are idempotent and each write a single, whole Zarr chunk in an atomic operation. (Updates to a single key are atomic in both [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel) and Google Cloud Storage.) -Backup tasks are enabled by default, but if you need to turn them off you can do so with ``use_backups=False``. +Backup tasks are only enabled by default on filesystems supporting atomic writes, which currently includes cloud stores like S3 or GCS. You can turn backup tasks off completely on cloud stores by setting ``use_backups=False``.