From 2b0dc65cbe3670c4f6159f5157fcf78c764ef135 Mon Sep 17 00:00:00 2001
From: Kennedy Kori
Date: Thu, 2 May 2024 09:31:32 +0300
Subject: [PATCH] feat(sources): add a `GatherSource`

---
Review notes (not part of the commit message):

- `GatherSource.__init__` now creates the `ConcurrentExecutor` after the
  embedded sources have been prepared and passes them to it as its tasks.
  Previously the executor was created without any tasks and
  `_prepped_sources` was never read, so `draw` had nothing to execute.
- NOTE(review): `draw` uses the executor as a context manager. Presumably
  leaving that context disposes the executor, which would make a
  `GatherSource` single-use; confirm that this is intended.
- Docstrings were added or expanded; the hunk header and diffstat counts
  were updated accordingly. The commit id on the first line and the
  post-image blob id on the `index` line are carried over unchanged and
  no longer describe this revision.

 src/sghi/etl/commons/sources.py | 175 +++++++++++++++++++++++++++++++-
 1 file changed, 173 insertions(+), 2 deletions(-)

diff --git a/src/sghi/etl/commons/sources.py b/src/sghi/etl/commons/sources.py
index de685dc..37d7530 100644
--- a/src/sghi/etl/commons/sources.py
+++ b/src/sghi/etl/commons/sources.py
@@ -3,7 +3,9 @@
 from __future__ import annotations
 
 import logging
-from collections.abc import Callable
+from collections.abc import Callable, Iterable, Sequence
+from concurrent.futures import Executor, Future, ThreadPoolExecutor
+from contextlib import ExitStack
 from functools import update_wrapper
 from logging import Logger
 from typing import Final, Generic, Self, TypeVar, final
@@ -12,7 +14,11 @@
 
 from sghi.disposable import not_disposed
 from sghi.etl.core import Source
-from sghi.utils import ensure_callable, type_fqn
+from sghi.retry import Retry, noop_retry
+from sghi.task import ConcurrentExecutor, Supplier, supplier
+from sghi.utils import ensure_callable, ensure_not_none_nor_empty, type_fqn
+
+from .utils import fail_fast
 
 # =============================================================================
 # TYPES
@@ -22,6 +28,10 @@
 _RDT = TypeVar("_RDT")
 """Raw Data Type."""
 
+_T = TypeVar("_T")
+
+_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]
+
 _SourceCallable = Callable[[], _RDT]
 
 
@@ -80,6 +90,167 @@ def source(f: Callable[[], _RDT]) -> Source[_RDT]:
 # =============================================================================
 
 
