Skip to content

Commit

Permalink
Implemented config update functionality for protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
sveinugu committed Nov 3, 2023
1 parent 5fd99ac commit 4d477bb
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 17 deletions.
9 changes: 8 additions & 1 deletion src/omnipy/api/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class RestoreOutputsOptions(str, Enum):
FORCE_ENABLE_IGNORE_PARAMS = 'force_ignore_params'


class OutputStorageProtocolOptions(str, Enum):
""""""
LOCAL = 'local'
S3 = 's3'
FOLLOW_CONFIG = 'config'


class ConfigPersistOutputsOptions(str, Enum):
""""""
DISABLED = 'disabled'
Expand All @@ -31,7 +38,7 @@ class ConfigRestoreOutputsOptions(str, Enum):
AUTO_ENABLE_IGNORE_PARAMS = 'auto_ignore_params'


class OutputStorageProtocolOptions(str, Enum):
class ConfigOutputStorageProtocolOptions(str, Enum):
""""""
LOCAL = 'local'
S3 = 's3'
Expand Down
13 changes: 12 additions & 1 deletion src/omnipy/api/protocols/private/compute/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from types import MappingProxyType
from typing import Any, Callable, Dict, Mapping, Optional, Protocol, Tuple, Type

from omnipy.api.enums import PersistOutputsOptions, RestoreOutputsOptions
from omnipy.api.enums import (ConfigOutputStorageProtocolOptions,
OutputStorageProtocolOptions,
PersistOutputsOptions,
RestoreOutputsOptions)
from omnipy.api.protocols.private.compute.job_creator import IsJobCreator
from omnipy.api.protocols.private.compute.mixins import IsUniquelyNamedJob
from omnipy.api.protocols.private.engine import IsEngine
Expand Down Expand Up @@ -112,6 +115,10 @@ def persist_outputs(self) -> Optional[PersistOutputsOptions]:
def restore_outputs(self) -> Optional[RestoreOutputsOptions]:
...

@property
def output_storage_protocol(self) -> Optional[OutputStorageProtocolOptions]:
...

@property
def will_persist_outputs(self) -> PersistOutputsOptions:
...
Expand All @@ -120,6 +127,10 @@ def will_persist_outputs(self) -> PersistOutputsOptions:
def will_restore_outputs(self) -> RestoreOutputsOptions:
...

@property
def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions:
...

@property
def result_key(self) -> Optional[str]:
...
Expand Down
8 changes: 4 additions & 4 deletions src/omnipy/api/protocols/public/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from typing import Protocol

from omnipy.api.enums import (ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions,
OutputStorageProtocolOptions)
from omnipy.api.enums import (ConfigOutputStorageProtocolOptions,
ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions)
from omnipy.api.types import LocaleType


Expand Down Expand Up @@ -59,6 +59,6 @@ class IsS3OutputStorage(IsOutputStorageBase, Protocol):
class IsOutputStorage(Protocol):
persist_outputs: ConfigPersistOutputsOptions
restore_outputs: ConfigRestoreOutputsOptions
protocol: OutputStorageProtocolOptions
protocol: ConfigOutputStorageProtocolOptions
local: IsLocalOutputStorage
s3: IsS3OutputStorage
27 changes: 25 additions & 2 deletions src/omnipy/compute/mixins/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import tarfile
from typing import cast, Generator, Optional, Type

from omnipy.api.enums import ConfigOutputStorageProtocolOptions as ConfigProtocolOpts
from omnipy.api.enums import ConfigPersistOutputsOptions as ConfigPersistOpts
from omnipy.api.enums import ConfigRestoreOutputsOptions as ConfigRestoreOpts
from omnipy.api.enums import PersistOutputsOptions, RestoreOutputsOptions
from omnipy.api.enums import (OutputStorageProtocolOptions,
PersistOutputsOptions,
RestoreOutputsOptions)
from omnipy.api.protocols.private.compute.job import IsJobBase
from omnipy.api.protocols.public.config import IsJobConfig
from omnipy.compute.mixins.func_signature import SignatureFuncJobBaseMixin
Expand All @@ -21,13 +24,15 @@

PersistOpts = PersistOutputsOptions
RestoreOpts = RestoreOutputsOptions
ProtocolOpts = OutputStorageProtocolOptions


class SerializerFuncJobBaseMixin:
def __init__(self,
*,
persist_outputs: Optional[PersistOutputsOptions] = None,
restore_outputs: Optional[RestoreOutputsOptions] = None):
restore_outputs: Optional[RestoreOutputsOptions] = None,
output_storage_protocol: Optional[OutputStorageProtocolOptions] = None):

