feat(sinks): add a ScatterSink #21

Merged · 1 commit · May 26, 2024
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
@@ -8,7 +8,7 @@
pipe_processors,
processor,
)
from .sinks import NullSink, SplitSink, sink
from .sinks import NullSink, ScatterSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .workflow_definitions import SimpleWorkflowDefinition
@@ -20,6 +20,7 @@
"ProcessorPipe",
"SimpleWorkflowDefinition",
"ScatterGatherProcessor",
"ScatterSink",
"SplitGatherProcessor",
"SplitSink",
"fail_fast",
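The change above extends the package's public exports. Below is a minimal import sketch (not part of the diff), assuming the ``sghi.etl.commons`` package shown in this pull request is installed and importable:

# ScatterSink is now exported alongside the existing sink utilities.
from sghi.etl.commons import NullSink, ScatterSink, SplitSink, sink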
196 changes: 196 additions & 0 deletions src/sghi/etl/commons/sinks.py
@@ -169,6 +169,201 @@ def dispose(self) -> None:
self._logger.info("Disposal complete.")


@final
class ScatterSink(Sink[_PDT], Generic[_PDT]):
"""A :class:`Sink` that drains processed data to multiple other sinks.

This ``Sink`` implementation drains (the same) processed data to multiple
other sinks (embedded sinks) concurrently. A suitable :class:`Executor` can
be specified at instantiation to control the concurrent draining to the
embedded sinks. A :class:`retry policy<Retry>` can also be provided to
handle transient errors when draining to the embedded sinks. A result
gatherer function can be provided to specify how draining errors should
be handled.

Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be
retried. However, they do support retrying their embedded sinks. This is
disabled by default but can be enabled by providing a suitable value to
the ``retry_policy_factory`` constructor parameter when creating new
instances. When enabled, each embedded sink will be retried individually
per the specified retry policy in case it fails.

Disposing instances of this class also disposes of their embedded sinks.

.. admonition:: Regarding retry safety
:class: tip

Instances of this ``Sink`` are **NOT SAFE** to retry.
"""

__slots__ = (
"_sinks",
"_retry_policy_factory",
"_executor_factory",
"_result_gatherer",
"_is_disposed",
"_logger",
"_exit_stack",
"_prepped_sinks",
"_executor",
)

def __init__(
self,
sinks: Sequence[Sink[_PDT]],
retry_policy_factory: Callable[[], Retry] = noop_retry,
executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
result_gatherer: _ResultGatherer[None] = fail_fast,
) -> None:
"""Create a new ``ScatterSink`` of the given properties.

:param sinks: A ``Sequence`` of sinks to drain processed data to. These
sinks are also referred to as the embedded sinks. The given
``Sequence`` *MUST NOT* be empty.
:param retry_policy_factory: A callable that supplies retry policy
instance(s) to apply to each embedded sink. This *MUST* be a valid
callable object. Defaults to a factory that returns retry policies
that do nothing.
:param executor_factory: A callable that supplies suitable
``Executor`` instance(s) to use for the concurrent draining. This
*MUST* be a valid callable object. Defaults to a factory that
returns ``ThreadPoolExecutor`` instances.
:param result_gatherer: A function that specifies how to handle
draining errors. This *MUST* be a valid callable object. Defaults
to a gatherer that fails if draining to any of the embedded sinks
fails, or returns silently otherwise.

:raise TypeError: If ``sinks`` is NOT a ``Sequence``.
:raise ValueError: If ``sinks`` is empty, or if any of
``retry_policy_factory``, ``executor_factory`` or
``result_gatherer`` is NOT a callable object.
"""
super().__init__()
ensure_not_none_nor_empty(
value=ensure_instance_of(
value=sinks,
message="'sinks' MUST be a collections.abc.Sequence object.",
klass=Sequence,
),
message="'sinks' MUST NOT be empty.",
)
self._sinks: Sequence[Sink[_PDT]] = tuple(sinks)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
)
self._executor_factory: Callable[[], Executor] = ensure_callable(
value=executor_factory,
message="'executor_factory' MUST be a callable.",
)
self._result_gatherer: _ResultGatherer[None] = ensure_callable(
value=result_gatherer,
message="'result_gatherer' MUST be a callable.",
)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()

# Prepare embedded sinks for execution by ensuring that they are all
# disposed of properly once this object is disposed.
self._prepped_sinks: Sequence[Task[_PDT, None]] = tuple(
self._sink_to_task(self._exit_stack.push(_sink))
for _sink in self._sinks
)
self._executor: ConcurrentExecutor[_PDT, None]
self._executor = ConcurrentExecutor(
*self._prepped_sinks, executor=self._executor_factory()
)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed (i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:return: This instance.

:raise ResourceDisposedError: If this sink has already been disposed.
"""
return super(Sink, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def drain(self, processed_data: _PDT) -> None:
"""Consume the supplied processed data using all embedded sinks.

This method drains the provided processed data to all embedded sinks
concurrently. It then applies the result-gatherer function assigned to
this instance (at creation) to the :class:`Future` objects resulting
from the concurrent execution. Each of these ``Future`` objects wraps
the result of draining the processed data to one of the embedded sinks
contained in this ``ScatterSink``, and they preserve the same order as
the embedded sinks.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed (i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:param processed_data: The processed data to consume.

:return: None.

:raise ResourceDisposedError: If this sink has already been disposed.
"""
self._logger.info("Draining processed data to all available sinks.")

with self._executor as executor:
futures = executor.execute(processed_data)

self._result_gatherer(futures)

@override
def dispose(self) -> None:
"""Release any underlying resources contained by this sink.

All embedded sinks are also disposed. After this method returns
successfully, the :attr:`is_disposed` property should return ``True``.

.. note::
Unless otherwise specified, trying to use methods of a
``Disposable`` instance decorated with the
:func:`~sghi.disposable.not_disposed` decorator after this method
returns should generally be considered a programming error and
should result in a :exc:`~sghi.disposable.ResourceDisposedError`
being raised.

This method should be idempotent, allowing it to be called more
than once; only the first call, however, should have an effect.

:return: None.
"""
self._is_disposed = True
self._exit_stack.close()
self._executor.dispose()
self._logger.info("Disposal complete.")

def _sink_to_task(self, s: Sink[_PDT]) -> Task[_PDT, None]:
@task
def do_drain(processed_data: _PDT) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data)

return do_drain
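
To show how the class above is intended to be used, here is a minimal usage sketch (not part of the diff). It assumes that the ``sink`` decorator exported by this package turns a plain callable into a ``Sink``; the two decorated callables below are hypothetical examples introduced only for illustration.

from sghi.etl.commons import ScatterSink, sink

collected: list[str] = []

@sink
def collect(value: str) -> None:
    # Hypothetical embedded sink: remember every value drained to it.
    collected.append(value)

@sink
def echo(value: str) -> None:
    # Hypothetical embedded sink: print every value drained to it.
    print(f"draining: {value!r}")

# The same value is drained to both embedded sinks concurrently; leaving the
# context manager disposes the ScatterSink together with its embedded sinks.
with ScatterSink(sinks=[collect, echo]) as scatter:
    scatter.drain("processed-record-1")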


@final
class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]):
"""A :class:`Sink` that splits processed data and drains the split data to
@@ -473,6 +668,7 @@

__all__ = [
"NullSink",
"ScatterSink",
"SplitSink",
"sink",
]
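
Finally, a sketch (not part of the diff) of how the constructor's configuration knobs might be tuned. The ``log_failures`` gatherer and the use of ``functools.partial`` to cap the worker count are illustrative assumptions; the gatherer's expected signature is inferred from the fact that ``ScatterSink.drain`` passes it the ``Future`` objects produced by the concurrent execution, and ``NullSink()`` is used purely as a placeholder embedded sink.

import logging
from collections.abc import Iterable
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial

from sghi.etl.commons import NullSink, ScatterSink

_LOGGER = logging.getLogger(__name__)


def log_failures(futures: Iterable[Future[None]]) -> None:
    # Hypothetical result gatherer: rather than failing fast, log each
    # embedded sink that raised while draining and keep going.
    for index, future in enumerate(futures):
        error = future.exception()
        if error is not None:
            _LOGGER.warning("Embedded sink %d failed: %s", index, error)


scatter = ScatterSink(
    sinks=[NullSink(), NullSink()],
    # Cap concurrent draining at two worker threads.
    executor_factory=partial(ThreadPoolExecutor, max_workers=2),
    result_gatherer=log_failures,
)
with scatter:
    scatter.drain("processed-record-2")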