+@final
+class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]):
+    """A :class:`Source` that aggregates raw data from multiple sources.
+
+    Each embedded source is wrapped in a task that draws from it. When
+    :meth:`draw` is invoked, those tasks are run through a
+    ``ConcurrentExecutor`` and the resulting futures are handed to the
+    configured result gatherer, whose output is returned as a ``tuple``.
+
+    Disposing an instance of this class also disposes of all its embedded
+    sources.
+    """
+
+    __slots__ = (
+        "_sources",
+        "_retry_policy_factory",
+        "_executor_factory",
+        "_result_gatherer",
+        "_is_disposed",
+        "_logger",
+        "_exit_stack",
+        "_executor",
+        "_prepped_sources",
+    )
+
+    def __init__(
+        self,
+        sources: Sequence[Source[_RDT]],
+        retry_policy_factory: Callable[[], Retry] = noop_retry,
+        executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
+        result_gatherer: _ResultGatherer[_RDT] = fail_fast,
+    ) -> None:
+        """Create a new ``GatherSource`` with the given properties.
+
+        :param sources: The sources to draw from. MUST NOT be ``None`` or
+            empty.
+        :param retry_policy_factory: A callable that supplies the ``Retry``
+            used to wrap each embedded source's ``draw`` call. MUST be a
+            callable. Defaults to ``noop_retry``.
+        :param executor_factory: A callable that supplies the ``Executor``
+            used to back the ``ConcurrentExecutor`` that runs the draws.
+            MUST be a callable. Defaults to ``ThreadPoolExecutor``.
+        :param result_gatherer: A callable that takes the futures produced
+            by the draws and returns the drawn values. MUST be a callable.
+            Defaults to ``fail_fast``.
+        """
+        super().__init__()
+        ensure_not_none_nor_empty(
+            value=sources,
+            message="'sources' MUST NOT be None or empty.",
+        )
+        self._sources: Sequence[Source[_RDT]] = tuple(sources)
+        self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
+            value=retry_policy_factory,
+            message="'retry_policy_factory' MUST be a callable.",
+        )
+        self._executor_factory: Callable[[], Executor] = ensure_callable(
+            value=executor_factory,
+            message="'executor_factory' MUST be a callable.",
+        )
+        self._result_gatherer: _ResultGatherer[_RDT] = ensure_callable(
+            value=result_gatherer,
+            message="'result_gatherer' MUST be a callable.",
+        )
+        self._is_disposed: bool = False
+        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
+        self._exit_stack: ExitStack = ExitStack()
+
+        # Prepare embedded sources for execution by ensuring that they are all
+        # disposed of properly once this object is disposed.
+        self._prepped_sources: Sequence[Supplier[_RDT]] = tuple(
+            self._source_to_task(self._exit_stack.push(_source))
+            for _source in self._sources
+        )
+        # Hand the prepared sources to the executor as its tasks; without
+        # them, `draw` would have nothing to execute.
+        self._executor: ConcurrentExecutor[None, _RDT] = ConcurrentExecutor(
+            *self._prepped_sources,
+            executor=self._executor_factory(),
+        )
+
+    @not_disposed
+    @override
+    def __enter__(self) -> Self:
+        """Return ``self`` upon entering the runtime context.
+
+        .. admonition:: Don't use after dispose
+            :class: error
+
+            Invoking this method on an instance that is disposed (i.e. the
+            :attr:`is_disposed` property on the instance is ``True``) will
+            result in a :exc:`ResourceDisposedError` being raised.
+
+        :return: This instance.
+
+        :raise ResourceDisposedError: If this source has already been disposed.
+        """
+        return super(Source, self).__enter__()
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._is_disposed
+
+    @not_disposed
+    @override
+    def draw(self) -> Sequence[_RDT]:
+        """Draw from all embedded sources and return the gathered data.
+
+        :return: A ``tuple`` of whatever the result gatherer yields for the
+            futures of the embedded sources' draws.
+        """
+        self._logger.info("Aggregating data from all available sources.")
+
+        with self._executor as executor:
+            futures = executor.execute(None)
+
+        return tuple(self._result_gatherer(futures))
+
+    @override
+    def dispose(self) -> None:
+        """Release any underlying resources contained by this source.
+
+        All embedded sources are also disposed. After this method returns
+        successfully, the :attr:`is_disposed` property should return ``True``.
+
+        .. note::
+            Unless otherwise specified, trying to use methods of a
+            ``Disposable`` instance decorated with the
+            :func:`~sghi.disposable.not_disposed` decorator after this method
+            returns should generally be considered a programming error and
+            should result in a :exc:`~sghi.disposable.ResourceDisposedError`
+            being raised.
+
+            This method should be idempotent allowing it to be called more
+            than once; only the first call, however, should have an effect.
+
+        :return: None.
+        """
+        self._is_disposed = True
+        self._exit_stack.close()
+        self._executor.dispose()
+        self._logger.info("Disposal complete.")
+
+    def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]:
+        """Wrap the given source in a supplier that draws from it.
+
+        The returned supplier enters the source's runtime context and then
+        invokes its ``draw`` method through a retry policy obtained from the
+        retry policy factory.
+        """
+
+        @supplier
+        def do_draw() -> _RDT:
+            with s as _s:
+                draw = self._retry_policy_factory().retry(_s.draw)
+                return draw()
+
+        # noinspection PyTypeChecker
+        return do_draw
+
+
 @final
 class _SourceOfCallable(Source[_RDT], Generic[_RDT]):
     __slots__ = ("_delegate_to", "_is_disposed", "_logger")