Skip to content

Commit

Permalink
feat(processors): add a SplitGatherProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
kennedykori committed May 7, 2024
1 parent 75b30ec commit 408b7c0
Showing 1 changed file with 260 additions and 3 deletions.
263 changes: 260 additions & 3 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from __future__ import annotations

import logging
from collections.abc import Callable, Sequence
from collections.abc import Callable, Iterable, Sequence
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import ExitStack
from functools import update_wrapper
from logging import Logger
Expand All @@ -14,8 +15,16 @@
from sghi.disposable import not_disposed
from sghi.etl.core import Processor
from sghi.retry import Retry, noop_retry
from sghi.task import Task, pipe, task
from sghi.utils import ensure_callable, ensure_not_none_nor_empty, type_fqn
from sghi.task import ConcurrentExecutor, Task, pipe, task
from sghi.utils import (
ensure_callable,
ensure_instance_of,
ensure_not_none_nor_empty,
ensure_predicate,
type_fqn,
)

from .utils import fail_fast

# =============================================================================
# TYPES
Expand All @@ -28,8 +37,12 @@
_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""

_T = TypeVar("_T")

_ProcessorCallable = Callable[[_RDT], _PDT]

_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]


# =============================================================================
# CONSTANTS
Expand Down Expand Up @@ -328,6 +341,249 @@ def do_apply(raw_data: _RDT) -> _PDT:
pipe_processors = ProcessorPipe


@final
class SplitGatherProcessor(
Processor[Sequence[_RDT], Sequence[_PDT]], Generic[_RDT, _PDT]
):
"""A :class:`Processor` that splits raw data and applies multiple
processors to the split data.
This ``Processor`` implementation takes aggregated raw data, splits it into
its constituent data parts, and then processes each data part concurrently.
This is achieved by mapping each data part to an embedded processor before
executing all the embedded processors concurrently. As such, the supplied
raw data's size must equal the number of embedded processors contained by a
processor of this type. A result gatherer function can be provided to
specify how to handle processing errors and/or which results from the
embedded processors to pick. A :class:`retry policy<Retry>` to handle
transient processing errors of the embedded processors can also be
provided. A suitable :class:`Executor` can be specified at instantiation to
control the concurrent application of the embedded processors.
Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be
retried. However, they do support retrying their embedded processors. This
is disabled by default but can be enabled by providing a suitable value to
the ``retry_policy_factory`` constructor parameter when creating new
instances. When enabled, each embedded processor will be retried
individually per the specified retry policy in case it fails.
Disposing instances of this class also disposes of their embedded
processors.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Processor`` are **NOT SAFE** to retry.
""" # noqa: D205

__slots__ = (
"_processors",
"_retry_policy_factory",
"_executor_factory",
"_result_gatherer",
"_is_disposed",
"_logger",
"_exit_stack",
"_prepped_processors",
"_executor",
)

def __init__(
self,
processors: Sequence[Processor[_RDT, _PDT]],
retry_policy_factory: Callable[[], Retry] = noop_retry,
executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
result_gatherer: _ResultGatherer[_PDT] = fail_fast,
) -> None:
"""Create a new ``SplitGatherProcessor`` of the given properties.
:param processors: A ``Sequence`` of processors to apply to each raw
data part. These processors are also referred to as the embedded
processors. The given ``Sequence`` *MUST NOT* be empty.
:param retry_policy_factory: A callable that supplies retry policy
instance(s) to apply to each embedded processor. This *MUST* be a
valid callable object. Defaults to a factory that returns retry
policies that do nothing.
:param executor_factory: A callable that suppliers suitable
``Executor`` instance(s) to use for the concurrent processing. This
*MUST* be a valid callable object. Defaults to a factory that
returns ``ThreadPoolExecutor`` instances.
:param result_gatherer: A function that specifies how to handle
processing errors and/or which results from the embedded processors
to pick. This *MUST* be a valid callable object. Defaults to a
gatherer that fails if applying any of the embedded processors
failed, or returns all the processed data otherwise.
:raise TypeError: If ``processors`` is NOT a ``Sequence``.
:raise ValueError: If ``processors`` is empty or if
``retry_policy_factory``, ``executor_factory`` and
``result_gatherer`` are NOT callable objects.
"""
super().__init__()
ensure_not_none_nor_empty(
value=ensure_instance_of(
value=processors,
message=(
"'processors' MUST be a collections.abc.Sequence object."
),
klass=Sequence,
),
message="'processors' MUST NOT be empty.",
)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
)
self._executor_factory: Callable[[], Executor] = ensure_callable(
value=executor_factory,
message="'executor_factory' MUST be a callable.",
)
self._result_gatherer: _ResultGatherer[_PDT] = ensure_callable(
value=result_gatherer,
message="'result_gatherer' MUST be a callable.",
)
self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()

