diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py
index 35acf8b..fb0a961 100644
--- a/src/sghi/etl/commons/processors.py
+++ b/src/sghi/etl/commons/processors.py
@@ -198,7 +198,7 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """
@@ -331,9 +331,9 @@ def dispose(self) -> None:
     def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
         @task
         def do_apply(raw_data: _RDT) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data)
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data)
 
         return do_apply
 
@@ -368,7 +368,7 @@ class ScatterGatherProcessor(
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """
@@ -518,8 +518,8 @@ def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
             "Forking processing of the received data to all embedded "
            "processors."
         )
-        with self._executor as executor:
-            futures = executor.execute(raw_data)
+        executor = self._executor.__enter__()
+        futures = executor.execute(raw_data)
 
         return tuple(self._result_gatherer(futures))
 
@@ -551,9 +551,9 @@ def dispose(self) -> None:
     def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
         @task
         def do_apply(raw_data: _RDT) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data)
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data)
 
         return do_apply
 
@@ -588,7 +588,7 @@ class SplitGatherProcessor(
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """  # noqa: D205
@@ -757,8 +757,8 @@ def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]:
             "to each data part."
         )
 
-        with self._executor as executor:
-            futures = executor.execute(raw_data)
+        executor = self._executor.__enter__()
+        futures = executor.execute(raw_data)
 
         return tuple(self._result_gatherer(futures))
@@ -794,9 +794,9 @@ def _processor_to_task(
     ) -> Task[Sequence[_RDT], _PDT]:
         @task
         def do_apply(raw_data: Sequence[_RDT]) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data[i])
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data[i])
 
         return do_apply
diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py
index 609bbfb..6371ccb 100644
--- a/test/sghi/etl/commons_tests/processors_tests.py
+++ b/test/sghi/etl/commons_tests/processors_tests.py
@@ -4,14 +4,14 @@
 from __future__ import annotations
 
 import time
-from collections.abc import Sequence
-from typing import TYPE_CHECKING
+from collections.abc import Iterable, Sequence
+from io import StringIO
 from unittest import TestCase
 
 import pytest
 from typing_extensions import override
 
-from sghi.disposable import ResourceDisposedError
+from sghi.disposable import ResourceDisposedError, not_disposed
 from sghi.etl.commons import (
     NOOPProcessor,
     ProcessorPipe,
@@ -22,8 +22,50 @@
 from sghi.etl.core import Processor
 from sghi.task import task
 
-if TYPE_CHECKING:
-    from collections.abc import Iterable
+# =============================================================================
+# HELPERS
+# =============================================================================
+
+
+class _IntsToChars(Processor[Iterable[int], Iterable[str]]):
+    """Streaming `ints` to `chars` converter.
+
+    This is equivalent to:
+
+    .. code-block:: python
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+    But has state to simulate a stateful streaming processor.
+    """
+
+    def __init__(self) -> None:
+        self._buffer: StringIO = StringIO(newline="")
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._buffer.closed
+
+    @not_disposed
+    @override
+    def apply(self, raw_data: Iterable[int]) -> Iterable[str]:
+        for _int in raw_data:
+            self._buffer.write(chr(_int))
+        self._buffer.seek(0)
+        while _str := self._buffer.read(1):
+            yield _str
+
+    @override
+    def dispose(self) -> None:
+        self._buffer.close()
+
+
+# =============================================================================
+# TESTS
+# =============================================================================
 
 
 def test_processor_decorator_delegates_to_the_wrapped_callable() -> None:
@@ -211,17 +253,13 @@ def setUp(self) -> None:
         def add_65(ints: Iterable[int]) -> Iterable[int]:
             yield from (v + 65 for v in ints)
 
-        @processor
-        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
-            yield from map(chr, ints)
-
         @processor
         def join_chars(values: Iterable[str]) -> str:
             return "".join(list(values))
 
         self._embedded_processors: Sequence[Processor] = [
             add_65,
-            ints_to_chars,
+            _IntsToChars(),
             join_chars,
         ]
         self._instance: Processor[Iterable[int], str] = ProcessorPipe(
@@ -360,8 +398,21 @@ def test_apply_returns_the_expected_value(self) -> None:
         aggregating the outputs of applying the given input to all embedded
         processors contained by the ``ScatterGatherProcessor``.
         """
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+        _processor: Processor[Iterable[int], Sequence[Iterable[str]]]
+        _processor = ScatterGatherProcessor([ints_to_chars, _IntsToChars()])
+        data: Sequence[Iterable[str]] = _processor.apply([65, 66, 67, 68, 69])
+
+        for _d in data:
+            assert tuple(_d) == ("A", "B", "C", "D", "E")
         assert tuple(self._instance.apply(-45)) == (55, -450, -95)
 
+        _processor.dispose()
+
     def test_dispose_has_the_intended_side_effects(self) -> None:
         """Calling :meth:`ScatterGatherProcessor.dispose` should result in the
         :attr:`ScatterGatherProcessor.is_disposed` property being set to
@@ -574,8 +625,24 @@ def test_apply_returns_the_expected_value(self) -> None:
         applying each embedded processor to each data part of the given raw
         data.
         """
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+        _processor: Processor[Sequence[Iterable[int]], Sequence[Iterable[str]]]
+        _processor = SplitGatherProcessor([ints_to_chars, _IntsToChars()])
+        processed_data: Sequence[Iterable[str]] = _processor.apply(
+            [range(65, 70), range(70, 75)],
+        )
+
+        assert tuple(processed_data[0]) == ("A", "B", "C", "D", "E")
+        assert tuple(processed_data[1]) == ("F", "G", "H", "I", "J")
+
         assert tuple(self._instance.apply([-90, 1, 60])) == (10, 10, 10)
 
+        _processor.dispose()
+
     def test_dispose_has_the_intended_side_effects(self) -> None:
         """Calling :meth:`SplitGatherProcessor.dispose` should result in the
         :attr:`SplitGatherProcessor.is_disposed` property being set to