# TODO: Possibly reimplement logic using a state machine, e.g. "transitions" package
if persist_outputs is None:
Expand All @@ -40,6 +45,11 @@ def __init__(self,
else:
self._restore_outputs = RestoreOpts(restore_outputs)

if output_storage_protocol is None:
self._output_storage_protocol = ProtocolOpts.FOLLOW_CONFIG if self._has_job_config else None
else:
self._output_storage_protocol = ProtocolOpts(output_storage_protocol)

self._serializer_registry = self._create_serializer_registry()

def _create_serializer_registry(self):
Expand Down Expand Up @@ -82,6 +92,10 @@ def persist_outputs(self) -> Optional[PersistOutputsOptions]:
def restore_outputs(self) -> Optional[RestoreOutputsOptions]:
return self._restore_outputs

@property
def output_storage_protocol(self) -> Optional[OutputStorageProtocolOptions]:
return self._output_storage_protocol

@property
def will_persist_outputs(self) -> PersistOutputsOptions:
if not self._has_job_config or self._persist_outputs is not PersistOpts.FOLLOW_CONFIG:
Expand Down Expand Up @@ -117,6 +131,15 @@ def will_restore_outputs(self) -> RestoreOutputsOptions:
assert config_restore_opt == ConfigRestoreOpts.DISABLED
return RestoreOpts.DISABLED

@property
def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions:
if not self._has_job_config or self._output_storage_protocol is not ProtocolOpts.FOLLOW_CONFIG:
return self._output_storage_protocol if self._output_storage_protocol is not None \
else ProtocolOpts.LOCAL
else:
config_protocol = self._job_config.output_storage.protocol
return ProtocolOpts(config_protocol)

def _call_job(self, *args: object, **kwargs: object) -> object:
self_as_name_job_base_mixin = cast(NameJobBaseMixin, self)

Expand Down
8 changes: 4 additions & 4 deletions src/omnipy/config/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# from datetime import datetime
from pathlib import Path

from omnipy.api.enums import (ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions,
OutputStorageProtocolOptions)
from omnipy.api.enums import (ConfigOutputStorageProtocolOptions,
ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions)
from omnipy.api.protocols.public.config import (IsLocalOutputStorage,
IsOutputStorage,
IsS3OutputStorage)
Expand Down Expand Up @@ -37,7 +37,7 @@ class OutputStorage:
ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS
restore_outputs: ConfigRestoreOutputsOptions = \
ConfigRestoreOutputsOptions.DISABLED
protocol: OutputStorageProtocolOptions = OutputStorageProtocolOptions.LOCAL
protocol: ConfigOutputStorageProtocolOptions = ConfigOutputStorageProtocolOptions.LOCAL
local: IsLocalOutputStorage = field(default_factory=LocalOutputStorage)
s3: IsS3OutputStorage = field(default_factory=S3OutputStorage)

Expand Down
8 changes: 4 additions & 4 deletions tests/hub/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import pytest

from omnipy.api.enums import (ConfigPersistOutputsOptions,
from omnipy.api.enums import (ConfigOutputStorageProtocolOptions,
ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions,
EngineChoice,
OutputStorageProtocolOptions)
EngineChoice)
from omnipy.api.protocols.public.hub import IsRuntime
from omnipy.hub.runtime import RuntimeConfig, RuntimeObjects

Expand Down Expand Up @@ -42,7 +42,7 @@ def _assert_runtime_config_default(config: RuntimeConfig, dir_path: str):
assert config.job.output_storage.restore_outputs == \
ConfigRestoreOutputsOptions.DISABLED
assert config.job.output_storage.protocol == \
OutputStorageProtocolOptions.LOCAL
ConfigOutputStorageProtocolOptions.LOCAL
assert config.job.output_storage.local.persist_data_dir_path == \
os.path.join(dir_path, 'outputs')
assert config.job.output_storage.s3.persist_data_dir_path == os.path.join('omnipy', 'outputs')
Expand Down
69 changes: 68 additions & 1 deletion tests/integration/novel/serialize/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import pytest
import pytest_cases as pc

from omnipy.api.enums import (ConfigPersistOutputsOptions,
from omnipy.api.enums import (ConfigOutputStorageProtocolOptions,
ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions,
OutputStorageProtocolOptions,
PersistOutputsOptions,
RestoreOutputsOptions)
from omnipy.api.protocols.public.hub import IsRuntime
Expand All @@ -18,6 +20,8 @@ def test_all_properties_pytest_default_config(case_tmpl) -> None:
assert job_obj.will_persist_outputs is PersistOutputsOptions.DISABLED
assert job_obj.restore_outputs is None
assert job_obj.will_restore_outputs is RestoreOutputsOptions.DISABLED
assert job_obj.output_storage_protocol is None
assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL


@pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_config_')
Expand All @@ -29,6 +33,7 @@ def test_all_properties_runtime_default_config(
assert runtime.config.job.output_storage.persist_outputs == \
ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS
assert runtime.config.job.output_storage.restore_outputs == ConfigRestoreOutputsOptions.DISABLED
assert runtime.config.job.output_storage.protocol == ConfigOutputStorageProtocolOptions.LOCAL

for job_obj in case_tmpl, case_tmpl.apply():
assert job_obj.persist_outputs is PersistOutputsOptions.FOLLOW_CONFIG
Expand All @@ -37,6 +42,9 @@ def test_all_properties_runtime_default_config(
assert job_obj.restore_outputs is RestoreOutputsOptions.FOLLOW_CONFIG
assert job_obj.will_restore_outputs is RestoreOutputsOptions.DISABLED

assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG
assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL


@pc.parametrize_with_cases(
'case_task_tmpl', cases='.cases.jobs', has_tag='task', prefix='case_config_')
Expand Down Expand Up @@ -167,6 +175,65 @@ def test_properties_restore_outputs_override_config(
assert job_obj_4.will_restore_outputs is RestoreOutputsOptions.AUTO_ENABLE_IGNORE_PARAMS


@pc.parametrize_with_cases(
'case_task_tmpl', cases='.cases.jobs', has_tag='task', prefix='case_config_')
@pc.parametrize_with_cases(
'case_flow_tmpl', cases='.cases.jobs', has_tag='flow', prefix='case_config_')
def test_properties_output_storage_protocols(
runtime: Annotated[IsRuntime, pytest.fixture],
case_task_tmpl,
case_flow_tmpl,
) -> None:

runtime.config.job.output_storage.protocol = ConfigOutputStorageProtocolOptions.S3

for job_obj in case_task_tmpl, case_task_tmpl.apply(), case_flow_tmpl, case_flow_tmpl.apply():
assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG
assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3

runtime.config.job.output_storage.protocol = ConfigOutputStorageProtocolOptions.LOCAL

for job_obj in case_task_tmpl, case_task_tmpl.apply(), case_flow_tmpl, case_flow_tmpl.apply():
assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG
assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL


@pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_config_')
def test_properties_output_storage_protocols_override_config(
runtime: Annotated[IsRuntime, pytest.fixture],
case_tmpl,
) -> None:
assert runtime.config.job.output_storage.protocol == ConfigOutputStorageProtocolOptions.LOCAL

case_tmpl_2 = case_tmpl.refine(output_storage_protocol='s3')

for job_obj_2 in case_tmpl_2, case_tmpl_2.apply():
assert job_obj_2.output_storage_protocol is OutputStorageProtocolOptions.S3
assert job_obj_2.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3

case_tmpl_3 = case_tmpl.refine(output_storage_protocol='local')

for job_obj_3 in case_tmpl_3, case_tmpl_3.apply():
assert job_obj_3.output_storage_protocol is OutputStorageProtocolOptions.LOCAL
assert job_obj_3.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL

runtime.config.job.output_storage.protocol = 's3'

for job_obj in case_tmpl, case_tmpl.apply():
assert job_obj.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG
assert job_obj.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3

for job_obj_3 in case_tmpl_3, case_tmpl_3.apply():
assert job_obj_3.output_storage_protocol is OutputStorageProtocolOptions.LOCAL
assert job_obj_3.output_storage_protocol_to_use is OutputStorageProtocolOptions.LOCAL

case_tmpl_4 = case_tmpl_3.refine(output_storage_protocol='config')

for job_obj_4 in case_tmpl_4, case_tmpl_4.apply():
assert job_obj_4.output_storage_protocol is OutputStorageProtocolOptions.FOLLOW_CONFIG
assert job_obj_4.output_storage_protocol_to_use is OutputStorageProtocolOptions.S3


@pc.parametrize_with_cases('case_tmpl', cases='.cases.jobs', prefix='case_')
def test_persist_and_restore(
runtime: Annotated[IsRuntime, pytest.fixture],
Expand Down

0 comments on commit 4d477bb

Please sign in to comment.