# Prepare embedded processors for execution by ensuring that they are
# all disposed of properly once this object is disposed.
self._prepped_processors: Sequence[Task[Any, Any]] = tuple(
self._processor_to_task(self._exit_stack.push(_processor), _i)
for _i, _processor in enumerate(self._processors)
)
self._executor: ConcurrentExecutor[Sequence[_RDT], _PDT]
self._executor = ConcurrentExecutor(
*self._prepped_processors, executor=self._executor_factory()
)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
return super(Processor, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]:
"""Split the supplied raw data, process it and return the results.
This method decomposes the provided aggregated raw data into its
constituent data parts and maps each data part to an embedded
processor; before executing all the embedded processors concurrently.
It then applies the result-gatherer function assigned to this instance
(at creation) to the :class:`Future` objects resulting from the
concurrent execution. Each of these ``Future`` objects wraps the result
of processing each data part using an embedded processor contained in
this ``SplitGatherProcessor``, and they preserve the same order.
The contents of the resulting aggregate, and their ordering, are
determined by the provided result-gatherer function. That is, the
contents of the returned ``Sequence``, will be the same as those
returned by this object's result-gatherer function and in the same
order.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param raw_data: The aggregated raw data to process. This *MUST* be a
``Sequence`` of raw data values whose size *MUST EQUAL* the number
of embedded processors contained by this ``SplitGatherProcessor``.
:return: The aggregated results of applying the embedded processors to
the provided raw data.
:raise ResourceDisposedError: If this processor has already been
disposed.
:raise TypeError: If ``raw_data`` *IS NOT* a ``Sequence``.
:raise ValueError: If the size of ``raw_data`` *DOES NOT EQUAL* the
number of embedded processors in this ``SplitGatherProcessor``.
"""
ensure_instance_of(
value=raw_data,
klass=Sequence,
message="'raw_data' MUST be a collections.abc.Sequence object",
)
ensure_predicate(
test=len(raw_data) == len(self._processors),
message=(
f"Expected 'raw_data' to be of size {len(self._processors)} "
f"but got size {len(raw_data)} instead."
),
exc_factory=ValueError,
)
self._logger.info(
"Splitting aggregated raw data and applying available processors "
"to each data part."
)

with self._executor as executor:
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

@override
def dispose(self) -> None:
"""Release any underlying resources contained by this processor.
All embedded processors are also disposed. After this method returns
successfully, the :attr:`is_disposed` property should return ``True``.
.. note::
Unless otherwise specified, trying to use methods of a
``Disposable`` instance decorated with the
:func:`~sghi.disposable.not_disposed` decorator after this method
returns should generally be considered a programming error and
should result in a :exc:`~sghi.disposable.ResourceDisposedError`
being raised.
This method should be idempotent allowing it to be called more
than once; only the first call, however, should have an effect.
:return: None.
"""
self._is_disposed = True
self._exit_stack.close()
self._executor.dispose()
self._logger.info("Disposal complete.")

def _processor_to_task(
self,
p: Processor[_RDT, _PDT],
i: int,
) -> Task[Sequence[_RDT], _PDT]:
@task
def do_apply(raw_data: Sequence[_RDT]) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data[i])

return do_apply


@final
class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
Expand Down Expand Up @@ -405,6 +661,7 @@ def dispose(self) -> None:
__all__ = [
"NOOPProcessor",
"ProcessorPipe",
"SplitGatherProcessor",
"pipe_processors",
"processor",
]

0 comments on commit 408b7c0

Please sign in to comment.