diff --git a/src/omnipy/api/enums.py b/src/omnipy/api/enums.py index 042b0076..9fbc72ba 100644 --- a/src/omnipy/api/enums.py +++ b/src/omnipy/api/enums.py @@ -18,6 +18,13 @@ class RestoreOutputsOptions(str, Enum): FORCE_ENABLE_IGNORE_PARAMS = 'force_ignore_params' +class OutputStorageProtocolOptions(str, Enum): + """""" + LOCAL = 'local' + S3 = 's3' + FOLLOW_CONFIG = 'config' + + class ConfigPersistOutputsOptions(str, Enum): """""" DISABLED = 'disabled' @@ -31,7 +38,7 @@ class ConfigRestoreOutputsOptions(str, Enum): AUTO_ENABLE_IGNORE_PARAMS = 'auto_ignore_params' -class OutputStorageProtocolOptions(str, Enum): +class ConfigOutputStorageProtocolOptions(str, Enum): """""" LOCAL = 'local' S3 = 's3' diff --git a/src/omnipy/api/protocols/private/compute/job.py b/src/omnipy/api/protocols/private/compute/job.py index e584b978..77a24770 100644 --- a/src/omnipy/api/protocols/private/compute/job.py +++ b/src/omnipy/api/protocols/private/compute/job.py @@ -4,7 +4,10 @@ from types import MappingProxyType from typing import Any, Callable, Dict, Mapping, Optional, Protocol, Tuple, Type -from omnipy.api.enums import PersistOutputsOptions, RestoreOutputsOptions +from omnipy.api.enums import (ConfigOutputStorageProtocolOptions, + OutputStorageProtocolOptions, + PersistOutputsOptions, + RestoreOutputsOptions) from omnipy.api.protocols.private.compute.job_creator import IsJobCreator from omnipy.api.protocols.private.compute.mixins import IsUniquelyNamedJob from omnipy.api.protocols.private.engine import IsEngine @@ -112,6 +115,10 @@ def persist_outputs(self) -> Optional[PersistOutputsOptions]: def restore_outputs(self) -> Optional[RestoreOutputsOptions]: ... + @property + def output_storage_protocol(self) -> Optional[OutputStorageProtocolOptions]: + ... + @property def will_persist_outputs(self) -> PersistOutputsOptions: ... @@ -120,6 +127,10 @@ def will_persist_outputs(self) -> PersistOutputsOptions: def will_restore_outputs(self) -> RestoreOutputsOptions: ... + @property + def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions: + ... + @property def result_key(self) -> Optional[str]: ... diff --git a/src/omnipy/api/protocols/public/config.py b/src/omnipy/api/protocols/public/config.py index 2ad58166..0fb6db80 100644 --- a/src/omnipy/api/protocols/public/config.py +++ b/src/omnipy/api/protocols/public/config.py @@ -2,9 +2,9 @@ from typing import Protocol -from omnipy.api.enums import (ConfigPersistOutputsOptions, - ConfigRestoreOutputsOptions, - OutputStorageProtocolOptions) +from omnipy.api.enums import (ConfigOutputStorageProtocolOptions, + ConfigPersistOutputsOptions, + ConfigRestoreOutputsOptions) from omnipy.api.types import LocaleType @@ -59,6 +59,6 @@ class IsS3OutputStorage(IsOutputStorageBase, Protocol): class IsOutputStorage(Protocol): persist_outputs: ConfigPersistOutputsOptions restore_outputs: ConfigRestoreOutputsOptions - protocol: OutputStorageProtocolOptions + protocol: ConfigOutputStorageProtocolOptions local: IsLocalOutputStorage s3: IsS3OutputStorage diff --git a/src/omnipy/compute/mixins/serialize.py b/src/omnipy/compute/mixins/serialize.py index fa883094..603e2d98 100644 --- a/src/omnipy/compute/mixins/serialize.py +++ b/src/omnipy/compute/mixins/serialize.py @@ -4,9 +4,12 @@ import tarfile from typing import cast, Generator, Optional, Type +from omnipy.api.enums import ConfigOutputStorageProtocolOptions as ConfigProtocolOpts from omnipy.api.enums import ConfigPersistOutputsOptions as ConfigPersistOpts from omnipy.api.enums import ConfigRestoreOutputsOptions as ConfigRestoreOpts -from omnipy.api.enums import PersistOutputsOptions, RestoreOutputsOptions +from omnipy.api.enums import (OutputStorageProtocolOptions, + PersistOutputsOptions, + RestoreOutputsOptions) from omnipy.api.protocols.private.compute.job import IsJobBase from omnipy.api.protocols.public.config import IsJobConfig from omnipy.compute.mixins.func_signature import SignatureFuncJobBaseMixin @@ -21,13 +24,15 @@ PersistOpts = PersistOutputsOptions RestoreOpts = RestoreOutputsOptions +ProtocolOpts = OutputStorageProtocolOptions class SerializerFuncJobBaseMixin: def __init__(self, *, persist_outputs: Optional[PersistOutputsOptions] = None, - restore_outputs: Optional[RestoreOutputsOptions] = None): + restore_outputs: Optional[RestoreOutputsOptions] = None, + output_storage_protocol: Optional[OutputStorageProtocolOptions] = None): # TODO: Possibly reimplement logic using a state machine, e.g. "transitions" package if persist_outputs is None: @@ -40,6 +45,11 @@ def __init__(self, else: self._restore_outputs = RestoreOpts(restore_outputs) + if output_storage_protocol is None: + self._output_storage_protocol = ProtocolOpts.FOLLOW_CONFIG if self._has_job_config else None + else: + self._output_storage_protocol = ProtocolOpts(output_storage_protocol) + self._serializer_registry = self._create_serializer_registry() def _create_serializer_registry(self): @@ -82,6 +92,10 @@ def persist_outputs(self) -> Optional[PersistOutputsOptions]: def restore_outputs(self) -> Optional[RestoreOutputsOptions]: return self._restore_outputs + @property + def output_storage_protocol(self) -> Optional[OutputStorageProtocolOptions]: + return self._output_storage_protocol + @property def will_persist_outputs(self) -> PersistOutputsOptions: if not self._has_job_config or self._persist_outputs is not PersistOpts.FOLLOW_CONFIG: @@ -117,6 +131,15 @@ def will_restore_outputs(self) -> RestoreOutputsOptions: assert config_restore_opt == ConfigRestoreOpts.DISABLED return RestoreOpts.DISABLED + @property + def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions: + if not self._has_job_config or self._output_storage_protocol is not ProtocolOpts.FOLLOW_CONFIG: + return self._output_storage_protocol if self._output_storage_protocol is not None \ + else ProtocolOpts.LOCAL + else: + config_protocol = self._job_config.output_storage.protocol + return ProtocolOpts(config_protocol) + def _call_job(self, *args: object, **kwargs: object) -> object: self_as_name_job_base_mixin = cast(NameJobBaseMixin, self) diff --git a/src/omnipy/config/job.py b/src/omnipy/config/job.py index f144745a..947081a8 100644 --- a/src/omnipy/config/job.py +++ b/src/omnipy/config/job.py @@ -3,9 +3,9 @@ # from datetime import datetime from pathlib import Path -from omnipy.api.enums import (ConfigPersistOutputsOptions, - ConfigRestoreOutputsOptions, - OutputStorageProtocolOptions) +from omnipy.api.enums import (ConfigOutputStorageProtocolOptions, + ConfigPersistOutputsOptions, + ConfigRestoreOutputsOptions) from omnipy.api.protocols.public.config import (IsLocalOutputStorage, IsOutputStorage, IsS3OutputStorage) @@ -37,7 +37,7 @@ class OutputStorage: ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS restore_outputs: ConfigRestoreOutputsOptions = \ ConfigRestoreOutputsOptions.DISABLED - protocol: OutputStorageProtocolOptions = OutputStorageProtocolOptions.LOCAL + protocol: ConfigOutputStorageProtocolOptions = ConfigOutputStorageProtocolOptions.LOCAL local: IsLocalOutputStorage = field(default_factory=LocalOutputStorage) s3: IsS3OutputStorage = field(default_factory=S3OutputStorage) diff --git a/tests/hub/test_runtime.py b/tests/hub/test_runtime.py index dd08fb61..e788c099 100644 --- a/tests/hub/test_runtime.py +++ b/tests/hub/test_runtime.py @@ -4,10 +4,10 @@ import pytest -from omnipy.api.enums import (ConfigPersistOutputsOptions, +from omnipy.api.enums import (ConfigOutputStorageProtocolOptions, + ConfigPersistOutputsOptions, ConfigRestoreOutputsOptions, - EngineChoice, - OutputStorageProtocolOptions) + EngineChoice) from omnipy.api.protocols.public.hub import IsRuntime from omnipy.hub.runtime import RuntimeConfig, RuntimeObjects @@ -42,7 +42,7 @@ def _assert_runtime_config_default(config: RuntimeConfig, dir_path: str): assert config.job.output_storage.restore_outputs == \ ConfigRestoreOutputsOptions.DISABLED assert config.job.output_storage.protocol == \ - OutputStorageProtocolOptions.LOCAL + ConfigOutputStorageProtocolOptions.LOCAL assert config.job.output_storage.local.persist_data_dir_path == \ os.path.join(dir_path, 'outputs') assert config.job.output_storage.s3.persist_data_dir_path == os.path.join('omnipy', 'outputs') diff --git a/tests/integration/novel/serialize/test_serialize.py b/tests/integration/novel/serialize/test_serialize.py index 60ddb9c5..f819c2e0 100644 --- a/tests/integration/novel/serialize/test_serialize.py +++ b/tests/integration/novel/serialize/test_serialize.py @@ -3,8 +3,10 @@ import pytest import pytest_cases as pc -from omnipy.api.enums import (ConfigPersistOutputsOptions, +from omnipy.api.enums import (ConfigOutputStorageProtocolOptions, + ConfigPersistOutputsOptions, ConfigRestoreOutputsOptions, + OutputStorageProtocolOptions, PersistOutputsOptions, RestoreOutputsOptions) from omnipy.api.protocols.public.hub import IsRuntime @@ -18,6 +20,8 @@ def test_all_properties_pytest_default_config(case_tmpl) -> None: assert job_obj.will_persist_outputs is PersistOutputsOptions.DISABLED assert job_obj.restore_outputs is None assert job_obj.will_restore_outputs is RestoreOutputsOptions.DISABLED + assert job_obj.output_storage_protocol is None + assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL @pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_config_') @@ -29,6 +33,7 @@ def test_all_properties_runtime_default_config( assert runtime.config.job.output_storage.persist_outputs == \ ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS assert runtime.config.job.output_storage.restore_outputs == ConfigRestoreOutputsOptions.DISABLED + assert runtime.config.job.output_storage.protocol == ConfigOutputStorageProtocolOptions.LOCAL for job_obj in case_tmpl, case_tmpl.apply(): assert job_obj.persist_outputs is PersistOutputsOptions.FOLLOW_CONFIG @@ -37,6 +42,9 @@ def test_all_properties_runtime_default_config( assert job_obj.restore_outputs is RestoreOutputsOptions.FOLLOW_CONFIG assert job_obj.will_restore_outputs is RestoreOutputsOptions.DISABLED + assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG + assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL + @pc.parametrize_with_cases( 'case_task_tmpl', cases='.cases.jobs', has_tag='task', prefix='case_config_') @@ -167,6 +175,65 @@ def test_properties_restore_outputs_override_config( assert job_obj_4.will_restore_outputs is RestoreOutputsOptions.AUTO_ENABLE_IGNORE_PARAMS +@pc.parametrize_with_cases( + 'case_task_tmpl', cases='.cases.jobs', has_tag='task', prefix='case_config_') +@pc.parametrize_with_cases( + 'case_flow_tmpl', cases='.cases.jobs', has_tag='flow', prefix='case_config_') +def test_properties_output_storage_protocols( + runtime: Annotated[IsRuntime, pytest.fixture], + case_task_tmpl, + case_flow_tmpl, +) -> None: + + runtime.config.job.output_storage.protocol = ConfigOutputStorageProtocolOptions.S3 + + for job_obj in case_task_tmpl, case_task_tmpl.apply(), case_flow_tmpl, case_flow_tmpl.apply(): + assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG + assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3 + + runtime.config.job.output_storage.protocol = ConfigOutputStorageProtocolOptions.LOCAL + + for job_obj in case_task_tmpl, case_task_tmpl.apply(), case_flow_tmpl, case_flow_tmpl.apply(): + assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG + assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL + + +@pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_config_') +def test_properties_output_storage_protocols_override_config( + runtime: Annotated[IsRuntime, pytest.fixture], + case_tmpl, +) -> None: + assert runtime.config.job.output_storage.protocol == ConfigOutputStorageProtocolOptions.LOCAL + + case_tmpl_2 = case_tmpl.refine(output_storage_protocol='s3') + + for job_obj_2 in case_tmpl_2, case_tmpl_2.apply(): + assert job_obj_2.output_storage_protocol is OutputStorageProtocolOptions.S3 + assert job_obj_2.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3 + + case_tmpl_3 = case_tmpl.refine(output_storage_protocol='local') + + for job_obj_3 in case_tmpl_3, case_tmpl_3.apply(): + assert job_obj_3.output_storage_protocol is OutputStorageProtocolOptions.LOCAL + assert job_obj_3.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL + + runtime.config.job.output_storage.protocol = 's3' + + for job_obj in case_tmpl, case_tmpl.apply(): + assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG + assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3 + + for job_obj_3 in case_tmpl_3, case_tmpl_3.apply(): + assert job_obj_3.output_storage_protocol is OutputStorageProtocolOptions.LOCAL + assert job_obj_3.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL + + case_tmpl_4 = case_tmpl_3.refine(output_storage_protocol='config') + + for job_obj_4 in case_tmpl_4, case_tmpl_4.apply(): + assert job_obj_4.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG + assert job_obj_4.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3 + + @pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_') def test_persist_and_restore( runtime: Annotated[IsRuntime, pytest.fixture],