Skip to content

Commit

Permalink
chore(processors): delay embedded processors disposal
Browse files Browse the repository at this point in the history
Refactor all composite processors in the library to delay the disposal of
their embedded processors. All embedded processors with be disposed of
when their parent processor is disposed of. This will support streaming
processors that may need to remain "live" even after their `apply()`
method returns.
  • Loading branch information
kennedykori committed Jun 8, 2024
1 parent e5e3ce4 commit 86b776e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 26 deletions.
32 changes: 16 additions & 16 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -331,9 +331,9 @@ def dispose(self) -> None:
def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply

Expand Down Expand Up @@ -368,7 +368,7 @@ class ScatterGatherProcessor(
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -518,8 +518,8 @@ def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
"Forking processing of the received data to all embedded "
"processors."
)
with self._executor as executor:
futures = executor.execute(raw_data)
executor = self._executor.__enter__()
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -551,9 +551,9 @@ def dispose(self) -> None:
def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply

Expand Down Expand Up @@ -588,7 +588,7 @@ class SplitGatherProcessor(
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
""" # noqa: D205
Expand Down Expand Up @@ -757,8 +757,8 @@ def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]:
"to each data part."
)

with self._executor as executor:
futures = executor.execute(raw_data)
executor = self._executor.__enter__()
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -794,9 +794,9 @@ def _processor_to_task(
) -> Task[Sequence[_RDT], _PDT]:
@task
def do_apply(raw_data: Sequence[_RDT]) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data[i])
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data[i])

return do_apply

Expand Down
87 changes: 77 additions & 10 deletions test/sghi/etl/commons_tests/processors_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from __future__ import annotations

import time
from collections.abc import Sequence
from typing import TYPE_CHECKING
from collections.abc import Iterable, Sequence
from io import StringIO
from unittest import TestCase

import pytest
from typing_extensions import override

from sghi.disposable import ResourceDisposedError
from sghi.disposable import ResourceDisposedError, not_disposed
from sghi.etl.commons import (
NOOPProcessor,
ProcessorPipe,
Expand All @@ -22,8 +22,50 @@
from sghi.etl.core import Processor
from sghi.task import task

if TYPE_CHECKING:
from collections.abc import Iterable
# =============================================================================
# HELPERS
# =============================================================================


class _IntsToChars(Processor[Iterable[int], Iterable[str]]):
"""Streaming `ints` to `chars` converter.
This is equivalent to:
.. code-block:: python
@processor
def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
yield from map(chr, ints)
But has state to simulate a stateful streaming processor.
"""

def __init__(self) -> None:
self._buffer: StringIO = StringIO(newline="")

@property
@override
def is_disposed(self) -> bool:
return self._buffer.closed

@not_disposed
@override
def apply(self, raw_data: Iterable[int]) -> Iterable[str]:
for _int in raw_data:
self._buffer.write(chr(_int))
self._buffer.seek(0)
while _str := self._buffer.read(1):
yield _str

@override
def dispose(self) -> None:
self._buffer.close()


# =============================================================================
# TESTS
# =============================================================================


def test_processor_decorator_delegates_to_the_wrapped_callable() -> None:
Expand Down Expand Up @@ -211,17 +253,13 @@ def setUp(self) -> None:
def add_65(ints: Iterable[int]) -> Iterable[int]:
yield from (v + 65 for v in ints)

@processor
def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
yield from map(chr, ints)

@processor
def join_chars(values: Iterable[str]) -> str:
return "".join(list(values))

self._embedded_processors: Sequence[Processor] = [
add_65,
ints_to_chars,
_IntsToChars(),
join_chars,
]
self._instance: Processor[Iterable[int], str] = ProcessorPipe(
Expand Down Expand Up @@ -360,8 +398,21 @@ def test_apply_returns_the_expected_value(self) -> None:
aggregating the outputs of applying the given input to all embedded
processors contained by the ``ScatterGatherProcessor``.
"""

@processor
def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
yield from map(chr, ints)

_processor: Processor[Iterable[int], Sequence[Iterable[str]]]
_processor = ScatterGatherProcessor([ints_to_chars, _IntsToChars()])
data: Sequence[Iterable[str]] = _processor.apply([65, 66, 67, 68, 69])

for _d in data:
assert tuple(_d) == ("A", "B", "C", "D", "E")
assert tuple(self._instance.apply(-45)) == (55, -450, -95)

_processor.dispose()

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`ScatterGatherProcessor.dispose` should result in the
:attr:`ScatterGatherProcessor.is_disposed` property being set to
Expand Down Expand Up @@ -574,8 +625,24 @@ def test_apply_returns_the_expected_value(self) -> None:
applying each embedded processor to each data part of the given raw
data.
"""

@processor
def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
yield from map(chr, ints)

_processor: Processor[Sequence[Iterable[int]], Sequence[Iterable[str]]]
_processor = SplitGatherProcessor([ints_to_chars, _IntsToChars()])
processed_data: Sequence[Iterable[str]] = _processor.apply(
[range(65, 70), range(70, 75)],
)

assert tuple(processed_data[0]) == ("A", "B", "C", "D", "E")
assert tuple(processed_data[1]) == ("F", "G", "H", "I", "J")

assert tuple(self._instance.apply([-90, 1, 60])) == (10, 10, 10)

_processor.dispose()

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`SplitGatherProcessor.dispose` should result in the
:attr:`SplitGatherProcessor.is_disposed` property being set to
Expand Down

0 comments on commit 86b776e

Please sign in to comment.