Skip to content

Commit

Permalink
feat(workflow-builder): add a WorkflowBuilder
Browse files Browse the repository at this point in the history
A `WorkflowBuilder` is a class that aids in the construction of
`WorkflowDefinition` instances in a structured manner. The builder class
offers a convenient way to construct SGHI ETL Workflows by providing
methods to register sources, processors and sinks.
  • Loading branch information
kennedykori committed Nov 17, 2024
1 parent 5df425d commit d9a5024
Show file tree
Hide file tree
Showing 10 changed files with 2,288 additions and 3 deletions.
6 changes: 6 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.exceptions.SGHIError"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T1"), # private type annotations
("py:class", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
Expand All @@ -93,6 +97,8 @@
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations
]
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ API Reference
sghi.etl.commons.sinks
sghi.etl.commons.sources
sghi.etl.commons.utils
sghi.etl.commons.workflow_builder
sghi.etl.commons.workflow_definitions


Expand Down
8 changes: 8 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,26 @@
from .sinks import NullSink, ScatterSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow
from .workflow_builder import (
NoSourceProvidedError,
SoleValueAlreadyRetrievedError,
WorkflowBuilder,
)
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
"GatherSource",
"NOOPProcessor",
"NoSourceProvidedError",
"NullSink",
"ProcessorPipe",
"SimpleWorkflowDefinition",
"SoleValueAlreadyRetrievedError",
"ScatterGatherProcessor",
"ScatterSink",
"SplitGatherProcessor",
"SplitSink",
"WorkflowBuilder",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
Expand Down
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ def do_apply(raw_data: Sequence[_RDT]) -> _PDT:

@final
class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
__slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__")

def __init__(self, delegate_to: _ProcessorCallable[_RDT, _PDT]) -> None:
super().__init__()
Expand Down
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ def do_drain(processed_data: Sequence[_PDT]) -> None:

@final
class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
__slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__")

def __init__(self, delegate_to: _SinkCallable[_PDT]) -> None:
super().__init__()
Expand Down
2 changes: 1 addition & 1 deletion src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def do_draw() -> _RDT:

@final
class _SourceOfCallable(Source[_RDT], Generic[_RDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
__slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__")

def __init__(self, delegate_to: _SourceCallable[_RDT]) -> None:
super().__init__()
Expand Down
Loading

0 comments on commit d9a5024

Please sign in to comment.