From 73efb7d1a85492639a30a9525597e7ee74dcadc1 Mon Sep 17 00:00:00 2001 From: Sveinung Gundersen Date: Mon, 9 Dec 2024 21:46:41 +0100 Subject: [PATCH] Make sure all config classes inherit from DataPublisher --- src/omnipy/api/protocols/private/util.py | 11 +++++++++ src/omnipy/api/protocols/public/config.py | 15 ++++++------ src/omnipy/api/protocols/public/hub.py | 5 ++-- src/omnipy/config/data.py | 5 ++-- src/omnipy/config/engine.py | 4 +++- src/omnipy/config/job.py | 9 +++---- src/omnipy/config/root_log.py | 3 ++- src/omnipy/engine/local.py | 9 +------ src/omnipy/hub/log/root_log.py | 9 +------ src/omnipy/hub/runtime.py | 20 +++++++++------- src/omnipy/modules/prefect/engine/prefect.py | 11 ++------- tests/hub/runtime/test_runtime.py | 25 +++++++------------- 12 files changed, 59 insertions(+), 67 deletions(-) diff --git a/src/omnipy/api/protocols/private/util.py b/src/omnipy/api/protocols/private/util.py index 050d39d2..ef31f33b 100644 --- a/src/omnipy/api/protocols/private/util.py +++ b/src/omnipy/api/protocols/private/util.py @@ -12,6 +12,17 @@ _HasContentsT = TypeVar('_HasContentsT', bound='HasContents') +class IsDataPublisher(Protocol): + def subscribe_attr(self, attr_name: str, callback_fun: Callable[..., None]): + ... + + def subscribe(self, callback_fun: Callable[..., None], do_callback: bool = True) -> None: + ... + + def unsubscribe_all(self) -> None: + ... + + @runtime_checkable class IsCallableParamAfterSelf(Protocol): """""" diff --git a/src/omnipy/api/protocols/public/config.py b/src/omnipy/api/protocols/public/config.py index fcb50b7b..a0168bee 100644 --- a/src/omnipy/api/protocols/public/config.py +++ b/src/omnipy/api/protocols/public/config.py @@ -5,10 +5,11 @@ ConfigOutputStorageProtocolOptions, ConfigPersistOutputsOptions, ConfigRestoreOutputsOptions) +from omnipy.api.protocols.private.util import IsDataPublisher from omnipy.api.typedefs import LocaleType -class IsEngineConfig(Protocol): +class IsEngineConfig(IsDataPublisher, Protocol): """""" ... @@ -23,12 +24,12 @@ class IsPrefectEngineConfig(IsEngineConfig, Protocol): use_cached_results: bool = False -class IsJobConfig(Protocol): +class IsJobConfig(IsDataPublisher, Protocol): """""" output_storage: 'IsOutputStorageConfig' -class IsDataConfig(Protocol): +class IsDataConfig(IsDataPublisher, Protocol): """""" interactive_mode: bool dynamically_convert_elements_to_models: bool @@ -38,7 +39,7 @@ class IsDataConfig(Protocol): http_config_for_host: defaultdict[str, 'IsHttpConfig'] -class IsHttpConfig(Protocol): +class IsHttpConfig(IsDataPublisher, Protocol): """""" requests_per_time_period: float time_period_in_secs: float @@ -47,7 +48,7 @@ class IsHttpConfig(Protocol): retry_backoff_strategy: BackoffStrategy -class IsRootLogConfig(Protocol): +class IsRootLogConfig(IsDataPublisher, Protocol): """""" log_format_str: str locale: LocaleType @@ -62,7 +63,7 @@ class IsRootLogConfig(Protocol): file_log_path: str -class IsOutputStorageConfigBase(Protocol): +class IsOutputStorageConfigBase(IsDataPublisher, Protocol): persist_data_dir_path: str @@ -77,7 +78,7 @@ class IsS3OutputStorageConfig(IsOutputStorageConfigBase, Protocol): bucket_name: str -class IsOutputStorageConfig(Protocol): +class IsOutputStorageConfig(IsDataPublisher, Protocol): persist_outputs: ConfigPersistOutputsOptions restore_outputs: ConfigRestoreOutputsOptions protocol: ConfigOutputStorageProtocolOptions diff --git a/src/omnipy/api/protocols/public/hub.py b/src/omnipy/api/protocols/public/hub.py index 182056ef..df3cb427 100644 --- a/src/omnipy/api/protocols/public/hub.py +++ b/src/omnipy/api/protocols/public/hub.py @@ -7,6 +7,7 @@ from omnipy.api.protocols.private.data import IsDataClassCreator from omnipy.api.protocols.private.engine import IsEngine from omnipy.api.protocols.private.log import IsRunStateRegistry +from omnipy.api.protocols.private.util import IsDataPublisher from omnipy.api.protocols.public.config import (IsDataConfig, IsJobConfig, IsLocalRunnerConfig, @@ -30,7 +31,7 @@ def config(self) -> IsRootLogConfig: ... -class IsRuntimeConfig(Protocol): +class IsRuntimeConfig(IsDataPublisher, Protocol): """""" job: IsJobConfig data: IsDataConfig @@ -52,7 +53,7 @@ def reset_to_defaults(self) -> None: ... -class IsRuntimeObjects(Protocol): +class IsRuntimeObjects(IsDataPublisher, Protocol): """""" job_creator: IsJobConfigHolder diff --git a/src/omnipy/config/data.py b/src/omnipy/config/data.py index 7cbd6f84..f4254ae8 100644 --- a/src/omnipy/config/data.py +++ b/src/omnipy/config/data.py @@ -5,12 +5,13 @@ from omnipy.api.enums import BackoffStrategy from omnipy.api.protocols.public.config import IsHttpConfig +from omnipy.util.publisher import DataPublisher _terminal_size = shutil.get_terminal_size() @dataclass -class HttpConfig: +class HttpConfig(DataPublisher): # For RateLimitingClientSession helper class requests_per_time_period: float = 60 time_period_in_secs: float = 60 @@ -22,7 +23,7 @@ class HttpConfig: @dataclass -class DataConfig: +class DataConfig(DataPublisher): interactive_mode: bool = True dynamically_convert_elements_to_models: bool = False terminal_size_columns: int = _terminal_size.columns diff --git a/src/omnipy/config/engine.py b/src/omnipy/config/engine.py index 38092d6f..a3820fd3 100644 --- a/src/omnipy/config/engine.py +++ b/src/omnipy/config/engine.py @@ -1,8 +1,10 @@ from dataclasses import dataclass +from omnipy.util.publisher import DataPublisher + @dataclass -class EngineConfig: +class EngineConfig(DataPublisher): ... diff --git a/src/omnipy/config/job.py b/src/omnipy/config/job.py index 0d6da7bb..6bf39024 100644 --- a/src/omnipy/config/job.py +++ b/src/omnipy/config/job.py @@ -8,6 +8,7 @@ from omnipy.api.protocols.public.config import (IsLocalOutputStorageConfig, IsOutputStorageConfig, IsS3OutputStorageConfig) +from omnipy.util.publisher import DataPublisher def _get_persist_data_dir_path() -> str: @@ -15,12 +16,12 @@ def _get_persist_data_dir_path() -> str: @dataclass -class LocalOutputStorageConfig: +class LocalOutputStorageConfig(DataPublisher): persist_data_dir_path: str = field(default_factory=_get_persist_data_dir_path) @dataclass -class S3OutputStorageConfig: +class S3OutputStorageConfig(DataPublisher): persist_data_dir_path: str = os.path.join('omnipy', 'outputs') endpoint_url: str = '' bucket_name: str = '' @@ -29,7 +30,7 @@ class S3OutputStorageConfig: @dataclass -class OutputStorageConfig: +class OutputStorageConfig(DataPublisher): persist_outputs: ConfigPersistOutputsOptions = \ ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS restore_outputs: ConfigRestoreOutputsOptions = \ @@ -40,5 +41,5 @@ class OutputStorageConfig: @dataclass -class JobConfig: +class JobConfig(DataPublisher): output_storage: IsOutputStorageConfig = field(default_factory=OutputStorageConfig) diff --git a/src/omnipy/config/root_log.py b/src/omnipy/config/root_log.py index 6e61baaf..636d00a6 100644 --- a/src/omnipy/config/root_log.py +++ b/src/omnipy/config/root_log.py @@ -6,6 +6,7 @@ from typing import TextIO from omnipy.api.typedefs import LocaleType +from omnipy.util.publisher import DataPublisher def _get_log_path() -> str: @@ -13,7 +14,7 @@ def _get_log_path() -> str: @dataclass -class RootLogConfig: +class RootLogConfig(DataPublisher): log_format_str: str = '[{engine}] {asctime} - {levelname}: {message} ({name})' locale: LocaleType = pkg_locale.getlocale() log_to_stdout: bool = True diff --git a/src/omnipy/engine/local.py b/src/omnipy/engine/local.py index 66606f7c..274e409a 100644 --- a/src/omnipy/engine/local.py +++ b/src/omnipy/engine/local.py @@ -1,4 +1,3 @@ -from dataclasses import dataclass from typing import Any, Callable, Type from omnipy.api.protocols.public.compute import IsDagFlow, IsFuncFlow, IsLinearFlow, IsTask @@ -8,12 +7,6 @@ FuncFlowRunnerEngine, LinearFlowRunnerEngine, TaskRunnerEngine) -from omnipy.util.publisher import RuntimeEntryPublisher - - -@dataclass -class LocalRunnerConfigEntryPublisher(LocalRunnerConfig, RuntimeEntryPublisher): - ... class LocalRunner(TaskRunnerEngine, @@ -29,7 +22,7 @@ def _update_from_config(self) -> None: @classmethod def get_config_cls(cls) -> Type[IsLocalRunnerConfig]: - return LocalRunnerConfigEntryPublisher + return LocalRunnerConfig def _init_task(self, task: IsTask, call_func: Callable) -> Any: ... diff --git a/src/omnipy/hub/log/root_log.py b/src/omnipy/hub/log/root_log.py index cb1b2fd0..7deb8bc1 100644 --- a/src/omnipy/hub/log/root_log.py +++ b/src/omnipy/hub/log/root_log.py @@ -8,18 +8,11 @@ from omnipy.config.root_log import RootLogConfig from omnipy.hub.log.handlers import DailyRotatingFileHandler from omnipy.util.helpers import get_datetime_format -from omnipy.util.publisher import RuntimeEntryPublisher - - -@dataclass -class RootLogConfigEntryPublisher(RootLogConfig, RuntimeEntryPublisher): - ... @dataclass class RootLogObjects: - _config: IsRootLogConfig = field( - init=False, repr=False, default_factory=RootLogConfigEntryPublisher) + _config: IsRootLogConfig = field(init=False, repr=False, default_factory=RootLogConfig) formatter: logging.Formatter | None = None stdout_handler: StreamHandler | None = None diff --git a/src/omnipy/hub/runtime.py b/src/omnipy/hub/runtime.py index 03ff2f83..f4ae8a85 100644 --- a/src/omnipy/hub/runtime.py +++ b/src/omnipy/hub/runtime.py @@ -16,13 +16,15 @@ from omnipy.api.protocols.public.hub import IsRootLogObjects, IsRuntimeConfig, IsRuntimeObjects from omnipy.compute.job import JobBase from omnipy.config.data import DataConfig +from omnipy.config.engine import LocalRunnerConfig, PrefectEngineConfig from omnipy.config.job import JobConfig +from omnipy.config.root_log import RootLogConfig from omnipy.data.data_class_creator import DataClassBase from omnipy.data.serializer import SerializerRegistry -from omnipy.engine.local import LocalRunner, LocalRunnerConfigEntryPublisher -from omnipy.hub.log.root_log import RootLogConfigEntryPublisher, RootLogObjects +from omnipy.engine.local import LocalRunner +from omnipy.hub.log.root_log import RootLogObjects from omnipy.hub.registry import RunStateRegistry -from omnipy.modules.prefect.engine.prefect import PrefectEngine, PrefectEngineConfigEntryPublisher +from omnipy.modules.prefect.engine.prefect import PrefectEngine from omnipy.util.helpers import called_from_omnipy_tests from omnipy.util.publisher import DataPublisher, RuntimeEntryPublisher @@ -44,9 +46,9 @@ class RuntimeConfig(RuntimeEntryPublisher): job: IsJobConfig = field(default_factory=JobConfig) data: IsDataConfig = field(default_factory=_data_config_factory) engine: EngineChoice = EngineChoice.LOCAL - local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfigEntryPublisher) - prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfigEntryPublisher) - root_log: IsRootLogConfig = field(default_factory=RootLogConfigEntryPublisher) + local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfig) + prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfig) + root_log: IsRootLogConfig = field(default_factory=RootLogConfig) def reset_to_defaults(self) -> None: prev_back = self._back @@ -55,9 +57,9 @@ def reset_to_defaults(self) -> None: self.job = JobConfig() self.data = DataConfig() self.engine = EngineChoice.LOCAL - self.local = LocalRunnerConfigEntryPublisher() - self.prefect = PrefectEngineConfigEntryPublisher() - self.root_log = RootLogConfigEntryPublisher() + self.local = LocalRunnerConfig() + self.prefect = PrefectEngineConfig() + self.root_log = RootLogConfig() self._back = prev_back if self._back is not None: diff --git a/src/omnipy/modules/prefect/engine/prefect.py b/src/omnipy/modules/prefect/engine/prefect.py index 4dbc716b..657756ef 100644 --- a/src/omnipy/modules/prefect/engine/prefect.py +++ b/src/omnipy/modules/prefect/engine/prefect.py @@ -1,4 +1,3 @@ -from dataclasses import dataclass from datetime import timedelta from typing import Any, Callable, Type @@ -10,16 +9,10 @@ LinearFlowRunnerEngine, TaskRunnerEngine) from omnipy.util.helpers import resolve -from omnipy.util.publisher import RuntimeEntryPublisher from .. import prefect_flow, prefect_task, PrefectFlow, PrefectTask, task_input_hash -@dataclass -class PrefectEngineConfigEntryPublisher(PrefectEngineConfig, RuntimeEntryPublisher): - ... - - class PrefectEngine(TaskRunnerEngine, LinearFlowRunnerEngine, DagFlowRunnerEngine, @@ -33,12 +26,12 @@ def _update_from_config(self) -> None: @classmethod def get_config_cls(cls) -> Type[IsPrefectEngineConfig]: - return PrefectEngineConfigEntryPublisher + return PrefectEngineConfig # TaskRunnerEngine def _init_task(self, task: IsTask, call_func: Callable) -> PrefectTask: - assert isinstance(self._config, PrefectEngineConfigEntryPublisher) + assert isinstance(self._config, PrefectEngineConfig) task_kwargs = dict( name=task.name, cache_key_fn=task_input_hash if self._config.use_cached_results else None, diff --git a/tests/hub/runtime/test_runtime.py b/tests/hub/runtime/test_runtime.py index 6032fef4..e848e787 100644 --- a/tests/hub/runtime/test_runtime.py +++ b/tests/hub/runtime/test_runtime.py @@ -18,13 +18,14 @@ from omnipy.config.data import DataConfig from omnipy.config.engine import LocalRunnerConfig, PrefectEngineConfig from omnipy.config.job import JobConfig +from omnipy.config.root_log import RootLogConfig from omnipy.data.data_class_creator import DataClassBase, DataClassCreator from omnipy.data.serializer import SerializerRegistry -from omnipy.engine.local import LocalRunner, LocalRunnerConfigEntryPublisher -from omnipy.hub.log.root_log import RootLogConfigEntryPublisher, RootLogObjects +from omnipy.engine.local import LocalRunner +from omnipy.hub.log.root_log import RootLogObjects from omnipy.hub.registry import RunStateRegistry from omnipy.hub.runtime import RuntimeConfig, RuntimeObjects -from omnipy.modules.prefect.engine.prefect import PrefectEngine, PrefectEngineConfigEntryPublisher +from omnipy.modules.prefect.engine.prefect import PrefectEngine from .helpers.mocks import (MockLocalRunner, MockLocalRunnerConfig, @@ -162,21 +163,21 @@ def test_init_runtime_config_after_job_creator( ), ( ('config', 'local'), - LocalRunnerConfigEntryPublisher, + LocalRunnerConfig, ('objects', 'local'), LocalRunner, 'config', ), ( ('config', 'prefect'), - PrefectEngineConfigEntryPublisher, + PrefectEngineConfig, ('objects', 'prefect'), PrefectEngine, 'config', ), ( ('config', 'root_log'), - RootLogConfigEntryPublisher, + RootLogConfig, ('objects', 'root_log'), RootLogObjects, 'config', @@ -298,16 +299,8 @@ def test_job_creator_subscribes_to_selected_engine( @pc.parametrize( 'engine_name, engine_cls, config_cls, mock_engine_cls, mock_config_class', [ - ('local', - LocalRunner, - LocalRunnerConfigEntryPublisher, - MockLocalRunner, - MockLocalRunnerConfig), - ('prefect', - PrefectEngine, - PrefectEngineConfigEntryPublisher, - MockPrefectEngine, - MockPrefectEngineConfig), + ('local', LocalRunner, LocalRunnerConfig, MockLocalRunner, MockLocalRunnerConfig), + ('prefect', PrefectEngine, PrefectEngineConfig, MockPrefectEngine, MockPrefectEngineConfig), ], ids=['local', 'prefect']) def test_new_engine_object_updates_engine_config_if_needed(