From 1be71e7b4eda45916f7d86ca05298dd1da87872f Mon Sep 17 00:00:00 2001
From: Kennedy Kori
Date: Sat, 8 Jun 2024 22:14:56 +0300
Subject: [PATCH] chore(processors): delay embedded processors disposal (#25)

Refactor all composite processors in the library to delay the disposal
of their embedded processors. All embedded processors will now be
disposed of when their parent processor is disposed of. This will
support streaming processors that may need to remain "live" even after
their `apply()` method returns.
---
 src/sghi/etl/commons/processors.py            | 32 +++----
 .../etl/commons_tests/processors_tests.py     | 87 ++++++++++++++++---
 2 files changed, 93 insertions(+), 26 deletions(-)

diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py
index 35acf8b..fb0a961 100644
--- a/src/sghi/etl/commons/processors.py
+++ b/src/sghi/etl/commons/processors.py
@@ -198,7 +198,7 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """
@@ -331,9 +331,9 @@ def dispose(self) -> None:
     def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
         @task
         def do_apply(raw_data: _RDT) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data)
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data)
 
         return do_apply
 
@@ -368,7 +368,7 @@ class ScatterGatherProcessor(
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """
@@ -518,8 +518,8 @@ def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
             "Forking processing of the received data to all embedded "
             "processors."
         )
-        with self._executor as executor:
-            futures = executor.execute(raw_data)
+        executor = self._executor.__enter__()
+        futures = executor.execute(raw_data)
 
         return tuple(self._result_gatherer(futures))
 
@@ -551,9 +551,9 @@ def dispose(self) -> None:
     def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
         @task
         def do_apply(raw_data: _RDT) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data)
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data)
 
         return do_apply
 
@@ -588,7 +588,7 @@ class SplitGatherProcessor(
     processors.
 
     .. admonition:: Regarding retry safety
-        :class: tip
+        :class: caution
 
         Instances of this ``Processor`` are **NOT SAFE** to retry.
     """  # noqa: D205
@@ -757,8 +757,8 @@ def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]:
             "to each data part."
         )
-        with self._executor as executor:
-            futures = executor.execute(raw_data)
+        executor = self._executor.__enter__()
+        futures = executor.execute(raw_data)
 
         return tuple(self._result_gatherer(futures))
 
@@ -794,9 +794,9 @@ def _processor_to_task(
     ) -> Task[Sequence[_RDT], _PDT]:
         @task
         def do_apply(raw_data: Sequence[_RDT]) -> _PDT:
-            with p as _p:
-                apply = self._retry_policy_factory().retry(_p.apply)
-                return apply(raw_data[i])
+            _p = p.__enter__()
+            apply = self._retry_policy_factory().retry(_p.apply)
+            return apply(raw_data[i])
 
         return do_apply
 
diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py
index 609bbfb..6371ccb 100644
--- a/test/sghi/etl/commons_tests/processors_tests.py
+++ b/test/sghi/etl/commons_tests/processors_tests.py
@@ -4,14 +4,14 @@
 from __future__ import annotations
 
 import time
-from collections.abc import Sequence
-from typing import TYPE_CHECKING
+from collections.abc import Iterable, Sequence
+from io import StringIO
 from unittest import TestCase
 
 import pytest
 from typing_extensions import override
 
-from sghi.disposable import ResourceDisposedError
+from sghi.disposable import ResourceDisposedError, not_disposed
 from sghi.etl.commons import (
     NOOPProcessor,
     ProcessorPipe,
@@ -22,8 +22,50 @@
 from sghi.etl.core import Processor
 from sghi.task import task
 
-if TYPE_CHECKING:
-    from collections.abc import Iterable
+# =============================================================================
+# HELPERS
+# =============================================================================
+
+
+class _IntsToChars(Processor[Iterable[int], Iterable[str]]):
+    """Streaming `ints` to `chars` converter.
+
+    This is equivalent to:
+
+    .. code-block:: python
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+    But has state to simulate a stateful streaming processor.
+    """
+
+    def __init__(self) -> None:
+        self._buffer: StringIO = StringIO(newline="")
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._buffer.closed
+
+    @not_disposed
+    @override
+    def apply(self, raw_data: Iterable[int]) -> Iterable[str]:
+        for _int in raw_data:
+            self._buffer.write(chr(_int))
+        self._buffer.seek(0)
+        while _str := self._buffer.read(1):
+            yield _str
+
+    @override
+    def dispose(self) -> None:
+        self._buffer.close()
+
+
+# =============================================================================
+# TESTS
+# =============================================================================
 
 
 def test_processor_decorator_delegates_to_the_wrapped_callable() -> None:
@@ -211,17 +253,13 @@ def setUp(self) -> None:
         def add_65(ints: Iterable[int]) -> Iterable[int]:
             yield from (v + 65 for v in ints)
 
-        @processor
-        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
-            yield from map(chr, ints)
-
         @processor
         def join_chars(values: Iterable[str]) -> str:
             return "".join(list(values))
 
         self._embedded_processors: Sequence[Processor] = [
             add_65,
-            ints_to_chars,
+            _IntsToChars(),
             join_chars,
         ]
         self._instance: Processor[Iterable[int], str] = ProcessorPipe(
@@ -360,8 +398,21 @@ def test_apply_returns_the_expected_value(self) -> None:
         aggregating the outputs of applying the given input to all embedded
         processors contained by the ``ScatterGatherProcessor``.
         """
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+        _processor: Processor[Iterable[int], Sequence[Iterable[str]]]
+        _processor = ScatterGatherProcessor([ints_to_chars, _IntsToChars()])
+        data: Sequence[Iterable[str]] = _processor.apply([65, 66, 67, 68, 69])
+
+        for _d in data:
+            assert tuple(_d) == ("A", "B", "C", "D", "E")
 
         assert tuple(self._instance.apply(-45)) == (55, -450, -95)
 
+        _processor.dispose()
+
     def test_dispose_has_the_intended_side_effects(self) -> None:
         """Calling :meth:`ScatterGatherProcessor.dispose` should result in
         the :attr:`ScatterGatherProcessor.is_disposed` property being set to
@@ -574,8 +625,24 @@ def test_apply_returns_the_expected_value(self) -> None:
         applying each embedded processor to each data part of the given raw
         data.
         """
+
+        @processor
+        def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
+            yield from map(chr, ints)
+
+        _processor: Processor[Sequence[Iterable[int]], Sequence[Iterable[str]]]
+        _processor = SplitGatherProcessor([ints_to_chars, _IntsToChars()])
+        processed_data: Sequence[Iterable[str]] = _processor.apply(
+            [range(65, 70), range(70, 75)],
+        )
+
+        assert tuple(processed_data[0]) == ("A", "B", "C", "D", "E")
+        assert tuple(processed_data[1]) == ("F", "G", "H", "I", "J")
 
         assert tuple(self._instance.apply([-90, 1, 60])) == (10, 10, 10)
 
+        _processor.dispose()
+
     def test_dispose_has_the_intended_side_effects(self) -> None:
         """Calling :meth:`SplitGatherProcessor.dispose` should result in
         the :attr:`SplitGatherProcessor.is_disposed` property being set to