Skip to content

Commit

Permalink
Make sure all config classes inherit from DataPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
sveinugu committed Dec 9, 2024
1 parent 02d91d3 commit 73efb7d
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 67 deletions.
11 changes: 11 additions & 0 deletions src/omnipy/api/protocols/private/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
_HasContentsT = TypeVar('_HasContentsT', bound='HasContents')


class IsDataPublisher(Protocol):
def subscribe_attr(self, attr_name: str, callback_fun: Callable[..., None]):
...

def subscribe(self, callback_fun: Callable[..., None], do_callback: bool = True) -> None:
...

def unsubscribe_all(self) -> None:
...


@runtime_checkable
class IsCallableParamAfterSelf(Protocol):
""""""
Expand Down
15 changes: 8 additions & 7 deletions src/omnipy/api/protocols/public/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
ConfigOutputStorageProtocolOptions,
ConfigPersistOutputsOptions,
ConfigRestoreOutputsOptions)
from omnipy.api.protocols.private.util import IsDataPublisher
from omnipy.api.typedefs import LocaleType


class IsEngineConfig(Protocol):
class IsEngineConfig(IsDataPublisher, Protocol):
""""""
...

Expand All @@ -23,12 +24,12 @@ class IsPrefectEngineConfig(IsEngineConfig, Protocol):
use_cached_results: bool = False


class IsJobConfig(Protocol):
class IsJobConfig(IsDataPublisher, Protocol):
""""""
output_storage: 'IsOutputStorageConfig'


class IsDataConfig(Protocol):
class IsDataConfig(IsDataPublisher, Protocol):
""""""
interactive_mode: bool
dynamically_convert_elements_to_models: bool
Expand All @@ -38,7 +39,7 @@ class IsDataConfig(Protocol):
http_config_for_host: defaultdict[str, 'IsHttpConfig']


class IsHttpConfig(Protocol):
class IsHttpConfig(IsDataPublisher, Protocol):
""""""
requests_per_time_period: float
time_period_in_secs: float
Expand All @@ -47,7 +48,7 @@ class IsHttpConfig(Protocol):
retry_backoff_strategy: BackoffStrategy


class IsRootLogConfig(Protocol):
class IsRootLogConfig(IsDataPublisher, Protocol):
""""""
log_format_str: str
locale: LocaleType
Expand All @@ -62,7 +63,7 @@ class IsRootLogConfig(Protocol):
file_log_path: str


class IsOutputStorageConfigBase(Protocol):
class IsOutputStorageConfigBase(IsDataPublisher, Protocol):
persist_data_dir_path: str


Expand All @@ -77,7 +78,7 @@ class IsS3OutputStorageConfig(IsOutputStorageConfigBase, Protocol):
bucket_name: str


