diff --git a/docs/conf.py b/docs/conf.py
index d2f9cf8..0b5bbdd 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -56,6 +56,9 @@
 nitpicky = True
 nitpick_ignore = [
+    ("py:attr", "sghi.etl.core.WorkflowDefinition.processor_factory"),  # docs aren't published yet
+    ("py:attr", "sghi.etl.core.WorkflowDefinition.sink_factory"),  # docs aren't published yet
+    ("py:attr", "sghi.etl.core.WorkflowDefinition.source_factory"),  # docs aren't published yet
     ("py:class", "_RDT"),  # private type annotations
     ("py:class", "_PDT"),  # private type annotations
     ("py:class", "Executor"),  # sphinx can't find it
@@ -85,6 +88,7 @@
     ("py:exc", "ResourceDisposedError"),  # docs aren't published yet
     ("py:exc", "sghi.disposable.ResourceDisposedError"),  # docs aren't published yet
     ("py:func", "sghi.disposable.not_disposed"),  # docs aren't published yet
+    ("py:meth", "sghi.etl.core.Source.draw"),  # docs aren't published yet
     ("py:obj", "sghi.etl.commons.processors._PDT"),  # private type annotations
     ("py:obj", "sghi.etl.commons.processors._RDT"),  # private type annotations
     ("py:obj", "sghi.etl.commons.sinks._PDT"),  # private type annotations
diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py
index 63e5c00..e94d29a 100644
--- a/src/sghi/etl/commons/__init__.py
+++ b/src/sghi/etl/commons/__init__.py
@@ -10,7 +10,7 @@
 )
 from .sinks import NullSink, ScatterSink, SplitSink, sink
 from .sources import GatherSource, source
-from .utils import fail_fast, fail_fast_factory, ignored_failed
+from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow
 from .workflow_definitions import SimpleWorkflowDefinition
 
 __all__ = [
@@ -28,6 +28,7 @@
     "ignored_failed",
     "pipe_processors",
     "processor",
+    "run_workflow",
     "sink",
     "source",
 ]
diff --git a/src/sghi/etl/commons/utils/__init__.py b/src/sghi/etl/commons/utils/__init__.py
index ec2650c..0ab0c14 100644
--- a/src/sghi/etl/commons/utils/__init__.py
+++ b/src/sghi/etl/commons/utils/__init__.py
@@ -1,9 +1,11 @@
 """Common utilities."""
 
+from .others import run_workflow
 from .result_gatherers import fail_fast, fail_fast_factory, ignored_failed
 
 __all__ = [
     "fail_fast",
     "fail_fast_factory",
     "ignored_failed",
+    "run_workflow",
 ]
diff --git a/src/sghi/etl/commons/utils/others.py b/src/sghi/etl/commons/utils/others.py
new file mode 100644
index 0000000..2241f98
--- /dev/null
+++ b/src/sghi/etl/commons/utils/others.py
@@ -0,0 +1,108 @@
+"""Other useful utilities."""
+
+from __future__ import annotations
+
+import logging
+from logging import Logger
+from typing import TYPE_CHECKING, Final, TypeVar
+
+from sghi.utils import ensure_callable
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+    from sghi.etl.core import WorkflowDefinition
+
+# =============================================================================
+# TYPES
+# =============================================================================
+
+
+_PDT = TypeVar("_PDT")
+"""Type variable representing the data type after processing."""
+
+_RDT = TypeVar("_RDT")
+"""Type variable representing the raw data type."""
+
+
+# =============================================================================
+# CONSTANTS
+# =============================================================================
+
+
+_LOGGER: Final[Logger] = logging.getLogger(name=__name__)
+
+
+# =============================================================================
+# UTILITIES
+# =============================================================================
+
+
+def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
+    """Execute an ETL :class:`Workflow <sghi.etl.core.WorkflowDefinition>`.
+
+    .. tip::
+
+        In the context of this function, **"ETL Workflow"**, or the shorter
+        version **"Workflow"**, refers to an instance of the
+        :class:`WorkflowDefinition` class that is being executed or is about
+        to be executed.
+
+    This function accepts a factory function that supplies an ETL
+    ``WorkflowDefinition`` instance. It invokes the factory to get the
+    ``WorkflowDefinition``/workflow and then executes it. The execution of
+    the workflow proceeds as follows:
+
+    - The callable returned by the
+      :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of
+      the supplied ``WorkflowDefinition`` is used to get the
+      :class:`~sghi.etl.core.Source` associated with the workflow. The
+      :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then
+      invoked to get the raw data to process.
+    - The callable returned by the
+      :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property
+      of the supplied ``WorkflowDefinition`` is invoked to get the
+      :class:`~sghi.etl.core.Processor` associated with the workflow. This
+      ``Processor`` is then applied to the raw data retrieved from the
+      ``Source`` in the previous step to obtain processed data.
+    - The callable returned by the
+      :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of
+      the supplied ``WorkflowDefinition`` is invoked to get the
+      :class:`~sghi.etl.core.Sink` associated with the workflow. The
+      processed data from the previous step is drained into this ``Sink``.
+    - The ``Source``, ``Processor`` and ``Sink`` created in the previous
+      steps are disposed of. Note that this disposal also happens if an
+      error occurs during the workflow execution.
+
+    .. note::
+
+        The above is a general description of how the workflow execution
+        occurs. The actual implementation may vary slightly from this
+        description.
+
+    If an exception is raised during the workflow execution, all the
+    workflow's components (source, processor, sink) are disposed of, and
+    the error is then propagated to the caller. If the supplied value
+    **IS NOT** a valid callable object, a :exc:`ValueError` is raised.
+
+    :param wf: A factory function that supplies the ``WorkflowDefinition``
+        instance to be executed. This function is only invoked once. The
+        given value *MUST* be a valid callable object, and it *MUST NOT*
+        have any required arguments.
+
+    :return: None.
+
+    :raises ValueError: If ``wf`` is NOT a callable object.
+    """
+    ensure_callable(wf, message="'wf' MUST be a valid callable object.")
+
+    wd: WorkflowDefinition = wf()
+    _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name)
+    with (
+        wd.source_factory() as source,
+        wd.processor_factory() as processor,
+        wd.sink_factory() as sink,
+    ):
+        sink.drain(processor.apply(source.draw()))
+
+    _LOGGER.info("[%s:%s] Workflow execution complete.", wd.id, wd.name)
diff --git a/test/sghi/etl/commons_tests/utils_tests/others_tests.py b/test/sghi/etl/commons_tests/utils_tests/others_tests.py
new file mode 100644
index 0000000..57f2120
--- /dev/null
+++ b/test/sghi/etl/commons_tests/utils_tests/others_tests.py
@@ -0,0 +1,131 @@
+# ruff: noqa: D205
+"""Tests for the ``sghi.etl.commons.utils.others`` module."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from sghi.etl.commons import (
+    NOOPProcessor,
+    NullSink,
+    ProcessorPipe,
+    SimpleWorkflowDefinition,
+    processor,
+    run_workflow,
+    sink,
+    source,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Callable, Iterable, MutableSequence
+
+    from sghi.etl.core import WorkflowDefinition
+
+# =============================================================================
+# HELPERS
+# =============================================================================
+
+
+def _workflow_factory_generator(
+    repository: MutableSequence[str],
+    start: int = 0,
+    stop: int = 5,
+    step: int = 1,
+) -> Callable[[], WorkflowDefinition[Iterable[int], Iterable[str]]]:
+    @source
+    def supply_ints() -> Iterable[int]:
+        yield from range(start, stop, step)
+
+    @processor
+    def add_100(values: Iterable[int]) -> Iterable[int]:
+        for v in values:
+            yield v + 100
+
+    @processor
+    def ints_as_strings(ints: Iterable[int]) -> Iterable[str]:
+        yield from map(str, ints)
+
+    @sink
+    def save_strings_to_repo(strings: Iterable[str]) -> None:
+        repository.extend(strings)
+
+    def _create_workflow() -> WorkflowDefinition[Iterable[int], Iterable[str]]:
+        return SimpleWorkflowDefinition(
+            id="test_workflow",
+            name="Test Workflow",
+            source_factory=lambda: supply_ints,
+            processor_factory=lambda: ProcessorPipe(
+                [add_100, ints_as_strings],
+            ),
+            sink_factory=lambda: save_strings_to_repo,
+        )
+
+    return _create_workflow
+
+
+# =============================================================================
+# TEST CASES
+# =============================================================================
+
+
+def test_run_workflow_fails_on_non_callable_input() -> None:
+    """:func:`sghi.etl.commons.utils.run_workflow` should raise a
+    :exc:`ValueError` when given a non-callable input value.
+    """
+    wf = _workflow_factory_generator([])
+    for non_callable in (None, wf()):
+        with pytest.raises(ValueError, match="callable object.") as exp_info:
+            run_workflow(wf=non_callable)  # type: ignore
+
+        assert (
+            exp_info.value.args[0] == "'wf' MUST be a valid callable object."
+        )
+
+
+def test_run_workflow_side_effects_on_failed_execution() -> None:
+    """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the
+    workflow components (source, processor and sink) if an error occurs
+    during execution.
+    """
+
+    @source
+    def failing_source() -> str:
+        _err_msg: str = "Oops, something failed."
+        raise RuntimeError(_err_msg)
+
+    _processor = NOOPProcessor()
+    _sink = NullSink()
+
+    def create_failing_workflow() -> WorkflowDefinition[str, str]:
+        return SimpleWorkflowDefinition(
+            id="failing_workflow",
+            name="Failing Workflow",
+            source_factory=lambda: failing_source,
+            processor_factory=lambda: _processor,
+            sink_factory=lambda: _sink,
+        )
+
+    with pytest.raises(RuntimeError, match="Oops, something failed."):
+        run_workflow(wf=create_failing_workflow)
+
+    assert failing_source.is_disposed
+    assert _processor.is_disposed
+    assert _sink.is_disposed
+
+
+def test_run_workflow_side_effects_on_successful_execution() -> None:
+    """:func:`sghi.etl.commons.utils.run_workflow` should execute an ETL
+    Workflow when given a factory function that returns the workflow.
+    """
+    repository1: list[str] = []
+    repository2: list[str] = []
+    wf1 = _workflow_factory_generator(repository1)
+    wf2 = _workflow_factory_generator(repository2, 10, 60, 10)
+
+    run_workflow(wf1)
+    run_workflow(wf2)
+
+    assert repository1 == ["100", "101", "102", "103", "104"]
+    assert repository2 == ["110", "120", "130", "140", "150"]
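For reviewers, here is a minimal usage sketch of the new `run_workflow` helper, wired the same way as the test helper above. The names `greet`, `shout`, `print_out`, and `create_print_workflow` are hypothetical, not part of this change; only the imported symbols come from this PR.

```python
from sghi.etl.commons import (
    SimpleWorkflowDefinition,
    processor,
    run_workflow,
    sink,
    source,
)
from sghi.etl.core import WorkflowDefinition


@source
def greet() -> str:  # hypothetical example source
    return "Hello, World!"


@processor
def shout(message: str) -> str:  # hypothetical example processor
    return message.upper()


@sink
def print_out(message: str) -> None:  # hypothetical example sink
    print(message)


def create_print_workflow() -> WorkflowDefinition[str, str]:
    return SimpleWorkflowDefinition(
        id="print_workflow",
        name="Print Workflow",
        source_factory=lambda: greet,
        processor_factory=lambda: shout,
        sink_factory=lambda: print_out,
    )


# Note: `run_workflow` takes the *factory*, not an already-built
# `WorkflowDefinition`; passing the latter raises a `ValueError`
# (see the first test case above). The source, processor, and sink
# are disposed of after the run, even on failure.
run_workflow(wf=create_print_workflow)  # prints "HELLO, WORLD!"
```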