Skip to content

Commit

Permalink
chore(sinks): delay embedded sinks disposal (#26)
Browse files Browse the repository at this point in the history
Refactor all composite sinks in the library to delay the disposal of
their embedded sinks. All embedded sinks will now be disposed of when
their parent sink is disposed of.
  • Loading branch information
kennedykori authored Jun 9, 2024
1 parent 1be71e7 commit 0164504
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]:
class NullSink(Sink[_PDT], Generic[_PDT]):
"""A :class:`Sink` that discards all the data it receives.
Like to ``dev/null`` on Unix, instances of this ``Sink`` discard all data
Like ``dev/null`` on Unix, instances of this ``Sink`` discard all data
drained to them but report the drain operation as successful. This is
mostly useful as a placeholder or where further consumption of processed
data is not required.
Expand Down Expand Up @@ -190,7 +190,7 @@ class ScatterSink(Sink[_PDT], Generic[_PDT]):
Disposing instances of this class also disposes of their embedded sinks.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Sink`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -324,8 +324,8 @@ def drain(self, processed_data: _PDT) -> None:
"""
self._logger.info("Draining processed data to all available sinks.")

with self._executor as executor:
futures = executor.execute(processed_data)
executor = self._executor.__enter__()
futures = executor.execute(processed_data)

self._result_gatherer(futures)

Expand Down Expand Up @@ -357,9 +357,9 @@ def dispose(self) -> None:
def _sink_to_task(self, s: Sink[_PDT]) -> Task[_PDT, None]:
@task
def do_drain(processed_data: _PDT) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data)
_s = s.__enter__()
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data)

return do_drain

Expand Down Expand Up @@ -390,7 +390,7 @@ class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]):
Disposing instances of this class also disposes of their embedded sinks.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Sink`` are **NOT SAFE** to retry.
""" # noqa: D205
Expand Down Expand Up @@ -550,8 +550,8 @@ def drain(self, processed_data: Sequence[_PDT]) -> None:
"to all available sinks."
)

with self._executor as executor:
futures = executor.execute(processed_data)
executor = self._executor.__enter__()
futures = executor.execute(processed_data)

self._result_gatherer(futures)

Expand Down Expand Up @@ -587,9 +587,9 @@ def _sink_to_task(
) -> Task[Sequence[_PDT], None]:
@task
def do_drain(processed_data: Sequence[_PDT]) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data[i])
_s = s.__enter__()
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data[i])

return do_drain

Expand Down

0 comments on commit 0164504

Please sign in to comment.