Skip to content

Commit

Permalink
feat(sources): add a GatherSource
Browse files Browse the repository at this point in the history
  • Loading branch information
kennedykori committed May 2, 2024
1 parent 3866f68 commit 2b0dc65
Showing 1 changed file with 136 additions and 2 deletions.
138 changes: 136 additions & 2 deletions src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import annotations

import logging
from collections.abc import Callable
from collections.abc import Callable, Iterable, Sequence
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import ExitStack
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final
Expand All @@ -12,7 +14,11 @@

from sghi.disposable import not_disposed
from sghi.etl.core import Source
from sghi.utils import ensure_callable, type_fqn
from sghi.retry import Retry, noop_retry
from sghi.task import ConcurrentExecutor, Supplier, supplier
from sghi.utils import ensure_callable, ensure_not_none_nor_empty, type_fqn

from .utils import fail_fast

# =============================================================================
# TYPES
Expand All @@ -22,6 +28,10 @@
# Type variable for the raw data produced by a source.
_RDT = TypeVar("_RDT")
"""Raw Data Type."""

# Generic type variable for the result carried by a ``Future``.
_T = TypeVar("_T")

# A callable that takes an iterable of futures and returns an iterable of the
# values they carry. Used as the type of ``GatherSource``'s ``result_gatherer``
# parameter.
_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]

# A zero-argument callable that returns raw data.
_SourceCallable = Callable[[], _RDT]


Expand Down Expand Up @@ -80,6 +90,130 @@ def source(f: Callable[[], _RDT]) -> Source[_RDT]:
# =============================================================================


@final
class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]):
    """A :class:`Source` that aggregates raw data from multiple sources.

    Each embedded source is wrapped in a task that draws from it, with the
    draw wrapped by a retry policy obtained from ``retry_policy_factory``.
    The tasks are handed to a :class:`~sghi.task.ConcurrentExecutor` that is
    backed by an executor obtained from ``executor_factory``. The futures
    produced by the executor are then passed to ``result_gatherer`` and the
    values it yields are returned as a tuple.

    Disposing of an instance of this class also disposes of all the sources
    embedded in it.
    """

    __slots__ = (
        "_sources",
        "_retry_policy_factory",
        "_executor_factory",
        "_result_gatherer",
        "_is_disposed",
        "_logger",
        "_exit_stack",
        "_executor",
        "_prepped_sources",
    )

    def __init__(
        self,
        sources: Sequence[Source[_RDT]],
        retry_policy_factory: Callable[[], Retry] = noop_retry,
        executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
        result_gatherer: _ResultGatherer[_RDT] = fail_fast,
    ) -> None:
        """Create a new ``GatherSource`` from the given sources.

        :param sources: The sources to draw from. MUST NOT be ``None`` or
            empty. The given sequence is copied into a tuple.
        :param retry_policy_factory: A callable that supplies the retry
            policy used to wrap each embedded source's ``draw``; it is
            invoked once per embedded source each time that source is drawn
            from. MUST be a callable. Defaults to ``noop_retry``.
        :param executor_factory: A callable that supplies the
            :class:`~concurrent.futures.Executor` used to run the draws; it
            is invoked once, during initialization. MUST be a callable.
            Defaults to :class:`~concurrent.futures.ThreadPoolExecutor`.
        :param result_gatherer: A callable that takes the futures produced
            by the executor and returns the values to report. MUST be a
            callable. Defaults to ``fail_fast``.
        """
        super().__init__()
        ensure_not_none_nor_empty(
            value=sources,
            message="'sources' MUST NOT be None or empty.",
        )
        self._sources: Sequence[Source[_RDT]] = tuple(sources)
        self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
            value=retry_policy_factory,
            message="'retry_policy_factory' MUST be a callable.",
        )
        self._executor_factory: Callable[[], Executor] = ensure_callable(
            value=executor_factory,
            message="'executor_factory' MUST be a callable.",
        )
        self._result_gatherer: _ResultGatherer[_RDT] = ensure_callable(
            value=result_gatherer,
            message="'result_gatherer' MUST be a callable.",
        )
        self._is_disposed: bool = False
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
        self._exit_stack: ExitStack = ExitStack()

        # Prepare embedded sources for execution by ensuring that they are all
        # disposed of properly once this object is disposed.
        self._prepped_sources: Sequence[Supplier[_RDT]] = tuple(
            self._source_to_task(self._exit_stack.push(_source))
            for _source in self._sources
        )
        # The prepared tasks must be handed to the executor, otherwise
        # ``draw`` would have nothing to run. Hence, the executor is created
        # only after the tasks have been prepared.
        self._executor: ConcurrentExecutor[None, _RDT] = ConcurrentExecutor(
            *self._prepped_sources,
            executor=self._executor_factory(),
        )

    @not_disposed
    @override
    def __enter__(self) -> Self:
        """Return ``self`` upon entering the runtime context.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed (i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :return: This instance.

        :raise ResourceDisposedError: If this source has already been
            disposed.
        """
        # Delegate to the implementation that follows ``Source`` in the MRO.
        return super(Source, self).__enter__()

    @property
    @override
    def is_disposed(self) -> bool:
        """Return ``True`` if this source has been disposed."""
        return self._is_disposed

    @not_disposed
    @override
    def draw(self) -> Sequence[_RDT]:
        """Draw from all the embedded sources and return the gathered data.

        The futures produced by the executor are passed to the
        ``result_gatherer`` given at initialization, and whatever it yields
        is returned as a tuple.

        :return: A tuple of the values yielded by the result gatherer.
        """
        self._logger.info("Aggregating data from all available sources.")

        # NOTE(review): leaving the ``with`` block presumably disposes of
        # the executor, which would make this source single-use. Confirm
        # against ``ConcurrentExecutor``.
        with self._executor as executor:
            futures = executor.execute(None)

        return tuple(self._result_gatherer(futures))

    @override
    def dispose(self) -> None:
        """Release any underlying resources contained by this source.

        All embedded sources are also disposed. After this method returns
        successfully, the :attr:`is_disposed` property should return ``True``.

        .. note::
            Unless otherwise specified, trying to use methods of a
            ``Disposable`` instance decorated with the
            :func:`~sghi.disposable.not_disposed` decorator after this method
            returns should generally be considered a programming error and
            should result in a :exc:`~sghi.disposable.ResourceDisposedError`
            being raised.

            This method should be idempotent allowing it to be called more
            than once; only the first call, however, should have an effect.

        :return: None.
        """
        self._is_disposed = True
        try:
            # Closing the exit stack exits every embedded source that was
            # pushed onto it during initialization.
            self._exit_stack.close()
        finally:
            # Release the executor even if closing the exit stack fails.
            self._executor.dispose()
        self._logger.info("Disposal complete.")

    def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]:
        """Wrap the given source in a supplier that draws from it.

        When run, the returned supplier enters the source's runtime context,
        wraps the source's ``draw`` method with a retry policy obtained from
        the retry policy factory, and returns the result of calling it.

        :param s: The source to wrap.

        :return: A supplier that draws from the given source.
        """

        @supplier
        def do_draw() -> _RDT:
            with s as _s:
                draw = self._retry_policy_factory().retry(_s.draw)
                return draw()

        # noinspection PyTypeChecker
        return do_draw


@final
class _SourceOfCallable(Source[_RDT], Generic[_RDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
Expand Down

0 comments on commit 2b0dc65

Please sign in to comment.