Skip to content

Commit

Permalink
feat(utils): add a workflow runner utility
Browse files Browse the repository at this point in the history
  • Loading branch information
kennedykori committed Jun 2, 2024
1 parent 5d9b9bb commit 9641f5f
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
nitpicky = True

nitpick_ignore = [
("py:attr", "sghi.etl.core.WorkflowDefinition.processor_factory"), # docs aren't published yet
("py:attr", "sghi.etl.core.WorkflowDefinition.sink_factory"), # docs aren't published yet
("py:attr", "sghi.etl.core.WorkflowDefinition.source_factory"), # docs aren't published yet
("py:class", "_RDT"), # private type annotations
("py:class", "_PDT"), # private type annotations
("py:class", "Executor"), # sphinx can't find it
Expand Down Expand Up @@ -85,6 +88,7 @@
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:meth", "sghi.etl.core.Source.draw"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
Expand Down
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from .sinks import NullSink, ScatterSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
Expand All @@ -28,6 +28,7 @@
"ignored_failed",
"pipe_processors",
"processor",
"run_workflow",
"sink",
"source",
]
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Common utilities."""

# Re-export the public utilities of the sub-modules so that callers can
# import them directly from ``sghi.etl.commons.utils``.
from .others import run_workflow
from .result_gatherers import fail_fast, fail_fast_factory, ignored_failed

# Public API of this package; kept sorted alphabetically.
__all__ = [
    "fail_fast",
    "fail_fast_factory",
    "ignored_failed",
    "run_workflow",
]
105 changes: 105 additions & 0 deletions src/sghi/etl/commons/utils/others.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Other useful utilities."""

from __future__ import annotations

import logging
from logging import Logger
from typing import TYPE_CHECKING, Final, TypeVar

from sghi.utils import ensure_callable

if TYPE_CHECKING:
from collections.abc import Callable

from sghi.etl.core import WorkflowDefinition

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# CONSTANTS
# =============================================================================


# Dedicated logger for the `run_workflow` utility. It is namespaced under
# this module's name plus the function name so that applications can filter
# or configure workflow-runner log records independently of the rest of the
# module.
_WORKFLOW_RUNNER_LOGGER: Final[Logger] = logging.getLogger(
    name=f"{__name__}.run_workflow",
)


# =============================================================================
# UTILITIES
# =============================================================================


def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
    """Execute an ETL :class:`Workflow<WorkflowDefinition>`.

    This function accepts a factory function that supplies an ETL
    ``WorkflowDefinition`` instance, it then invokes the function to get the
    ``WorkflowDefinition`` and then executes it. The execution of the
    ``WorkflowDefinition`` proceeds as follows:

    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of
      the supplied ``WorkflowDefinition`` is used to get the
      :class:`~sghi.etl.core.Source` associated with the workflow. The
      :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then
      invoked to get the raw data to process.
    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property
      of the supplied ``WorkflowDefinition`` is invoked to get the
      :class:`~sghi.etl.core.Processor` associated with the workflow. This
      ``Processor`` is then applied to the raw data retrieved from the
      ``Source`` in the previous step to obtain processed data.
    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of
      the supplied ``WorkflowDefinition`` is invoked to get the
      :class:`~sghi.etl.core.Sink` associated with the workflow. The
      processed data from the previous step is drained into this ``Sink``.
    - The ``Source``, ``Processor`` and ``Sink`` created in the previous
      steps are disposed of. Note that this disposal also happens if an
      error occurs during the workflow execution.

    .. note::

        The above is a general description of how the workflow execution
        occurs. The actual implementation may vary slightly from this
        description.

    If the supplied value **IS NOT** a valid callable object, a
    :exc:`ValueError` is raised.

    :param wf: A factory function that supplies the ``WorkflowDefinition``
        instance to be executed. This function is only invoked once. The given
        value *MUST* be a valid callable object, and it *MUST NOT* have any
        required arguments.

    :return: None.

    :raise ValueError: If ``wf`` is NOT a callable object.
    """
    ensure_callable(wf, message="'wf' MUST be a valid callable object.")

    # The factory is invoked exactly once; the resulting definition drives
    # the rest of the execution.
    wd: WorkflowDefinition[_RDT, _PDT] = wf()
    _WORKFLOW_RUNNER_LOGGER.info(
        "[%s:%s] Starting workflow execution ...", wd.id, wd.name
    )
    # Each factory-created component is used as a context manager, so exiting
    # this block disposes of all three components even when the body raises.
    with (
        wd.source_factory() as source,
        wd.processor_factory() as processor,
        wd.sink_factory() as sink,
    ):
        sink.drain(processor.apply(source.draw()))

    _WORKFLOW_RUNNER_LOGGER.info(
        "[%s:%s] Workflow execution complete.", wd.id, wd.name
    )