class IsOutputStorageConfig(Protocol):
class IsOutputStorageConfig(IsDataPublisher, Protocol):
persist_outputs: ConfigPersistOutputsOptions
restore_outputs: ConfigRestoreOutputsOptions
protocol: ConfigOutputStorageProtocolOptions
Expand Down
5 changes: 3 additions & 2 deletions src/omnipy/api/protocols/public/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from omnipy.api.protocols.private.data import IsDataClassCreator
from omnipy.api.protocols.private.engine import IsEngine
from omnipy.api.protocols.private.log import IsRunStateRegistry
from omnipy.api.protocols.private.util import IsDataPublisher
from omnipy.api.protocols.public.config import (IsDataConfig,
IsJobConfig,
IsLocalRunnerConfig,
Expand All @@ -30,7 +31,7 @@ def config(self) -> IsRootLogConfig:
...


class IsRuntimeConfig(Protocol):
class IsRuntimeConfig(IsDataPublisher, Protocol):
""""""
job: IsJobConfig
data: IsDataConfig
Expand All @@ -52,7 +53,7 @@ def reset_to_defaults(self) -> None:
...


class IsRuntimeObjects(Protocol):
class IsRuntimeObjects(IsDataPublisher, Protocol):
""""""

job_creator: IsJobConfigHolder
Expand Down
5 changes: 3 additions & 2 deletions src/omnipy/config/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

from omnipy.api.enums import BackoffStrategy
from omnipy.api.protocols.public.config import IsHttpConfig
from omnipy.util.publisher import DataPublisher

_terminal_size = shutil.get_terminal_size()


@dataclass
class HttpConfig:
class HttpConfig(DataPublisher):
# For RateLimitingClientSession helper class
requests_per_time_period: float = 60
time_period_in_secs: float = 60
Expand All @@ -22,7 +23,7 @@ class HttpConfig:


@dataclass
class DataConfig:
class DataConfig(DataPublisher):
interactive_mode: bool = True
dynamically_convert_elements_to_models: bool = False
terminal_size_columns: int = _terminal_size.columns
Expand Down
4 changes: 3 additions & 1 deletion src/omnipy/config/engine.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from dataclasses import dataclass

from omnipy.util.publisher import DataPublisher


@dataclass
class EngineConfig:
class EngineConfig(DataPublisher):
...


Expand Down
9 changes: 5 additions & 4 deletions src/omnipy/config/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
from omnipy.api.protocols.public.config import (IsLocalOutputStorageConfig,
IsOutputStorageConfig,
IsS3OutputStorageConfig)
from omnipy.util.publisher import DataPublisher


def _get_persist_data_dir_path() -> str:
return str(Path.cwd().joinpath(Path('outputs')))


@dataclass
class LocalOutputStorageConfig:
class LocalOutputStorageConfig(DataPublisher):
persist_data_dir_path: str = field(default_factory=_get_persist_data_dir_path)


@dataclass
class S3OutputStorageConfig:
class S3OutputStorageConfig(DataPublisher):
persist_data_dir_path: str = os.path.join('omnipy', 'outputs')
endpoint_url: str = ''
bucket_name: str = ''
Expand All @@ -29,7 +30,7 @@ class S3OutputStorageConfig:


@dataclass
class OutputStorageConfig:
class OutputStorageConfig(DataPublisher):
persist_outputs: ConfigPersistOutputsOptions = \
ConfigPersistOutputsOptions.ENABLE_FLOW_AND_TASK_OUTPUTS
restore_outputs: ConfigRestoreOutputsOptions = \
Expand All @@ -40,5 +41,5 @@ class OutputStorageConfig:


@dataclass
class JobConfig:
class JobConfig(DataPublisher):
output_storage: IsOutputStorageConfig = field(default_factory=OutputStorageConfig)
3 changes: 2 additions & 1 deletion src/omnipy/config/root_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from typing import TextIO

from omnipy.api.typedefs import LocaleType
from omnipy.util.publisher import DataPublisher


def _get_log_path() -> str:
return str(Path.cwd() / 'logs' / 'omnipy.log')


@dataclass
class RootLogConfig:
class RootLogConfig(DataPublisher):
log_format_str: str = '[{engine}] {asctime} - {levelname}: {message} ({name})'
locale: LocaleType = pkg_locale.getlocale()
log_to_stdout: bool = True
Expand Down
9 changes: 1 addition & 8 deletions src/omnipy/engine/local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dataclasses import dataclass
from typing import Any, Callable, Type

from omnipy.api.protocols.public.compute import IsDagFlow, IsFuncFlow, IsLinearFlow, IsTask
Expand All @@ -8,12 +7,6 @@
FuncFlowRunnerEngine,
LinearFlowRunnerEngine,
TaskRunnerEngine)
from omnipy.util.publisher import RuntimeEntryPublisher


@dataclass
class LocalRunnerConfigEntryPublisher(LocalRunnerConfig, RuntimeEntryPublisher):
...


class LocalRunner(TaskRunnerEngine,
Expand All @@ -29,7 +22,7 @@ def _update_from_config(self) -> None:

@classmethod
def get_config_cls(cls) -> Type[IsLocalRunnerConfig]:
return LocalRunnerConfigEntryPublisher
return LocalRunnerConfig

def _init_task(self, task: IsTask, call_func: Callable) -> Any:
...
Expand Down
9 changes: 1 addition & 8 deletions src/omnipy/hub/log/root_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,11 @@
from omnipy.config.root_log import RootLogConfig
from omnipy.hub.log.handlers import DailyRotatingFileHandler
from omnipy.util.helpers import get_datetime_format
from omnipy.util.publisher import RuntimeEntryPublisher


@dataclass
class RootLogConfigEntryPublisher(RootLogConfig, RuntimeEntryPublisher):
...


@dataclass
class RootLogObjects:
_config: IsRootLogConfig = field(
init=False, repr=False, default_factory=RootLogConfigEntryPublisher)
_config: IsRootLogConfig = field(init=False, repr=False, default_factory=RootLogConfig)

formatter: logging.Formatter | None = None
stdout_handler: StreamHandler | None = None
Expand Down
20 changes: 11 additions & 9 deletions src/omnipy/hub/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
from omnipy.api.protocols.public.hub import IsRootLogObjects, IsRuntimeConfig, IsRuntimeObjects
from omnipy.compute.job import JobBase
from omnipy.config.data import DataConfig
from omnipy.config.engine import LocalRunnerConfig, PrefectEngineConfig
from omnipy.config.job import JobConfig
from omnipy.config.root_log import RootLogConfig
from omnipy.data.data_class_creator import DataClassBase
from omnipy.data.serializer import SerializerRegistry
from omnipy.engine.local import LocalRunner, LocalRunnerConfigEntryPublisher
from omnipy.hub.log.root_log import RootLogConfigEntryPublisher, RootLogObjects
from omnipy.engine.local import LocalRunner
from omnipy.hub.log.root_log import RootLogObjects
from omnipy.hub.registry import RunStateRegistry
from omnipy.modules.prefect.engine.prefect import PrefectEngine, PrefectEngineConfigEntryPublisher
from omnipy.modules.prefect.engine.prefect import PrefectEngine
from omnipy.util.helpers import called_from_omnipy_tests
from omnipy.util.publisher import DataPublisher, RuntimeEntryPublisher

Expand All @@ -44,9 +46,9 @@ class RuntimeConfig(RuntimeEntryPublisher):
job: IsJobConfig = field(default_factory=JobConfig)
data: IsDataConfig = field(default_factory=_data_config_factory)
engine: EngineChoice = EngineChoice.LOCAL
local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfigEntryPublisher)
prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfigEntryPublisher)
root_log: IsRootLogConfig = field(default_factory=RootLogConfigEntryPublisher)
local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfig)
prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfig)
root_log: IsRootLogConfig = field(default_factory=RootLogConfig)

