Skip to content

Commit

Permalink
Merge pull request #28 from savannahghi/next
Browse files (browse the repository at this point in the history)
release v1.1.0-rc.3
  • Loading branch information
kennedykori authored Nov 17, 2024
2 parents 09f466a + 53aee40 commit 387abca
Show file tree
Hide file tree
Showing 10 changed files with 2,437 additions and 50 deletions.
6 changes: 6 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.exceptions.SGHIError"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T1"), # private type annotations
("py:class", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
Expand All @@ -93,6 +97,8 @@
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
]
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ API Reference
sghi.etl.commons.sinks
sghi.etl.commons.sources
sghi.etl.commons.utils
sghi.etl.commons.workflow_builder
sghi.etl.commons.workflow_definitions


Expand Down
8 changes: 8 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,26 @@
from .sinks import NullSink, ScatterSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow
from .workflow_builder import (
NoSourceProvidedError,
SoleValueAlreadyRetrievedError,
WorkflowBuilder,
)
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
"GatherSource",
"NOOPProcessor",
"NoSourceProvidedError",
"NullSink",
"ProcessorPipe",
"SimpleWorkflowDefinition",
"SoleValueAlreadyRetrievedError",
"ScatterGatherProcessor",
"ScatterSink",
"SplitGatherProcessor",
"SplitSink",
"WorkflowBuilder",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
Expand Down
32 changes: 16 additions & 16 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -331,9 +331,9 @@ def dispose(self) -> None:
def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply

Expand Down Expand Up @@ -368,7 +368,7 @@ class ScatterGatherProcessor(
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -518,8 +518,8 @@ def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
"Forking processing of the received data to all embedded "
"processors."
)
with self._executor as executor:
futures = executor.execute(raw_data)
executor = self._executor.__enter__()
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -551,9 +551,9 @@ def dispose(self) -> None:
def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply

Expand Down Expand Up @@ -588,7 +588,7 @@ class SplitGatherProcessor(
processors.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Processor`` are **NOT SAFE** to retry.
""" # noqa: D205
Expand Down Expand Up @@ -757,8 +757,8 @@ def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]:
"to each data part."
)

with self._executor as executor:
futures = executor.execute(raw_data)
executor = self._executor.__enter__()
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -794,9 +794,9 @@ def _processor_to_task(
) -> Task[Sequence[_RDT], _PDT]:
@task
def do_apply(raw_data: Sequence[_RDT]) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data[i])
_p = p.__enter__()
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data[i])

return do_apply

Expand Down
26 changes: 13 additions & 13 deletions src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]:
class NullSink(Sink[_PDT], Generic[_PDT]):
"""A :class:`Sink` that discards all the data it receives.
Like to ``dev/null`` on Unix, instances of this ``Sink`` discard all data
Like ``dev/null`` on Unix, instances of this ``Sink`` discard all data
drained to them but report the drain operation as successful. This is
mostly useful as a placeholder or where further consumption of processed
data is not required.
Expand Down Expand Up @@ -190,7 +190,7 @@ class ScatterSink(Sink[_PDT], Generic[_PDT]):
Disposing instances of this class also disposes of their embedded sinks.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Sink`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -324,8 +324,8 @@ def drain(self, processed_data: _PDT) -> None:
"""
self._logger.info("Draining processed data to all available sinks.")

with self._executor as executor:
futures = executor.execute(processed_data)
executor = self._executor.__enter__()
futures = executor.execute(processed_data)

self._result_gatherer(futures)

Expand Down Expand Up @@ -357,9 +357,9 @@ def dispose(self) -> None:
def _sink_to_task(self, s: Sink[_PDT]) -> Task[_PDT, None]:
@task
def do_drain(processed_data: _PDT) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data)
_s = s.__enter__()
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data)

return do_drain

Expand Down Expand Up @@ -390,7 +390,7 @@ class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]):
Disposing instances of this class also disposes of their embedded sinks.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Sink`` are **NOT SAFE** to retry.
""" # noqa: D205
Expand Down Expand Up @@ -550,8 +550,8 @@ def drain(self, processed_data: Sequence[_PDT]) -> None:
"to all available sinks."
)

with self._executor as executor:
futures = executor.execute(processed_data)
executor = self._executor.__enter__()
futures = executor.execute(processed_data)

self._result_gatherer(futures)

Expand Down Expand Up @@ -587,9 +587,9 @@ def _sink_to_task(
) -> Task[Sequence[_PDT], None]:
@task
def do_drain(processed_data: Sequence[_PDT]) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data[i])
_s = s.__enter__()
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data[i])

return do_drain

Expand Down
12 changes: 6 additions & 6 deletions src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]):
Disposing instances of this class also disposes of their embedded sources.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Source`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -256,8 +256,8 @@ def draw(self) -> Sequence[_RDT]:
"""
self._logger.info("Aggregating data from all available sources.")

with self._executor as executor:
futures = executor.execute(None)
executor = self._executor.__enter__()
futures = executor.execute(None)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -289,9 +289,9 @@ def dispose(self) -> None:
def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]:
@supplier
def do_draw() -> _RDT:
with s as _s:
draw = self._retry_policy_factory().retry(_s.draw)
return draw()
_s = s.__enter__()
draw = self._retry_policy_factory().retry(_s.draw)
return draw()

# noinspection PyTypeChecker
return do_draw
Expand Down
Loading

0 comments on commit 387abca

Please sign in to comment.