131 changes: 131 additions & 0 deletions test/sghi/etl/commons_tests/utils_tests/others_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# ruff: noqa: D205
"""Tests for the ``sghi.etl.commons.utils.others`` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.etl.commons import (
NOOPProcessor,
NullSink,
ProcessorPipe,
SimpleWorkflowDefinition,
processor,
run_workflow,
sink,
source,
)

if TYPE_CHECKING:
from collections.abc import Callable, Iterable, MutableSequence

from sghi.etl.core import WorkflowDefinition

# =============================================================================
# HELPERS
# =============================================================================


def _workflow_factory_generator(
    repository: MutableSequence[str],
    start: int = 0,
    stop: int = 5,
    step: int = 1,
) -> Callable[[], WorkflowDefinition[Iterable[int], Iterable[str]]]:
    """Build a factory for a simple int-to-string ETL workflow.

    The produced workflow draws integers from ``range(start, stop, step)``,
    adds 100 to each, converts them to strings and appends the results to
    ``repository``.
    """

    @source
    def draw_ints() -> Iterable[int]:
        yield from range(start, stop, step)

    @processor
    def add_a_hundred(numbers: Iterable[int]) -> Iterable[int]:
        yield from (number + 100 for number in numbers)

    @processor
    def stringify(numbers: Iterable[int]) -> Iterable[str]:
        for number in numbers:
            yield str(number)

    @sink
    def persist_strings(strings: Iterable[str]) -> None:
        for value in strings:
            repository.append(value)

    def _build() -> WorkflowDefinition[Iterable[int], Iterable[str]]:
        return SimpleWorkflowDefinition(
            id="test_workflow",
            name="Test Workflow",
            source_factory=lambda: draw_ints,
            processor_factory=lambda: ProcessorPipe(
                [add_a_hundred, stringify],
            ),
            sink_factory=lambda: persist_strings,
        )

    return _build


# =============================================================================
# TEST CASES
# =============================================================================


def test_run_workflow_fails_on_non_callable_input() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should raise a
    :exc:`ValueError` when given a non-callable input value.
    """
    wf = _workflow_factory_generator([])
    # Neither ``None`` nor an already-built definition is a factory callable.
    non_callables = (None, wf())
    for value in non_callables:
        with pytest.raises(ValueError, match="callable object.") as exp_info:
            run_workflow(wf=value)  # type: ignore

        err_msg = exp_info.value.args[0]
        assert err_msg == "'wf' MUST be a valid callable object."


def test_run_workflow_side_effects_on_failed_execution() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the
    workflow components (source, processor and sink) if an error occurs during
    execution.
    """

    @source
    def failing_source() -> str:
        _err_msg: str = "Oops, something failed."
        raise RuntimeError(_err_msg)

    noop_processor = NOOPProcessor()
    null_sink = NullSink()

    def _failing_workflow() -> WorkflowDefinition[str, str]:
        return SimpleWorkflowDefinition(
            id="failing_workflow",
            name="Failing Workflow",
            source_factory=lambda: failing_source,
            processor_factory=lambda: noop_processor,
            sink_factory=lambda: null_sink,
        )

    with pytest.raises(RuntimeError, match="Oops, something failed."):
        run_workflow(wf=_failing_workflow)

    # All three components must have been disposed despite the failure.
    assert failing_source.is_disposed
    assert noop_processor.is_disposed
    assert null_sink.is_disposed


def test_run_workflow_side_effects_on_successful_execution() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should execute an ETL
    Workflow when given a factory function that returns the workflow.
    """
    repository1: list[str] = []
    repository2: list[str] = []
    wf1 = _workflow_factory_generator(repository1)
    wf2 = _workflow_factory_generator(repository2, 10, 60, 10)

    run_workflow(wf1)
    run_workflow(wf2)

    # wf1 draws range(0, 5); each value gets +100 and is stringified.
    assert repository1 == ["100", "101", "102", "103", "104"]
    # wf2 draws range(10, 60, 10) -> 10..50; each value gets +100.
    assert repository2 == ["110", "120", "130", "140", "150"]

0 comments on commit 9641f5f

Please sign in to comment.