Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sinks): add a sink decorator #7

Merged
merged 1 commit into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,26 @@

# Cross-reference targets that Sphinx's nitpicky mode should not warn about.
# Each entry is a ``(reference type, target)`` pair; the trailing comment on
# each line records why that target cannot currently be resolved.
nitpick_ignore = [
("py:class", "Processor"), # docs aren't published yet
("py:class", "Sink"), # docs aren't published yet
("py:class", "Source"), # docs aren't published yet
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:class", "sghi.etl.core.Sink"), # docs aren't published yet
("py:class", "sghi.etl.core.Source"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
]

Expand Down
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .sinks import sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

Expand All @@ -10,5 +11,6 @@
"fail_fast_factory",
"ignored_failed",
"processor",
"sink",
"source",
]
156 changes: 156 additions & 0 deletions src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Common :class:`~sghi.etl.core.Sink` implementations."""

from __future__ import annotations

import logging
from collections.abc import Callable
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Sink
from sghi.utils import ensure_callable, type_fqn

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

# Shape of the callables that can be wrapped as a sink: they receive the
# processed data as their single positional argument and return nothing.
_SinkCallable = Callable[[_PDT], None]


# =============================================================================
# CONSTANTS
# =============================================================================


# Prefix of the logger name used by sinks that wrap a callable; the
# ``type_fqn`` of the wrapped callable is appended to it in parentheses.
_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@sink"


# =============================================================================
# DECORATORS
# =============================================================================


def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]:
    """Mark/decorate a ``Callable`` as a :class:`Sink`.

    The result is that the callable is converted into a ``Sink`` instance.
    When used as a decorator, invoking the decorated callable has the same
    effect as invoking the ``drain`` method of the resulting ``Sink`` instance.

    .. important::

        The decorated callable *MUST* accept at least one argument but have at
        *MOST* one required argument (the processed data to drain/consume).

    .. note::

        The resulting values are true ``Sink`` instances that can be disposed.
        Once disposed, any attempts to invoke these instances will
        result in a :exc:`ResourceDisposedError` being raised.

    .. admonition:: Regarding retry safety
        :class: tip

        The resulting ``Sink`` is safe to retry if and only if, the decorated
        callable is safe to retry.

    :param f: The callable to be decorated. The callable *MUST* have at *MOST*
        one required argument (the processed data to drain/consume).

    :return: A ``Sink`` instance.

    :raise ValueError: If the given value is NOT a ``Callable``.
    """
    ensure_callable(f, message="A callable object is required.")

    return _SinkOfCallable(delegate_to=f)


# =============================================================================
# SINK IMPLEMENTATIONS
# =============================================================================


@final
class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
    """A :class:`Sink` that drains data by delegating to a wrapped callable.

    Instances are created by the :func:`sink` decorator and are not meant to
    be instantiated directly by clients of this module.
    """

    __slots__ = ("_delegate_to", "_is_disposed", "_logger")

    def __init__(self, delegate_to: _SinkCallable[_PDT]) -> None:
        """Wrap the given callable as a ``Sink``.

        :param delegate_to: The callable that will consume the processed data
            handed to :meth:`drain`.

        :raise ValueError: If ``delegate_to`` is NOT a callable object.
        """
        super().__init__()
        ensure_callable(
            value=delegate_to,
            message="'delegate_to' MUST be a callable object.",
        )
        self._delegate_to: _SinkCallable[_PDT] = delegate_to
        self._is_disposed: bool = False
        self._logger: Logger = logging.getLogger(
            f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._delegate_to)})"
        )
        # Copy the wrapped callable's metadata (``__name__``, ``__doc__``,
        # etc.) onto this instance and record it as ``__wrapped__``.
        update_wrapper(self, self._delegate_to)

    @not_disposed
    @override
    def __enter__(self) -> Self:
        """Return ``self`` upon entering the runtime context.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed(i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :return: This instance.

        :raise ResourceDisposedError: If this sink has already been disposed.
        """
        # The lookup deliberately starts *after* ``Sink`` in the MRO.
        # NOTE(review): presumably done to reach the parent context-manager
        # implementation with a ``Self``-compatible return type - confirm.
        return super(Sink, self).__enter__()

    @property
    @override
    def is_disposed(self) -> bool:
        """Whether :meth:`dispose` has been invoked on this sink."""
        return self._is_disposed

    @not_disposed
    @override
    def drain(self, processed_data: _PDT) -> None:
        """Delegate consumption of the processed data to the wrapped callable.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed(i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :param processed_data: The processed data to consume/drain.

        :return: None.

        :raise ResourceDisposedError: If this sink has already been disposed.
        """
        self._logger.info("Delegating to '%s'.", type_fqn(self._delegate_to))
        self._delegate_to(processed_data)

    @override
    def dispose(self) -> None:
        """Mark this sink as disposed and log the disposal."""
        self._is_disposed = True
        self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


# Only the ``sink`` decorator is public; the class implementing the sinks it
# returns has an underscore-prefixed name and is not exported.
__all__ = [
"sink",
]
100 changes: 100 additions & 0 deletions test/sghi/etl/commons_tests/sinks_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# ruff: noqa: D205
"""Tests for the :mod:`sghi.etl.commons.sinks` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import sink
from sghi.etl.core import Sink

if TYPE_CHECKING:
from collections.abc import Iterable, MutableSequence


def test_sink_decorator_delegates_to_the_wrapped_callable() -> None:
    """Invoking a :func:`sink` result should hand the data to the wrapped
    callable.
    """
    collected: MutableSequence[int] = []

    def collect_ints(values: Iterable[int]) -> None:
        collected.extend(values)

    consumer: Sink[Iterable[int]] = sink(collect_ints)
    consumer(range(5))

    # The wrapped callable must have received every value, in order.
    assert collected == list(range(5))


def test_sink_decorator_fails_on_non_callable_input_value() -> None:
    """:func:`sink` should raise a :exc:`ValueError` when given a
    non-callable value.
    """
    not_a_callable = "Not a function"

    with pytest.raises(ValueError, match="callable object") as exc_info:
        sink(not_a_callable)  # type: ignore

    raised = exc_info.value
    assert raised.args[0] == "A callable object is required."


def test_sink_decorator_fails_on_a_none_input_value() -> None:
    """:func:`sink` should reject ``None`` by raising a :exc:`ValueError`."""
    with pytest.raises(ValueError, match="callable object") as exc_info:
        sink(None)  # type: ignore

    raised = exc_info.value
    assert raised.args[0] == "A callable object is required."


def test_sink_decorator_returns_expected_value() -> None:
    """Both the decorator and the plain-call forms of :func:`sink` should
    produce ``Sink`` instances.
    """
    collected: MutableSequence[int] = []

    # Decorator form.
    @sink
    def collect_ints(values: Iterable[int]) -> None:
        collected.extend(values)

    # Plain function-call form, wrapping a builtin.
    print_all: Sink[str] = sink(print)

    for candidate in (collect_ints, print_all):
        assert isinstance(candidate, Sink)


def test_sink_decorated_value_usage_as_a_context_manager() -> None:
    """A :func:`sink` result should work as a context manager: usable inside
    the ``with`` block and disposed once the block exits.
    """
    collected: MutableSequence[int] = []

    def collect_ints(values: Iterable[int]) -> None:
        collected.extend(values)

    managed_sink = sink(collect_ints)
    with managed_sink as ints_consumer:
        ints_consumer(range(5))

    assert collected == list(range(5))
    # Leaving the runtime context must have disposed the sink.
    assert ints_consumer.is_disposed


def test_sink_decorated_value_usage_when_is_disposed_fails() -> None:
    """Every way of using a disposed :func:`sink` result should raise
    :exc:`ResourceDisposedError`.
    """
    collected: MutableSequence[int] = []

    @sink
    def collect_ints(values: Iterable[int]) -> None:
        collected.extend(values)

    collect_ints.dispose()

    # Direct invocation, an explicit ``drain`` and entering the runtime
    # context are all forbidden after disposal. Checked in that order.
    forbidden_usages = (
        lambda: collect_ints(range(5)),
        lambda: collect_ints.drain(range(5)),
        lambda: collect_ints.__enter__(),
    )
    for use_after_dispose in forbidden_usages:
        with pytest.raises(ResourceDisposedError):
            use_after_dispose()
2 changes: 1 addition & 1 deletion test/sghi/etl/commons_tests/sources_tests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ruff: noqa: D205
"""Tests for the :module:`sghi.etl.commons.processors` module."""
"""Tests for the :module:`sghi.etl.commons.sources` module."""

from __future__ import annotations

Expand Down
Loading