def reset_to_defaults(self) -> None:
prev_back = self._back
Expand All @@ -55,9 +57,9 @@ def reset_to_defaults(self) -> None:
self.job = JobConfig()
self.data = DataConfig()
self.engine = EngineChoice.LOCAL
self.local = LocalRunnerConfigEntryPublisher()
self.prefect = PrefectEngineConfigEntryPublisher()
self.root_log = RootLogConfigEntryPublisher()
self.local = LocalRunnerConfig()
self.prefect = PrefectEngineConfig()
self.root_log = RootLogConfig()

self._back = prev_back
if self._back is not None:
Expand Down
11 changes: 2 additions & 9 deletions src/omnipy/modules/prefect/engine/prefect.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable, Type

Expand All @@ -10,16 +9,10 @@
LinearFlowRunnerEngine,
TaskRunnerEngine)
from omnipy.util.helpers import resolve
from omnipy.util.publisher import RuntimeEntryPublisher

from .. import prefect_flow, prefect_task, PrefectFlow, PrefectTask, task_input_hash


@dataclass
class PrefectEngineConfigEntryPublisher(PrefectEngineConfig, RuntimeEntryPublisher):
...


class PrefectEngine(TaskRunnerEngine,
LinearFlowRunnerEngine,
DagFlowRunnerEngine,
Expand All @@ -33,12 +26,12 @@ def _update_from_config(self) -> None:

@classmethod
def get_config_cls(cls) -> Type[IsPrefectEngineConfig]:
return PrefectEngineConfigEntryPublisher
return PrefectEngineConfig

# TaskRunnerEngine

def _init_task(self, task: IsTask, call_func: Callable) -> PrefectTask:
assert isinstance(self._config, PrefectEngineConfigEntryPublisher)
assert isinstance(self._config, PrefectEngineConfig)
task_kwargs = dict(
name=task.name,
cache_key_fn=task_input_hash if self._config.use_cached_results else None,
Expand Down
25 changes: 9 additions & 16 deletions tests/hub/runtime/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
from omnipy.config.data import DataConfig
from omnipy.config.engine import LocalRunnerConfig, PrefectEngineConfig
from omnipy.config.job import JobConfig
from omnipy.config.root_log import RootLogConfig
from omnipy.data.data_class_creator import DataClassBase, DataClassCreator
from omnipy.data.serializer import SerializerRegistry
from omnipy.engine.local import LocalRunner, LocalRunnerConfigEntryPublisher
from omnipy.hub.log.root_log import RootLogConfigEntryPublisher, RootLogObjects
from omnipy.engine.local import LocalRunner
from omnipy.hub.log.root_log import RootLogObjects
from omnipy.hub.registry import RunStateRegistry
from omnipy.hub.runtime import RuntimeConfig, RuntimeObjects
from omnipy.modules.prefect.engine.prefect import PrefectEngine, PrefectEngineConfigEntryPublisher
from omnipy.modules.prefect.engine.prefect import PrefectEngine

from .helpers.mocks import (MockLocalRunner,
MockLocalRunnerConfig,
Expand Down Expand Up @@ -162,21 +163,21 @@ def test_init_runtime_config_after_job_creator(
),
(
('config', 'local'),
LocalRunnerConfigEntryPublisher,
LocalRunnerConfig,
('objects', 'local'),
LocalRunner,
'config',
),
(
('config', 'prefect'),
PrefectEngineConfigEntryPublisher,
PrefectEngineConfig,
('objects', 'prefect'),
PrefectEngine,
'config',
),
(
('config', 'root_log'),
RootLogConfigEntryPublisher,
RootLogConfig,
('objects', 'root_log'),
RootLogObjects,
'config',
Expand Down Expand Up @@ -298,16 +299,8 @@ def test_job_creator_subscribes_to_selected_engine(
@pc.parametrize(
'engine_name, engine_cls, config_cls, mock_engine_cls, mock_config_class',
[
('local',
LocalRunner,
LocalRunnerConfigEntryPublisher,
MockLocalRunner,
MockLocalRunnerConfig),
('prefect',
PrefectEngine,
PrefectEngineConfigEntryPublisher,
MockPrefectEngine,
MockPrefectEngineConfig),
('local', LocalRunner, LocalRunnerConfig, MockLocalRunner, MockLocalRunnerConfig),
('prefect', PrefectEngine, PrefectEngineConfig, MockPrefectEngine, MockPrefectEngineConfig),
],
ids=['local', 'prefect'])
def test_new_engine_object_updates_engine_config_if_needed(
Expand Down

0 comments on commit 73efb7d

Please sign in to comment.