diff --git a/docs/conf.py b/docs/conf.py index 0b5bbdd..a66ff76 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -71,11 +71,15 @@ ("py:class", "TracebackType"), # Used as type annotation. Only available when type checking ("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it + ("py:class", "sghi.exceptions.SGHIError"), # sphinx can't find it ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations ("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations ("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations ("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations + ("py:class", "sghi.etl.commons.utils.result_gatherers._T1"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations ("py:class", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations ("py:class", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations ("py:class", "sghi.etl.core._RDT"), # private type annotations @@ -93,6 +97,8 @@ ("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations ("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations ("py:obj", "sghi.etl.commons.workflow_definitions._RDT"), # private type annotations ("py:obj", "sghi.etl.commons.workflow_definitions._PDT"), # private type annotations ] diff --git a/docs/index.rst b/docs/index.rst index 668d1fe..bfc1029 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,6 +40,7 @@ API Reference sghi.etl.commons.sinks sghi.etl.commons.sources sghi.etl.commons.utils + sghi.etl.commons.workflow_builder sghi.etl.commons.workflow_definitions diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index e94d29a..e49c76e 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -11,18 +11,26 @@ from .sinks import NullSink, ScatterSink, SplitSink, sink from .sources import GatherSource, source from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow +from .workflow_builder import ( + NoSourceProvidedError, + SoleValueAlreadyRetrievedError, + WorkflowBuilder, +) from .workflow_definitions import SimpleWorkflowDefinition __all__ = [ "GatherSource", "NOOPProcessor", + "NoSourceProvidedError", "NullSink", "ProcessorPipe", "SimpleWorkflowDefinition", + "SoleValueAlreadyRetrievedError", "ScatterGatherProcessor", "ScatterSink", "SplitGatherProcessor", "SplitSink", + "WorkflowBuilder", "fail_fast", "fail_fast_factory", "ignored_failed", diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py index fb0a961..46fff9b 100644 --- a/src/sghi/etl/commons/processors.py +++ b/src/sghi/etl/commons/processors.py @@ -803,7 +803,7 @@ def do_apply(raw_data: Sequence[_RDT]) -> _PDT: @final class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): - __slots__ = ("_delegate_to", "_is_disposed", "_logger") + __slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__") def __init__(self, 
delegate_to: _ProcessorCallable[_RDT, _PDT]) -> None: super().__init__() diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py index 49c9b93..66c4644 100644 --- a/src/sghi/etl/commons/sinks.py +++ b/src/sghi/etl/commons/sinks.py @@ -596,7 +596,7 @@ def do_drain(processed_data: Sequence[_PDT]) -> None: @final class _SinkOfCallable(Sink[_PDT], Generic[_PDT]): - __slots__ = ("_delegate_to", "_is_disposed", "_logger") + __slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__") def __init__(self, delegate_to: _SinkCallable[_PDT]) -> None: super().__init__() diff --git a/src/sghi/etl/commons/sources.py b/src/sghi/etl/commons/sources.py index ea2a2d9..5aef6f5 100644 --- a/src/sghi/etl/commons/sources.py +++ b/src/sghi/etl/commons/sources.py @@ -299,7 +299,7 @@ def do_draw() -> _RDT: @final class _SourceOfCallable(Source[_RDT], Generic[_RDT]): - __slots__ = ("_delegate_to", "_is_disposed", "_logger") + __slots__ = ("_delegate_to", "_is_disposed", "_logger", "__dict__") def __init__(self, delegate_to: _SourceCallable[_RDT]) -> None: super().__init__() diff --git a/src/sghi/etl/commons/workflow_builder.py b/src/sghi/etl/commons/workflow_builder.py new file mode 100644 index 0000000..a33ef4e --- /dev/null +++ b/src/sghi/etl/commons/workflow_builder.py @@ -0,0 +1,1086 @@ +"""A DSL for building :class:`sghi.etl.core.WorkflowDefinition` instances.""" + +from __future__ import annotations + +from collections.abc import Callable, Sequence +from typing import Any, Generic, Self, TypeVar + +from typing_extensions import override + +from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition +from sghi.exceptions import SGHIError +from sghi.utils import ( + ensure_callable, + ensure_instance_of, + ensure_not_none_nor_empty, + ensure_optional_instance_of, +) + +from .processors import NOOPProcessor, ScatterGatherProcessor +from .sinks import NullSink, ScatterSink +from .sources import GatherSource +from .workflow_definitions import SimpleWorkflowDefinition + +# ============================================================================= +# TYPES +# ============================================================================= + + +_PDT = TypeVar("_PDT") +"""Type variable representing the data type after processing.""" + +_RDT = TypeVar("_RDT") +"""Type variable representing the raw data type.""" + +_T = TypeVar("_T") + +_T1 = TypeVar("_T1") + +_T2 = TypeVar("_T2") + +_CompositeProcessorFactory = Callable[ + [Sequence[Processor[Any, Any]]], + Processor[Any, Any], +] + +_CompositeSourceFactory = Callable[[Sequence[Source[Any]]], Source[Any]] + +_CompositeSinkFactory = Callable[[Sequence[Sink[Any]]], Sink[Any]] + +_ProcessorFactory = Callable[[], Processor[_T1, _T2]] + +_SinkFactory = Callable[[], Sink[_T2]] + +_SourceFactory = Callable[[], Source[_T1]] + +# ============================================================================= +# EXCEPTIONS +# ============================================================================= + + +class NoSourceProvidedError(SGHIError): + """A :class:`Source` wasn't provided to a :class:`WorkflowBuilder`. + + At least one ``Source`` instance or one factory function that suppliers + ``Source`` instances ought to be provided to a ``WorkflowBuilder`` + instance. This error is raised by the + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.build` + method of a ``WorkflowBuilder`` instance where this invariant is broken. 
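+
+    A minimal sketch of a case that triggers this error (illustrative
+    example only; the ``id`` and ``name`` values are made up)::
+
+        from sghi.etl.commons import WorkflowBuilder
+
+        builder = WorkflowBuilder(id="sample", name="Sample Workflow")
+        builder.build()  # No Source registered -> raises NoSourceProvidedError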
+ """ + + +class SoleValueAlreadyRetrievedError(SGHIError): + """Raised when attempting to retrieve a value that should only be accessed + once. + + This is raised by the factory methods that are created internally by the + :class:`WorkflowBuilder` class to wrap instances of types :class:`Source`, + :class:`Processor` and :class:`Sink`. These factory methods are only meant + to be used once. Subsequent invocations of those factory methods will lead + to this error being raised. + """ # noqa: D205 + + +# ============================================================================= +# WORKFLOW BUILDER +# ============================================================================= + + +class WorkflowBuilder(Generic[_RDT, _PDT]): + """A builder class for constructing + :class:`workflow definitions` in a + structured manner. + + This class helps in assembling SGHI ETL Workflows by configuring + :class:`sources`, + :class:`processors` and + :class:`sinks`. This builder class offers a convenient + way to construct workflows by providing methods to register sources, + processors, and sinks, either individually or using factories. + """ # noqa: D205 + + __slots__ = ( + "_id", + "_name", + "_description", + "_source_factories", + "_processor_factories", + "_sink_factories", + "_default_processor_factory", + "_default_sink_factory", + "_composite_source_factory", + "_composite_processor_factory", + "_composite_sink_factory", + ) + + def __init__( + self, + id: str, # noqa: A002 + name: str, + description: str | None = None, + source_factories: Sequence[_SourceFactory[Any]] | None = None, + processor_factories: Sequence[_ProcessorFactory[Any, Any]] + | None = None, + sink_factories: Sequence[_SinkFactory[Any]] | None = None, + default_processor_factory: _ProcessorFactory[ + _RDT, _PDT + ] = NOOPProcessor, + default_sink_factory: _SinkFactory[_PDT] = NullSink, + composite_source_factory: _CompositeSourceFactory = GatherSource, + composite_processor_factory: _CompositeProcessorFactory = ScatterGatherProcessor, # noqa: E501 + composite_sink_factory: _CompositeSinkFactory = ScatterSink, + ) -> None: + r"""Create a ``WorkflowBuilder`` of the following properties. + + :param id: A unique identifier to assign to the assembled workflow(s). + This MUST be a non-empty string. + :param name: A name to assign to the assembled workflow(s). This MUST + be a non-empty string. + :param description: An optional textual description of the assembled + workflow(s). This MUST be a string when NOT ``None``. Defaults to + ``None`` when not provided. + :param source_factories: An optional ``Sequence`` of ``Source`` + factories that will be used to create the ``Source``\ s of the + assembled workflow(s). This MUST be a ``collections.abc.Sequence`` + instance when not ``None``. Defaults to ``None`` when not provided. + :param processor_factories: An optional ``Sequence`` of ``Processor`` + factories that will be used to create the ``Processor``\ s of the + assembled workflow(s). This MUST be a ``collections.abc.Sequence`` + when not ``None``. Defaults to ``None`` when not provided. + :param sink_factories: An optional ``Sequence`` of ``Sink`` factories + that will be used to create ``Sink``\ s of the assembled + workflow(s). This MUST be a ``collections.abc.Sequence`` when not + ``None``. Defaults to ``None`` when not provided. + :param default_processor_factory: An optional factory function that + will be used to create a ``Processor`` for the assembled + workflow(s) when no explicit ``Processor`` is specified. 
This MUST + be a valid callable object. Defaults to the ``NOOPProcessor`` + class, which does nothing. + :param default_sink_factory: An optional factory function that will be + used to create a ``Sink`` for the assembled workflow(s) when no + explicit ``Sink`` is specified. This MUST be a valid callable + object. Defaults to the ``NullSink`` class, whose instances discard + all data drained to them. + :param composite_source_factory: An optional factory function that will + be used to combine multiple ``Source``\ s into a single ``Source``. + This factory function is only used when more than one ``Source`` is + provided. This MUST be a valid callable object that accepts a + ``Sequence`` of ``Source`` instances. Defaults to the + ``GatherSource`` class, which yields data from all its embedded + ``Source``\ s concurrently. + :param composite_processor_factory: An optional factory function that + will be used to combine multiple ``Processor``\ s into a single + ``Processor``. This factory function is only used when more than + one ``Processor`` is provided. This MUST be a valid callable object + that accepts a ``Sequence`` of ``Processor`` instances. Defaults to + the ``ScatterGatherProcessor`` class which distributes processing + to all its embedded ``Processor``\ s concurrently before gathering + and returning their results. + :param composite_sink_factory: An optional factory function that will + be used to combine multiple ``Sink``\ s into a single ``Sink``. + This factory function is only used when more than one ``Sink`` is + provided. This MUST be a valid callable object that accepts a + ``Sequence`` of ``Sink`` instances. Defaults to the ``ScatterSink`` + class, which drains data to all its embedded ``Sink``\ s + concurrently. + + :raise TypeError: If ``id`` or ``name`` are NOT of type string. If + ``description`` is NOT a string when NOT ``None``. If + ``source_factories``, ``processor_factories`` or ``sink_factories`` + are NOT of type ``collections.abc.Sequence`` when NOT ``None``. + :raise ValueError: If ``id`` or ``name`` are empty strings. If + ``default_processor_factory``, ``default_sink_factory``, + ``composite_source_factory``, ``composite_processor_factory`` or + ``composite_sink_factory`` are NOT valid callable objects. + """ + super().__init__() + self._id: str = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=id, + klass=str, + message="'id' MUST be a string.", + ), + message="'id' MUST NOT be an empty string.", + ) + self._name: str = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=name, + klass=str, + message="'name' MUST be a string.", + ), + message="'name' MUST NOT be an empty string.", + ) + self._description: str | None = ensure_optional_instance_of( + value=description, + klass=str, + message="'description' MUST be a string when NOT None.", + ) + ensure_optional_instance_of( + value=source_factories, + klass=Sequence, + message=( + "'source_factories' MUST be a collections.abc.Sequence when " + "not None." + ), + ) + self._source_factories: list[_SourceFactory[_RDT]] = [] + self._source_factories.extend(source_factories or ()) + ensure_optional_instance_of( + value=processor_factories, + klass=Sequence, + message=( + "'processor_factories' MUST be a collections.abc.Sequence " + "when not None." 
+            ),
+        )
+        self._processor_factories: list[_ProcessorFactory[_RDT, _PDT]] = []
+        self._processor_factories.extend(processor_factories or ())
+        ensure_optional_instance_of(
+            value=sink_factories,
+            klass=Sequence,
+            message=(
+                "'sink_factories' MUST be a collections.abc.Sequence when "
+                "not None."
+            ),
+        )
+        self._sink_factories: list[_SinkFactory[_PDT]] = []
+        self._sink_factories.extend(sink_factories or ())
+        self._default_processor_factory: _ProcessorFactory[_RDT, _PDT]
+        self._default_processor_factory = ensure_callable(
+            value=default_processor_factory,
+            message="'default_processor_factory' MUST be a callable object.",
+        )
+        self._default_sink_factory: _SinkFactory[_PDT] = ensure_callable(
+            value=default_sink_factory,
+            message="'default_sink_factory' MUST be a callable object.",
+        )
+        self._composite_source_factory: _CompositeSourceFactory
+        self._composite_source_factory = ensure_callable(
+            value=composite_source_factory,
+            message="'composite_source_factory' MUST be a callable object.",
+        )
+        self._composite_processor_factory: _CompositeProcessorFactory
+        self._composite_processor_factory = ensure_callable(
+            value=composite_processor_factory,
+            message="'composite_processor_factory' MUST be a callable object.",
+        )
+        self._composite_sink_factory: _CompositeSinkFactory
+        self._composite_sink_factory = ensure_callable(
+            value=composite_sink_factory,
+            message="'composite_sink_factory' MUST be a callable object.",
+        )
+
+    def __call__(self) -> WorkflowDefinition[_RDT, _PDT]:
+        """Build and return a :class:`~sghi.etl.core.WorkflowDefinition`.
+
+        This method finalizes the workflow construction and returns a
+        ``WorkflowDefinition`` instance that encapsulates the workflow's
+        configuration.
+
+        This method delegates the actual call to :meth:`build`.
+
+        :return: The built ``WorkflowDefinition`` instance.
+
+        :raise NoSourceProvidedError: If neither a ``Source`` nor a factory
+            function that supplies ``Source`` instances has been provided.
+            At least one of these is required.
+
+        .. seealso:: :meth:`build`.
+        """
+        return self.build()
+
+    @override
+    def __repr__(self) -> str:
+        """Return a string representation of this ``WorkflowBuilder``.
+
+        This method provides a human-readable representation of this
+        ``WorkflowBuilder`` which is composed of its ID, name, and description.
+
+        :return: A string representation of this ``WorkflowBuilder``.
+        """
+        return self.__str__()
+
+    @override
+    def __str__(self) -> str:
+        """Return a string representation of this ``WorkflowBuilder``.
+
+        This method provides a human-readable representation of this
+        ``WorkflowBuilder`` which is composed of its ID, name, and description.
+
+        :return: A string representation of this ``WorkflowBuilder``.
+        """
+        return (
+            f"WorkflowBuilder(id={self._id}, name={self._name}, "
+            f"description={self._description})"
+        )
+
+    # PROPERTIES
+    # -------------------------------------------------------------------------
+    @property
+    def composite_processor_factory(self) -> _CompositeProcessorFactory:
+        r"""Get the factory function used to combine multiple ``Processor``\ s.
+
+        This property retrieves the factory function currently configured for
+        combining ``Processor``\ s when creating new ``WorkflowDefinition``\ s.
+
+        .. note::
+
+            This is only used if more than one ``Processor`` is provided.
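+
+        For example, to pipe registered ``Processor``\ s sequentially instead
+        of scattering data to them (illustrative sketch only; the ``builder``
+        name is made up)::
+
+            from sghi.etl.commons import ProcessorPipe
+
+            builder.composite_processor_factory = ProcessorPipe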
+ """ + return self._composite_processor_factory + + @composite_processor_factory.setter + def composite_processor_factory( + self, + __composite_processor_factory: _CompositeProcessorFactory, + ) -> None: + r"""Set the factory function used to combine multiple ``Processor``\ s. + + :param __composite_processor_factory: A factory function that accepts a + ``Sequence`` of ``Processor`` instances and returns a single + ``Processor`` that combines all the given ``Processor``\ s. This + MUST be a valid callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + """ + self._composite_processor_factory = ensure_callable( + value=__composite_processor_factory, + message="'composite_processor_factory' MUST be a callable object.", + ) + + @property + def composite_sink_factory(self) -> _CompositeSinkFactory: + r"""Get the factory function used to combine multiple ``Sink``\ s. + + This property retrieves the factory function currently configured for + combining ``Sink``\ s when creating new ``WorkflowDefinition``\ s. + + .. note:: + + This is only used if more than one ``Sink`` is provided. + """ + return self._composite_sink_factory + + @composite_sink_factory.setter + def composite_sink_factory( + self, + __composite_sink_factory: _CompositeSinkFactory, + ) -> None: + r"""Set the factory function used to combine multiple ``Sink``\ s. + + :param __composite_sink_factory: A factory function that accepts a + ``Sequence`` of ``Sink`` instances and returns a single ``Sink`` + that combines all the given ``Sink``\ s. This MUST be a valid + callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + """ + self._composite_sink_factory = ensure_callable( + value=__composite_sink_factory, + message="'composite_sink_factory' MUST be a callable object.", + ) + + @property + def composite_source_factory(self) -> _CompositeSourceFactory: + r"""Get the factory function used to combine multiple ``Source``\ s. + + This property retrieves the factory function currently configured for + combining ``Source``\ s when creating new ``WorkflowDefinition``\ s. + + .. note:: + + This is only used if more than one ``Source`` is provided. + """ + return self._composite_source_factory + + @composite_source_factory.setter + def composite_source_factory( + self, + __composite_source_factory: _CompositeSourceFactory, + ) -> None: + r"""Set the factory function used to combine multiple ``Source``\ s. + + :param __composite_source_factory: A factory function that accepts a + ``Sequence`` of ``Source`` instances and returns a single + ``Source`` that combines all the given ``Source``\ s. This MUST be + a valid callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + """ + self._composite_source_factory = ensure_callable( + value=__composite_source_factory, + message="'composite_source_factory' MUST be a callable object.", + ) + + @property + def default_processor_factory(self) -> _ProcessorFactory[_RDT, _PDT]: + r"""Get the factory function used to create default ``Processor``\ s. + + This property retrieves the factory function currently configured to + create ``Processor``\ s for the assembled ``WorkflowDefinition``\ (s) + when no explicit ``Processor`` is provided to the builder. + + The returned factory function is invoked each time :meth:`build` is + invoked and no ``Processor`` is provided to the builder. 
+ """ + return self._default_processor_factory + + @default_processor_factory.setter + def default_processor_factory( + self, + __default_processor_factory: _ProcessorFactory[_RDT, _PDT], + ) -> None: + r"""Set the factory function used to create default ``Processor``\ s. + + :param __default_processor_factory: A factory function that supplies + ``Processor`` instances to be used as default ``Processor``\ (s) + for the assembled ``WorkflowDefinition``\ (s) when no explicit + ``Processor`` is provided to the builder. This MUST be a valid + callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + """ + self._default_processor_factory = ensure_callable( + value=__default_processor_factory, + message="'default_processor_factory' MUST be a callable object.", + ) + + @property + def default_sink_factory(self) -> _SinkFactory[_PDT]: + r"""Get the factory function used to create default ``Sink``\ s. + + This property retrieves the factory function currently configured to + create ``Sink``\ s for the assembled ``WorkflowDefinition``\ (s) when + no explicit ``Sink`` is provided to the builder. + + The returned factory function is invoked each time :meth:`build` is + invoked and no ``Sink`` is provided to the builder. + """ + return self._default_sink_factory + + @default_sink_factory.setter + def default_sink_factory( + self, + __default_sink_factory: _SinkFactory[_PDT], + ) -> None: + r"""Set the factory function used to create default ``Sink``\ s. + + :param __default_sink_factory: A factory function that supplies + ``Sink`` instances to be used as the default ``Sink``\ (s) for + the assembled ``WorkflowDefinition``\ (s) when no explicit + ``Sink`` is provided to the builder. This MUST be a valid + callable object. + + :raise ValueError: If the provided value *IS NOT* a valid callable + object. + """ + self._default_sink_factory: _SinkFactory[_PDT] = ensure_callable( + value=__default_sink_factory, + message="'default_sink_factory' MUST be a callable object.", + ) + + @property + def description(self) -> str | None: + """An optional textual description of the assembled workflow(s).""" + return self._description + + @description.setter + def description(self, __description: str | None) -> None: + r"""Set the description of the assembled ``WorkflowDefinition``\ (s). + + :param __description: An optional textual description of the assembled + ``WorkflowDefinition``\ (s). This MUST be a string when NOT + ``None``. + + :raise TypeError: If the given value is NEITHER a string nor ``None``. + """ + self._description: str | None = ensure_optional_instance_of( + value=__description, + klass=str, + message="'description' MUST be a string when NOT None.", + ) + + @property + def id(self) -> str: + """The identifier to assign to the assembled workflow(s).""" + return self._id + + @id.setter + def id(self, __id: str) -> None: + r"""Set an identifier for the assembled ``WorkflowDefinition``\ (s). + + :param __id: A unique identifier to assign to the assembled + ``WorkflowDefinition``\ (s). This MUST be a non-empty string. + + :raise ValueError: If the provided value is an empty string. + :raise TypeError: If the provided value is NOT of type string. 
+ """ + self._id = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=__id, + klass=str, + message="'id' MUST be a string.", + ), + message="'id' MUST NOT be an empty string.", + ) + + @property + def name(self) -> str: + """The name to assign to the assembled workflow(s).""" + return self._name + + @name.setter + def name(self, __name: str) -> None: + r"""Set the name of the assembled ``WorkflowDefinition``\ (s). + + :param __name: A name to assign to the created + ``WorkflowDefinition``\ (s). This MUST be a non-empty string. + + :raise ValueError: If the provided value is an empty string. + :raise TypeError: If the provided value is NOT of type string. + """ + self._name: str = ensure_not_none_nor_empty( + value=ensure_instance_of( + value=__name, + klass=str, + message="'name' MUST be a string.", + ), + message="'name' MUST NOT be an empty string.", + ) + + @property + def processor_factories(self) -> Sequence[_ProcessorFactory[Any, Any]]: + r"""Get the ``Processor`` factories registered to create + ``Processor``\ s for the assembled ``WorkflowDefinition``\ (s). + + This property retrieves a ``Sequence`` of factory functions currently + configured to create ``Processor``\ s for the assembled + ``WorkflowDefinition``\ (s). These are the factory functions explicitly + provided by clients of this class to this builder. + + .. tip:: + + If the ``Sequence`` returned by this property is empty, then when + :meth:`build` is invoked, the factory function returned by the + :attr:`default_processor_factory` property will be used to create + a default ``Processor`` for the assembled ``WorkflowDefinition``. + """ # noqa: D205 + return tuple(self._processor_factories) + + @processor_factories.setter + def processor_factories( + self, + __processor_factories: Sequence[_ProcessorFactory[Any, Any]], + ) -> None: + r"""Set the ``Processor`` factories to use when creating + ``Processor``\ s for the assembled ``WorkflowDefinition``\ (s). + + :param __processor_factories: A ``Sequence`` of factory functions that + supply ``Processor`` instances for the assembled + ``WorkflowDefinition``\ (s). This MUST be an instance of + ``collections.abc.Sequence``. An empty ``Sequence`` is a valid + value. + + :raise TypeError: If the provided value is NOT an instance of + ``collections.abc.Sequence``. + """ # noqa: D205 + ensure_instance_of( + value=__processor_factories, + klass=Sequence, + message="'processor_factories' MUST be a collections.abc.Sequence.", # noqa: E501 + ) + self._processor_factories = list(__processor_factories) + + @property + def sink_factories(self) -> Sequence[_SinkFactory[Any]]: + r"""Get the ``Sink`` factories registered to create ``Sink``\ s for the + assembled ``WorkflowDefinition``\ (s). + + This property retrieves a ``Sequence`` of factory functions currently + configured to create ``Sink``\ s for the assembled + ``WorkflowDefinition``\ (s). These are the factory functions explicitly + provided by clients of this class to this builder. + + .. tip:: + + If the ``Sequence`` returned by this property is empty, then when + :meth:`build` is invoked, the factory function returned by the + :attr:`default_sink_factory` property will be used to create a + default ``Sink`` for the assembled ``WorkflowDefinition``. 
+ """ # noqa: D205 + return tuple(self._sink_factories) + + @sink_factories.setter + def sink_factories( + self, + __sink_factories: Sequence[_SinkFactory[Any]], + ) -> None: + r"""Set the ``Sink`` factories to use when creating ``Sink``\ s for the + assembled ``WorkflowDefinition``\ (s). + + :param __sink_factories: A ``Sequence`` of factory functions that + supply ``Sink`` instances for the assembled + ``WorkflowDefinition``\ (s). This MUST be an instance of + ``collections.abc.Sequence``. An empty ``Sequence`` is a valid + value. + + :raise TypeError: If the provided value is NOT an instance of + ``collections.abc.Sequence``. + """ # noqa: D205 + ensure_instance_of( + value=__sink_factories, + klass=Sequence, + message="'sink_factories' MUST be a collections.abc.Sequence.", + ) + self._sink_factories = list(__sink_factories) + + @property + def source_factories(self) -> Sequence[_SourceFactory[Any]]: + r"""Get the ``Source`` factories registered to create ``Source``\ s for + the assembled ``WorkflowDefinition``\ (s). + + This property retrieves a ``Sequence`` of factory functions currently + configured to create ``Source``\ s for the assembled + ``WorkflowDefinition``\ (s). These are the factory functions explicitly + provided by clients of this class to this builder. + + .. caution:: + + If the ``Sequence`` returned by this property is empty, then when + :meth:`build` is invoked, an exception will be raised. At least one + ``Source`` MUST be explicitly provided to a builder before a + ``WorkflowDefinition`` can be built. + """ # noqa: D205 + return tuple(self._source_factories) + + @source_factories.setter + def source_factories( + self, + __source_factories: Sequence[_SourceFactory[Any]], + ) -> None: + r"""Set the ``Source`` factories to use when creating ``Source``\ s + for the assembled ``WorkflowDefinition``\ (s). + + :param __source_factories: A ``Sequence`` of factory functions that + supply ``Source`` instances for the assembled + ``WorkflowDefinition``\ (s). This MUST be an instance of + ``collections.abc.Sequence``. An empty ``Sequence`` is a valid + value. + + :raise TypeError: If the provided value is NOT an instance of + ``collections.abc.Sequence``. + """ # noqa: D205 + ensure_instance_of( + value=__source_factories, + klass=Sequence, + message="'source_factories' MUST be a collections.abc.Sequence.", + ) + self._source_factories = list(__source_factories) + + # BUILD + # ------------------------------------------------------------------------- + def build(self) -> WorkflowDefinition[_RDT, _PDT]: + """Build and return a :class:`~sghi.etl.core.WorkflowDefinition`. + + This method finalizes the workflow construction and returns a + ``WorkflowDefinition`` instance that encapsulates the workflow's + configuration. + + :return: The build ``WorkflowDefinition`` instance. + + :raise NoSourceProvidedError: If neither a ``Source`` nor a factory + function that supplies ``Source`` instances has been provided. + At least one of these is required. + """ + return SimpleWorkflowDefinition( + id=self.id, + name=self._name, + description=self._description, + source_factory=self._build_source_factory(), + processor_factory=self._build_processor_factory(), + sink_factory=self._build_sink_factory(), + ) + + # DECORATORS + # ------------------------------------------------------------------------- + def applies_processor( + self, + processor: Processor[Any, Any], + ) -> Processor[Any, Any]: + r"""Register a ``Processor`` to be used in the assembled + ``WorkflowDefinition``. 
+ + This method accepts a ``Processor`` instance and is meant to be + used as a decorator. It delegates the actual call to the + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.apply_processor` + method and as such, the same constraints hold. + + .. important:: + + The given ``Processor`` instance is internally converted into a + factory function that only supplies the ``Processor`` instance + once. Attempting to get a value from the factory function again + (e.g. by invoking + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.build` + again) will result in a :exc:`SoleValueAlreadyRetrievedError` being + raised. As a result, the builder SHOULD NOT be used to construct + multiple ``WorkflowDefinition``\ s. Doing so will result in failure + when the :meth:`build` method is invoked multiple times on the same + builder. Clients of this class should be aware of this limitation. + + :param processor: A ``Processor`` instance to be included in the + assembled ``WorkflowDefinition``. This MUST BE a ``Processor`` + instance. + + :return: The given ``Processor`` instance. + + :raise TypeError: If ``processor`` is NOT a ``Processor`` instance. + + .. seealso:: :meth:`apply_processor`. + """ # noqa: D205 + ensure_instance_of( + value=processor, + klass=Processor, + message=( + "'processor' MUST be an 'sghi.etl.core.Processor' instance." + ), + ) + self.apply_processor(processor) + return processor + + def drains_to(self, sink: Sink[Any]) -> Sink[Any]: + r"""Register a ``Sink`` to be used in the assembled + ``WorkflowDefinition``. + + This method accepts a ``Sink`` instance and is meant to be used as a + decorator. It delegates the actual call to the + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.drain_to` + method and as such, the same constraints hold. + + .. important:: + + The given ``Sink`` instance is internally converted into a factory + function that only supplies the ``Sink`` instance once. Attempting + to get a value from the factory function again (e.g. by invoking + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.build` + again) will result in a :exc:`SoleValueAlreadyRetrievedError` being + raised. As a result, the builder SHOULD NOT be used to construct + multiple ``WorkflowDefinition``\ s. Doing so will result in failure + when the :meth:`build` method is invoked multiple times on the same + builder. Clients of this class should be aware of this limitation. + + :param sink: A ``Sink`` instance to be included in the assembled + ``WorkflowDefinition``. This MUST BE a ``Sink`` instance. + + :return: The given ``Sink`` instance. + + :raise TypeError: If ``sink`` is NOT a ``Sink`` instance. + + .. seealso:: :meth:`drain_to`. + """ # noqa: D205 + ensure_instance_of( + value=sink, + klass=Sink, + message="'sink' MUST be an 'sghi.etl.core.Sink' instance.", + ) + self.drain_to(sink) + return sink + + def draws_from(self, source: Source[Any]) -> Source[Any]: + r"""Register a ``Source`` to be used in the assembled + ``WorkflowDefinition``. + + This method accepts a ``Source`` instance and is meant to be used as a + decorator. It delegates the actual call to the + :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.draw_from` + method and as such, the same constraints hold. + + .. important:: + + The given ``Source`` instance is internally converted into a + factory function that only supplies the ``Source`` instance + once. Attempting to get a value from the factory function again + (e.g. 
by invoking
+            :meth:`~sghi.etl.commons.workflow_builder.WorkflowBuilder.build`
+            again) will result in a :exc:`SoleValueAlreadyRetrievedError` being
+            raised. As a result, the builder SHOULD NOT be used to construct
+            multiple ``WorkflowDefinition``\ s. Doing so will result in failure
+            when the :meth:`build` method is invoked multiple times on the same
+            builder. Clients of this class should be aware of this limitation.
+
+        :param source: A ``Source`` instance to be included in the assembled
+            ``WorkflowDefinition``. This MUST BE a ``Source`` instance.
+
+        :return: The given ``Source`` instance.
+
+        :raise TypeError: If ``source`` is NOT a ``Source`` instance.
+
+        .. seealso:: :meth:`draw_from`.
+        """  # noqa: D205
+        ensure_instance_of(
+            value=source,
+            klass=Source,
+            message="'source' MUST be an 'sghi.etl.core.Source' instance.",
+        )
+        self.draw_from(source)
+        return source
+
+    # FLOW API
+    # -------------------------------------------------------------------------
+    def apply_processor(
+        self,
+        processor: Processor[Any, Any] | _ProcessorFactory[Any, Any],
+    ) -> Self:
+        r"""Register a ``Processor`` to be used in the assembled
+        ``WorkflowDefinition``\ (s).
+
+        This method accepts a ``Processor`` instance or factory
+        function that supplies ``Processor`` instances to be included
+        in the assembled ``WorkflowDefinition``\ (s).
+
+        .. important::
+
+            When a ``Processor`` instance is provided instead of a factory
+            function, it is internally converted into a factory function that
+            only supplies the ``Processor`` instance once. Attempting to get a
+            value from the factory function again will result in a
+            :exc:`SoleValueAlreadyRetrievedError` being raised. As a result,
+            the builder SHOULD NOT be used to construct multiple
+            ``WorkflowDefinition``\ s. Doing so will result in failure when the
+            :meth:`build` method is invoked multiple times on the same builder.
+            Clients of this class should be aware of this limitation.
+
+        :param processor: A ``Processor`` instance or factory function that
+            supplies ``Processor`` instances to be included in the assembled
+            ``WorkflowDefinition``\ (s). This MUST EITHER be a ``Processor``
+            instance or a valid callable object that supplies ``Processor``
+            instances.
+
+        :return: This builder, i.e. ``self``.
+
+        :raise ValueError: If ``processor`` is NEITHER a ``Processor``
+            instance NOR a callable object.
+        """  # noqa: D205
+        match processor:
+            case Processor():
+                self._processor_factories.append(
+                    self._create_factory(processor)
+                )
+            case _ if callable(processor):
+                # noinspection PyTypeChecker
+                self._processor_factories.append(processor)
+            case _:
+                _err_msg: str = (
+                    "'processor' MUST be an 'sghi.etl.core.Processor' "
+                    "instance or a factory function that returns an instance "
+                    "of the same type."
+                )
+                raise ValueError(_err_msg)
+
+        return self
+
+    def clear_processor_factories(self) -> Self:
+        r"""Remove all the registered ``Processor``\ s from the builder.
+
+        After this method returns, the :attr:`processor_factories` property
+        returns an empty ``Sequence``.
+        """
+        self._processor_factories.clear()
+        return self
+
+    def clear_sink_factories(self) -> Self:
+        r"""Remove all the registered ``Sink``\ s from the builder.
+
+        After this method returns, the :attr:`sink_factories` property returns
+        an empty ``Sequence``.
+        """
+        self._sink_factories.clear()
+        return self
+
+    def clear_source_factories(self) -> Self:
+        r"""Remove all the registered ``Source``\ s from the builder.
+ + After this method returns, the :attr:`source_factories` property + returns an empty ``Sequence``. + """ + self._source_factories.clear() + return self + + def drain_to(self, sink: Sink[Any] | _SinkFactory[Any]) -> Self: + r"""Register a ``Sink`` to be used in the assembled + ``WorkflowDefinition``\ (s). + + This method accepts a ``Sink`` instance or factory function that + supplies ``Sink`` instances to be included in the assembled + ``WorkflowDefinition``\ (s). + + .. important:: + + When a ``Sink`` instance is provided instead of a factory function, + it is internally converted into a factory function that only + supplies the ``Sink`` instance once. Attempting to get a + value from the factory function again will result in a + :exc:`SoleValueAlreadyRetrievedError` being raised. As a result, + the builder SHOULD NOT be used to construct multiple + ``WorkflowDefinition``\ s. Doing so will result in failure when the + :meth:`build` method is invoked multiple times on the same builder. + Clients of this class should be aware of this limitation. + + :param sink: A ``Sink`` instance or factory function that suppliers + ``Sink`` instances to be included in the assembled + ``WorkflowDefinition``\ (s). This MUST EITHER be a ``Sink`` + instance or a valid callable object that suppliers ``Sink`` + instances. + + :return: This builder, i.e. ``self``. + + :raise ValueError: If ``sink`` is NEITHER a ``Sink`` instance NOR a + callable object. + """ # noqa: D205 + match sink: + case Sink(): + self._sink_factories.append(self._create_factory(sink)) + case _ if callable(sink): + # noinspection PyTypeChecker + self._sink_factories.append(sink) + case _: + _err_msg: str = ( + "'sink' MUST be an 'sghi.etl.core.Sink' instance or a " + "factory function that returns an instance of the same " + "type." + ) + raise ValueError(_err_msg) + + return self + + def draw_from(self, source: Source[Any] | _SourceFactory[Any]) -> Self: + r"""Register a ``Source`` to be used in the assembled + ``WorkflowDefinition``\ (s). + + This method accepts a ``Source`` instance or factory function that + supplies ``Source`` instances to be included in the assembled + ``WorkflowDefinition``\ (s). + + .. important:: + + When a ``Source`` instance is provided instead of a factory + function, it is internally converted into a factory function that + only supplies the ``Source`` instance once. Attempting to get a + value from the factory function again will result in a + :exc:`SoleValueAlreadyRetrievedError` being raised. As a result, + the builder SHOULD NOT be used to construct multiple + ``WorkflowDefinition``\ s. Doing so will result in failure when the + :meth:`build` method is invoked multiple times on the same builder. + Clients of this class should be aware of this limitation. + + :param source: A ``Source`` instance or factory function that suppliers + ``Source`` instances to be included in the assembled + ``WorkflowDefinition``\ (s). This MUST EITHER be a ``Source`` + instance or a valid callable object that suppliers ``Source`` + instances. + + :return: This builder, i.e. ``self``. + + :raise ValueError: If ``source`` is NEITHER a ``Source`` instance NOR a + callable object. 
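+
+        A brief illustrative sketch (``say_hello`` and ``workflow_builder``
+        are made-up names used only for this example)::
+
+            from sghi.etl.commons import source
+
+            @source
+            def say_hello() -> str:
+                return "Hello, World!"
+
+            workflow_builder.draw_from(say_hello)  # a ``Source`` instance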
+ """ # noqa: D205 + match source: + case Source(): + self._source_factories.append(self._create_factory(source)) + case _ if callable(source): + # noinspection PyTypeChecker + self._source_factories.append(source) + case _: + _err_msg: str = ( + "'source' MUST be an 'sghi.etl.core.Source' instance or a " + "factory function that returns an instance of the same " + "type." + ) + raise ValueError(_err_msg) + + return self + + # HELPERS + # ------------------------------------------------------------------------- + def _build_processor_factory(self) -> _ProcessorFactory[_RDT, _PDT]: + match self._processor_factories: + case (_, _, *_): + + def _factory() -> Processor[_RDT, _PDT]: # pragma: no cover + return self._composite_processor_factory( + [_pf() for _pf in self._processor_factories] + ) + + return _factory + case (entry, *_): + return entry + case _: + return self._default_processor_factory + + def _build_sink_factory(self) -> _SinkFactory[_PDT]: + match self._sink_factories: + case (_, _, *_): + + def _factory() -> Sink[_PDT]: # pragma: no cover + return self._composite_sink_factory( + [_sf() for _sf in self._sink_factories] + ) + + return _factory + case (entry, *_): + return entry + case _: + return self._default_sink_factory + + def _build_source_factory(self) -> _SourceFactory[_RDT]: + match self._source_factories: + case (_, _, *_): + + def _factory() -> Source[_RDT]: # pragma: no cover + return self._composite_source_factory( + [_sf() for _sf in self._source_factories] + ) + + return _factory + case (entry, *_): + return entry + case _: + _err_msg: str = ( + "No sources available. At least once 'Source' or one " + "factory function that supplies 'Source' instances MUST " + "be provided." + ) + raise NoSourceProvidedError(_err_msg) + + @staticmethod + def _create_factory(val: _T) -> Callable[[], _T]: + has_been_retrieved: bool = False + + def _return_once_then_raise_factory() -> _T: + nonlocal has_been_retrieved + if has_been_retrieved: + _err_msg: str = ( + "This factory's sole value has already been retrieved." 
+ ) + raise SoleValueAlreadyRetrievedError(_err_msg) + + has_been_retrieved = True + return val + + return _return_once_then_raise_factory + + +# ============================================================================= +# MODULE EXPORTS +# ============================================================================= + + +__all__ = [ + "NoSourceProvidedError", + "SoleValueAlreadyRetrievedError", + "WorkflowBuilder", +] diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py index 6371ccb..eb5986b 100644 --- a/test/sghi/etl/commons_tests/processors_tests.py +++ b/test/sghi/etl/commons_tests/processors_tests.py @@ -42,6 +42,7 @@ def ints_to_chars(ints: Iterable[int]) -> Iterable[str]: """ def __init__(self) -> None: + super().__init__() self._buffer: StringIO = StringIO(newline="") @property diff --git a/test/sghi/etl/commons_tests/sources_tests.py b/test/sghi/etl/commons_tests/sources_tests.py index 300ccf3..cfd5aad 100644 --- a/test/sghi/etl/commons_tests/sources_tests.py +++ b/test/sghi/etl/commons_tests/sources_tests.py @@ -22,6 +22,7 @@ class _StreamingSource(Source[Iterable[int]]): def __init__(self) -> None: + super().__init__() self._yielded: int = 0 self._is_disposed: bool = False diff --git a/test/sghi/etl/commons_tests/workflow_builder_tests.py b/test/sghi/etl/commons_tests/workflow_builder_tests.py new file mode 100644 index 0000000..c361631 --- /dev/null +++ b/test/sghi/etl/commons_tests/workflow_builder_tests.py @@ -0,0 +1,1182 @@ +# ruff: noqa: D205 +"""Tests for the :module:`sghi.etl.commons.workflow_builder` module.""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING, Any, Self +from unittest import TestCase + +import pytest +from typing_extensions import override + +from sghi.etl.commons import ( + GatherSource, + NOOPProcessor, + NoSourceProvidedError, + NullSink, + ProcessorPipe, + ScatterGatherProcessor, + ScatterSink, + SoleValueAlreadyRetrievedError, + SplitGatherProcessor, + SplitSink, + WorkflowBuilder, + processor, + sink, + source, +) +from sghi.etl.core import Source, WorkflowDefinition +from sghi.task import not_disposed + +if TYPE_CHECKING: + from collections.abc import Sequence + + from sghi.etl.core import Processor, Sink + +# ============================================================================= +# TESTS HELPERS +# ============================================================================= + + +class Zero(Source[Iterator[int]]): + """A :class:`Source` that provides as many zeros as drawn. + + This is similar to https://en.wikipedia.org/wiki//dev/zero. 
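+
+    A minimal illustrative usage::
+
+        zero = Zero()
+        assert next(zero.draw()) == 0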
+ """ + + __slots__ = ("_is_disposed",) + + def __init__(self) -> None: + """Create a new instance of ``Zero`` source.""" + super().__init__() + self._is_disposed: bool = False + + @not_disposed + @override + def __enter__(self) -> Self: + return super(Source, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @override + def dispose(self) -> None: + self._is_disposed = True + + @override + def draw(self) -> Iterator[int]: + while True: + yield 0 + + +# ============================================================================= +# TESTS +# ============================================================================= + + +class TestWorkflowBuilder(TestCase): + """Tests for the :class:`sghi.etl.commons.WorkflowBuilder` class.""" + + @override + def setUp(self) -> None: + super().setUp() + self._instance1 = WorkflowBuilder( + id="test_workflow_1", + name="Test Workflow One", + ) + self._instance2 = WorkflowBuilder( + id="test_workflow_2", + name="Test Workflow Two", + description="A sample ETL workflow.", + composite_processor_factory=ProcessorPipe, + composite_sink_factory=SplitSink, + processor_factories=[NOOPProcessor], + sink_factories=[NullSink], + source_factories=[Zero], + ) + + def test_applies_processor_fails_when_given_invalid_input(self) -> None: + """The decorator :meth:`WorkflowBuilder.applies_processor` should raise + a :exc:`TypeError` when the given value IS NOT a :class:`Processor` + instance. + """ + for non_processor in (None, 1, 5.2, str, NullSink()): + with pytest.raises(TypeError, match="Processor") as exp_info: + self._instance1.applies_processor(non_processor) # type: ignore + + assert ( + exp_info.value.args[0] + == "'processor' MUST be an 'sghi.etl.core.Processor' instance." + ) + + def test_applies_processor_return_value(self) -> None: + """The decorator meth:`WorkflowBuilder.applies_processor` should return + the :class:`Processor` instance given to it. + """ + noop = NOOPProcessor() + + @processor + def double(values: Iterator[int]) -> Iterator[int]: + for value in values: + yield value * 2 + + assert self._instance1.applies_processor(noop) is noop + assert self._instance2.applies_processor(double) is double + + def test_applies_processor_side_effects(self) -> None: + """The decorator :meth:`WorkflowBuilder.applies_processor` should + convert the given :class:`Processor` instance into a factory function + that returns the ``Processor`` once and then append the factory + function to the list of available processor factories. + """ + noop = NOOPProcessor() + + self._instance1.applies_processor(noop) + assert len(self._instance1.processor_factories) == 1 + assert callable(self._instance1.processor_factories[0]) + + noop_factory = self._instance1.processor_factories[0] + # The 1st call should yield the original value + assert noop_factory() is noop + # Later calls should result in `SoleValueAlreadyRetrievedError` being + # raised. + for _ in range(10): + with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info: + noop_factory() + + assert ( + exp_info.value.message + == "This factory's sole value has already been retrieved." + ) + + def test_apply_processor_fails_when_given_invalid_input(self) -> None: + """The method :meth:`WorkflowBuilder.apply_processor` should raise + a :exc:`ValueError` when the given value IS NOT a :class:`Processor` + instance or a callable object. 
+ """ + for non_processor in (None, 1, 5.2, {}, (), []): + with pytest.raises(ValueError, match="Processor") as exp_info: + self._instance1.apply_processor(non_processor) # type: ignore + + assert exp_info.value.args[0] == ( + "'processor' MUST be an 'sghi.etl.core.Processor' " + "instance or a factory function that returns an instance " + "of the same type." + ) + + def test_apply_processor_return_value(self) -> None: + """The method meth:`WorkflowBuilder.apply_processor` should return + the current ``WorkflowBuilder`` instance, i.e. ``self``, on exit. + """ + + @processor + def double(values: Iterator[int]) -> Iterator[int]: + for value in values: + yield value * 2 + + np = NOOPProcessor + assert self._instance1.apply_processor(np) is self._instance1 + assert self._instance2.apply_processor(double) is self._instance2 + + def test_apply_processor_side_effects(self) -> None: + """The method :meth:`WorkflowBuilder.apply_processor` should + convert the given :class:`Processor` instance into a factory function + that returns the ``Processor`` once and then append the factory + function to the list of available processor factories. + + If the given value is a callable object, then append it to the list of + available processor factories as is. + """ + noop = NOOPProcessor() + + self._instance1.apply_processor(noop) + self._instance1.apply_processor(NOOPProcessor) + assert len(self._instance1.processor_factories) == 2 + assert callable(self._instance1.processor_factories[0]) + assert callable(self._instance1.processor_factories[1]) + assert self._instance1.processor_factories[1] is NOOPProcessor + + noop_factory = self._instance1.processor_factories[0] + # The 1st call should yield the original value + assert noop_factory() is noop + # Later calls should result in `SoleValueAlreadyRetrievedError` being + # raised. + for _ in range(10): + with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info: + noop_factory() + + assert ( + exp_info.value.message + == "This factory's sole value has already been retrieved." + ) + + def test_build_fails_when_no_source_is_provided(self) -> None: + """:meth:`WorkflowBuilder.build` should raise a + :exc:`NoSourceProvidedError` when no ``Source`` or ``Source`` factory + has been provided. + """ + assert len(self._instance1.source_factories) == 0 + + with pytest.raises(NoSourceProvidedError) as exp_info: + self._instance1.build() + + assert exp_info.value.message == ( + "No sources available. At least once 'Source' or one " + "factory function that supplies 'Source' instances MUST " + "be provided." + ) + + def test_build_with_defaults(self) -> None: + """:meth:`WorkflowBuilder.build` should use + :attr:`WorkflowBuilder.default_processor_factory` and + :attr:`WorkflowBuilder.default_sink_factory` factories to create a + :class:`Processor` and :class:`Sink` respectively, when they aren't + provided. 
+ """ + assert len(self._instance1.processor_factories) == 0 + assert len(self._instance1.sink_factories) == 0 + assert len(self._instance1.source_factories) == 0 + + workflow_def: WorkflowDefinition[Any, Any] + workflow_def = self._instance1.draw_from(Zero).build() + + assert isinstance(workflow_def, WorkflowDefinition) + assert isinstance(workflow_def.processor_factory(), NOOPProcessor) + assert isinstance(workflow_def.sink_factory(), NullSink) + assert isinstance(workflow_def.source_factory(), Zero) + + def test_build_with_multiple_components(self) -> None: + r""":meth:`WorkflowBuilder.build` should use the factories returned + by the :attr:`WorkflowBuilder.composite_processor_factory`, + :attr:`WorkflowBuilder.composite_sink_factory` and + :attr:`WorkflowBuilder.composite_source_factory` to combine multiple + ``Processor``\ s, ``Sink``\ s and ``Source``\ s respectively when they + are given. + """ + wb: WorkflowBuilder[Any, Any] = self._instance1 + assert len(wb.processor_factories) == 0 + assert len(wb.sink_factories) == 0 + assert len(wb.source_factories) == 0 + + @wb.applies_processor + @processor + def add_10(values: Sequence[Iterator[int]]) -> Iterator[int]: # pyright: ignore[reportUnusedFunction] + for value in values[0]: + yield value + 10 + + @wb.applies_processor + @processor + def mul_10(values: Sequence[Iterator[int]]) -> Iterator[int]: # pyright: ignore[reportUnusedFunction] + for value in values[1]: + yield value * 10 + + db1: list[int] = [] + db2: dict[str, int] = {} + + @wb.drains_to + @sink + def save_to_db1(values: Sequence[Iterator[int]]) -> None: # pyright: ignore[reportUnusedFunction] + db1.extend(values[0]) + + @wb.drains_to + @sink + def save_to_db2(values: Sequence[Iterator[int]]) -> None: # pyright: ignore[reportUnusedFunction] + for value in values[1]: + db2[str(value)] = value + + wb.composite_processor_factory = SplitGatherProcessor + wb.composite_source_factory = GatherSource + wb.composite_sink_factory = SplitSink + + workflow_def: WorkflowDefinition[Any, Any] + workflow_def = wb.draw_from(Zero).draw_from(Zero).build() + + assert isinstance(workflow_def, WorkflowDefinition) + assert isinstance( + workflow_def.processor_factory(), + SplitGatherProcessor, + ) + assert isinstance(workflow_def.sink_factory(), SplitSink) + assert isinstance(workflow_def.source_factory(), GatherSource) + + def test_build_with_single_components(self) -> None: + """:meth:`WorkflowBuilder.build` should return a ``WorkflowDefinition`` + whose ``processor_factory``, ``sink_factory`` or ``source_factory`` + are the same as those given to the builder if only one of the factory + for each of the component is given. + """ + wb: WorkflowBuilder[Any, Any] = self._instance1 + assert len(wb.processor_factories) == 0 + assert len(wb.sink_factories) == 0 + assert len(wb.source_factories) == 0 + + @wb.applies_processor + @processor + def add_10(values: Iterator[int]) -> Iterator[int]: + for value in values: + yield value + 10 + + @wb.drains_to + @sink + def discard(values: Iterator[int]) -> None: + for _ in values: + ... 
+ + workflow_def: WorkflowDefinition[Any, Any] + workflow_def = wb.draw_from(Zero).build() + + assert isinstance(workflow_def, WorkflowDefinition) + assert workflow_def.processor_factory() is add_10 + assert workflow_def.sink_factory() is discard + assert isinstance(workflow_def.source_factory(), Zero) + + def test_clear_processor_factories_side_effects(self) -> None: + r""":meth:`WorkflowBuilder.clear_processor_factories` should remove all + registered :class:`Processor`\ s from the builder. + """ + assert len(self._instance1.processor_factories) == 0 + assert len(self._instance2.processor_factories) > 0 + + self._instance1.clear_processor_factories() + self._instance2.clear_processor_factories() + + assert len(self._instance1.processor_factories) == 0 + assert len(self._instance2.processor_factories) == 0 + + def test_clear_processor_factories_return_value(self) -> None: + """:meth:`WorkflowBuilder.clear_processor_factories` should return the + current ``WorkflowBuilder`` instance, i.e. ``self``. + """ + assert self._instance1.clear_processor_factories() is self._instance1 + assert self._instance2.clear_processor_factories() is self._instance2 + + def test_clear_sink_factories_side_effects(self) -> None: + r""":meth:`WorkflowBuilder.clear_sink_factories` should remove all + registered :class:`Sink`\ s from the builder. + """ + assert len(self._instance1.sink_factories) == 0 + assert len(self._instance2.sink_factories) > 0 + + self._instance1.clear_sink_factories() + self._instance2.clear_sink_factories() + + assert len(self._instance1.sink_factories) == 0 + assert len(self._instance2.sink_factories) == 0 + + def test_clear_sink_factories_return_value(self) -> None: + """:meth:`WorkflowBuilder.clear_sink_factories` should return the + current ``WorkflowBuilder`` instance, i.e. ``self``. + """ + assert self._instance1.clear_sink_factories() is self._instance1 + assert self._instance2.clear_sink_factories() is self._instance2 + + def test_clear_source_factories_side_effects(self) -> None: + r""":meth:`WorkflowBuilder.clear_source_factories` should remove all + registered :class:`Source`\ s from the builder. + """ + assert len(self._instance1.source_factories) == 0 + assert len(self._instance2.source_factories) > 0 + + self._instance1.clear_source_factories() + self._instance2.clear_source_factories() + + assert len(self._instance1.source_factories) == 0 + assert len(self._instance2.source_factories) == 0 + + def test_clear_source_factories_return_value(self) -> None: + """:meth:`WorkflowBuilder.clear_source_factories` should return the + current ``WorkflowBuilder`` instance, i.e. ``self``. + """ + assert self._instance1.clear_source_factories() is self._instance1 + assert self._instance2.clear_source_factories() is self._instance2 + + def test_composite_processor_factory_modification_with_valid_value_succeeds( # noqa: E501 + self, + ) -> None: + """Setting a callable object to the + :attr:`WorkflowBuilder.composite_processor_factory` attribute should + succeed. + """ + + def _composite_processor_factory( + processors: Sequence[Processor[Any, Any]], + ) -> Processor[Any, Any]: + return ProcessorPipe(processors) + + try: + self._instance1.composite_processor_factory = ProcessorPipe + self._instance2.composite_processor_factory = ( + _composite_processor_factory + ) + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.composite_processor_factory' " + "attribute with a callable object SHOULD succeed. 
However, " + f"the following exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.composite_processor_factory is ProcessorPipe + assert ( + self._instance2.composite_processor_factory + is _composite_processor_factory + ) + + def test_composite_processor_factory_modification_with_an_invalid_value_fails( # noqa: E501 + self, + ) -> None: + """Setting a non-callable value to the + :attr:`WorkflowBuilder.composite_processor_factory` attribute should + raise a :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.composite_processor_factory = non_callable # type: ignore + + assert ( + exp_info.value.args[0] + == "'composite_processor_factory' MUST be a callable object." + ) + + def test_composite_processor_factory_return_value(self) -> None: + """:attr:`WorkflowBuilder.composite_processor_factory` should return + the factory function used to combine multiple processors. + """ + assert ( + self._instance1.composite_processor_factory + is ScatterGatherProcessor + ) + assert self._instance2.composite_processor_factory is ProcessorPipe + + def test_composite_sink_factory_modification_with_valid_value_succeeds( + self, + ) -> None: + """Setting a callable object to the + :attr:`WorkflowBuilder.composite_sink_factory` attribute should + succeed. + """ + + def _composite_sink_factory(sinks: Sequence[Sink[Any]]) -> Sink[Any]: + return SplitSink(sinks) + + try: + self._instance1.composite_sink_factory = SplitSink + self._instance2.composite_sink_factory = _composite_sink_factory + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.composite_sink_factory' " + "attribute with a callable object SHOULD succeed. However, " + f"the following exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.composite_sink_factory is SplitSink + assert ( + self._instance2.composite_sink_factory is _composite_sink_factory + ) + + def test_composite_sink_factory_modification_with_an_invalid_value_fails( + self, + ) -> None: + """Setting a non-callable value to the + :attr:`WorkflowBuilder.composite_sink_factory` attribute should raise a + :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.composite_sink_factory = non_callable # type: ignore + + assert ( + exp_info.value.args[0] + == "'composite_sink_factory' MUST be a callable object." + ) + + def test_composite_sink_factory_return_value(self) -> None: + """:attr:`WorkflowBuilder.composite_sink_factory` should return + the factory function used to combine multiple sinks. + """ + assert self._instance1.composite_sink_factory is ScatterSink + assert self._instance2.composite_sink_factory is SplitSink + + def test_composite_source_factory_modification_with_valid_value_succeeds( + self, + ) -> None: + """Setting a callable object to the + :attr:`WorkflowBuilder.composite_source_factory` attribute should + succeed. 
+ """ + + def _composite_source_factory( + sources: Sequence[Source[Any]], + ) -> Source[Any]: + return GatherSource(sources) + + try: + self._instance1.composite_source_factory = GatherSource + self._instance2.composite_source_factory = ( + _composite_source_factory + ) + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.composite_source_factory' " + "attribute with a callable object SHOULD succeed. However, " + f"the following exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.composite_source_factory is GatherSource + assert ( + self._instance2.composite_source_factory + is _composite_source_factory + ) + + def test_composite_source_factory_modification_with_an_invalid_value_fails( + self, + ) -> None: + """Setting a non-callable value to the + :attr:`WorkflowBuilder.composite_source_factory` attribute should raise + a :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.composite_source_factory = non_callable # type: ignore + + assert ( + exp_info.value.args[0] + == "'composite_source_factory' MUST be a callable object." + ) + + def test_composite_source_factory_return_value(self) -> None: + """:attr:`WorkflowBuilder.composite_source_factory` should return + the factory function used to combine multiple sources. + """ + assert self._instance1.composite_source_factory is GatherSource + assert self._instance2.composite_source_factory is GatherSource + + def test_default_processor_factory_modification_with_valid_value_succeeds( + self, + ) -> None: + """Setting a callable object to the + :attr:`WorkflowBuilder.default_processor_factory` attribute should + succeed. + """ + + def _processor_factory() -> Processor[Any, Any]: + return NOOPProcessor() + + try: + self._instance1.default_processor_factory = NOOPProcessor + self._instance2.default_processor_factory = _processor_factory + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + "Setting the 'WorkflowBuilder.default_processor_factory' " + "attribute with a callable object SHOULD succeed. However, " + f"the following exception was raised: '{exp!r}'." + ) + pytest.fail(reason=_fail_reason) + + assert self._instance1.default_processor_factory is NOOPProcessor + assert self._instance2.default_processor_factory is _processor_factory + + def test_default_processor_factory_modification_with_an_invalid_value_fails( # noqa: E501 + self, + ) -> None: + """Setting a non-callable value to the + :attr:`WorkflowBuilder.default_processor_factory` attribute should + raise a :exc:`ValueError`. + """ + for non_callable in (None, 1, 5.2, "not a callable"): + with pytest.raises(ValueError, match="be a callable") as exp_info: + self._instance1.default_processor_factory = non_callable # type: ignore + + assert ( + exp_info.value.args[0] + == "'default_processor_factory' MUST be a callable object." + ) + + def test_default_processor_factory_return_value(self) -> None: + """:attr:`WorkflowBuilder.default_processor_factory` should return + the factory function used to create the default ``Processor`` when + none is provided. 
+        """
+        assert self._instance1.default_processor_factory is NOOPProcessor
+        assert self._instance2.default_processor_factory is NOOPProcessor
+
+    def test_default_sink_factory_modification_with_valid_value_succeeds(
+        self,
+    ) -> None:
+        """Setting a callable object to the
+        :attr:`WorkflowBuilder.default_sink_factory` attribute should succeed.
+        """
+
+        def _sink_factory() -> Sink[Any]:
+            return NullSink()
+
+        try:
+            self._instance1.default_sink_factory = NullSink
+            self._instance2.default_sink_factory = _sink_factory
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.default_sink_factory' "
+                "attribute with a callable object SHOULD succeed. However, "
+                f"the following exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert self._instance1.default_sink_factory is NullSink
+        assert self._instance2.default_sink_factory is _sink_factory
+
+    def test_default_sink_factory_modification_with_an_invalid_value_fails(
+        self,
+    ) -> None:
+        """Setting a non-callable value to the
+        :attr:`WorkflowBuilder.default_sink_factory` attribute should
+        raise a :exc:`ValueError`.
+        """
+        for non_callable in (None, 1, 5.2, "not a callable"):
+            with pytest.raises(ValueError, match="be a callable") as exp_info:
+                self._instance1.default_sink_factory = non_callable  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'default_sink_factory' MUST be a callable object."
+            )
+
+    def test_default_sink_factory_return_value(self) -> None:
+        """:attr:`WorkflowBuilder.default_sink_factory` should return the
+        factory function used to create the default ``Sink`` when none is
+        provided.
+        """
+        assert self._instance1.default_sink_factory is NullSink
+        assert self._instance2.default_sink_factory is NullSink
+
+    def test_description_modification_with_valid_value_succeeds(self) -> None:
+        """Setting ``None`` or a non-empty string to the
+        :attr:`WorkflowBuilder.description` attribute should succeed.
+        """
+        try:
+            self._instance1.description = "A test ETL workflow."
+            self._instance2.description = None
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.description' attribute with "
+                "None or a non-empty string SHOULD succeed. However, the "
+                f"following exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert self._instance1.description == "A test ETL workflow."
+        assert self._instance2.description is None
+
+    def test_description_modification_with_a_non_str_value_fails(self) -> None:
+        """Setting a non-``None``, non-string value to the
+        :attr:`WorkflowBuilder.description` attribute should raise a
+        :exc:`TypeError`.
+        """
+        for non_str in (1, 5.2, self._instance2):
+            # The exact error message is not asserted here; it is an
+            # implementation detail of the 'description' setter.
+            with pytest.raises(TypeError):
+                self._instance1.description = non_str  # type: ignore
+
+    def test_description_return_value(self) -> None:
+        """:attr:`WorkflowBuilder.description` should return the description of
+        the workflow.
+        """
+        assert self._instance1.description is None
+        assert self._instance2.description == "A sample ETL workflow."
+
+    def test_drain_to_fails_when_given_invalid_input(self) -> None:
+        """The method :meth:`WorkflowBuilder.drain_to` should raise a
+        :exc:`ValueError` when the given value IS NOT a :class:`Sink` instance
+        or a callable object.
+        """
+        for non_sink in (None, 1, 5.2, {}, (), []):
+            with pytest.raises(ValueError, match="Sink") as exp_info:
+                self._instance1.drain_to(non_sink)  # type: ignore
+
+            assert exp_info.value.args[0] == (
+                "'sink' MUST be an 'sghi.etl.core.Sink' instance or a "
+                "factory function that returns an instance of the same "
+                "type."
+            )
+
+    def test_drain_to_return_value(self) -> None:
+        """The method :meth:`WorkflowBuilder.drain_to` should return the
+        current ``WorkflowBuilder`` instance, i.e. ``self``, on exit.
+        """
+
+        @sink
+        def discard(values: Iterator[int]) -> None:
+            for _ in values:
+                ...
+
+        assert self._instance1.drain_to(NullSink) is self._instance1
+        assert self._instance2.drain_to(discard) is self._instance2
+
+    def test_drain_to_side_effects(self) -> None:
+        """The method :meth:`WorkflowBuilder.drain_to` should convert the
+        given :class:`Sink` instance into a factory function that returns the
+        ``Sink`` once and then append the factory function to the list of
+        available sink factories.
+
+        If the given value is a callable object, then append it to the list of
+        available sink factories as is.
+        """
+        null = NullSink()
+
+        self._instance1.drain_to(null)
+        self._instance1.drain_to(NullSink)
+        assert len(self._instance1.sink_factories) == 2
+        assert callable(self._instance1.sink_factories[0])
+        assert callable(self._instance1.sink_factories[1])
+        assert self._instance1.sink_factories[1] is NullSink
+
+        null_factory = self._instance1.sink_factories[0]
+        # The 1st call should yield the original value
+        assert null_factory() is null
+        # Later calls should result in `SoleValueAlreadyRetrievedError` being
+        # raised.
+        for _ in range(10):
+            with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info:
+                null_factory()
+
+            assert (
+                exp_info.value.message
+                == "This factory's sole value has already been retrieved."
+            )
+
+    def test_drains_to_fails_when_given_invalid_input(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.drains_to` should raise a
+        :exc:`TypeError` when the given value IS NOT a :class:`Sink` instance.
+        """
+        for non_sink in (None, 1, 5.2, str, Zero()):
+            with pytest.raises(TypeError, match="Sink") as exp_info:
+                self._instance1.drains_to(non_sink)  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'sink' MUST be an 'sghi.etl.core.Sink' instance."
+            )
+
+    def test_drains_to_return_value(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.drains_to` should return the
+        :class:`Sink` instance given to it.
+        """
+        null = NullSink()
+
+        @sink
+        def discard(values: Iterator[int]) -> None:
+            for _ in values:
+                ...
+
+        assert self._instance1.drains_to(null) is null
+        assert self._instance2.drains_to(discard) is discard
+
+    def test_drains_to_side_effects(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.drains_to` should convert the
+        given :class:`Sink` instance into a factory function that returns the
+        ``Sink`` once and then append the factory function to the list of
+        available sink factories.
+        """
+        null = NullSink()
+
+        self._instance1.drains_to(null)
+        assert len(self._instance1.sink_factories) == 1
+        assert callable(self._instance1.sink_factories[0])
+
+        null_factory = self._instance1.sink_factories[0]
+        # The 1st call should yield the original value
+        assert null_factory() is null
+        # Later calls should result in `SoleValueAlreadyRetrievedError` being
+        # raised.
+        for _ in range(10):
+            with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info:
+                null_factory()
+
+            assert (
+                exp_info.value.message
+                == "This factory's sole value has already been retrieved."
+            )
+
+    def test_draw_from_fails_when_given_invalid_input(self) -> None:
+        """The method :meth:`WorkflowBuilder.draw_from` should raise a
+        :exc:`ValueError` when the given value IS NOT a :class:`Source`
+        instance or a callable object.
+        """
+        for non_source in (None, 1, 5.2, {}, (), []):
+            with pytest.raises(ValueError, match="Source") as exp_info:
+                self._instance1.draw_from(non_source)  # type: ignore
+
+            assert exp_info.value.args[0] == (
+                "'source' MUST be an 'sghi.etl.core.Source' instance or a "
+                "factory function that returns an instance of the same "
+                "type."
+            )
+
+    def test_draw_from_return_value(self) -> None:
+        """The method :meth:`WorkflowBuilder.draw_from` should return the
+        current ``WorkflowBuilder`` instance, i.e. ``self``, on exit.
+        """
+
+        @source
+        def empty() -> Iterator[None]:
+            while True:
+                yield None
+
+        assert self._instance1.draw_from(Zero) is self._instance1
+        assert self._instance2.draw_from(empty) is self._instance2
+
+    def test_draw_from_side_effects(self) -> None:
+        """The method :meth:`WorkflowBuilder.draw_from` should convert the
+        given :class:`Source` instance into a factory function that returns the
+        ``Source`` once and then append the factory function to the list of
+        available source factories.
+
+        If the given value is a callable object, then append it to the list of
+        available source factories as is.
+        """
+        zero = Zero()
+
+        self._instance1.draw_from(zero)
+        self._instance1.draw_from(Zero)
+        assert len(self._instance1.source_factories) == 2
+        assert callable(self._instance1.source_factories[0])
+        assert callable(self._instance1.source_factories[1])
+        assert self._instance1.source_factories[1] is Zero
+
+        zero_factory = self._instance1.source_factories[0]
+        # The 1st call should yield the original value
+        assert zero_factory() is zero
+        # Later calls should result in `SoleValueAlreadyRetrievedError` being
+        # raised.
+        for _ in range(10):
+            with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info:
+                zero_factory()
+
+            assert (
+                exp_info.value.message
+                == "This factory's sole value has already been retrieved."
+            )
+
+    def test_draws_from_fails_when_given_invalid_input(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.draws_from` should raise a
+        :exc:`TypeError` when the given value IS NOT a :class:`Source`
+        instance.
+        """
+        for non_source in (None, 1, 5.2, str, NOOPProcessor()):
+            with pytest.raises(TypeError, match="Source") as exp_info:
+                self._instance1.draws_from(non_source)  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'source' MUST be an 'sghi.etl.core.Source' instance."
+            )
+
+    def test_draws_from_return_value(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.draws_from` should return the
+        :class:`Source` instance given to it.
+        """
+        zero = Zero()
+
+        @source
+        def empty() -> Iterator[None]:
+            while True:
+                yield None
+
+        assert self._instance1.draws_from(zero) is zero
+        assert self._instance2.draws_from(empty) is empty
+
+    def test_draws_from_side_effects(self) -> None:
+        """The decorator :meth:`WorkflowBuilder.draws_from` should convert the
+        given :class:`Source` instance into a factory function that returns the
+        ``Source`` once and then append the factory function to the list of
+        available source factories.
+        """
+        zero = Zero()
+
+        self._instance1.draws_from(zero)
+        assert len(self._instance1.source_factories) == 1
+        assert callable(self._instance1.source_factories[0])
+
+        zero_factory = self._instance1.source_factories[0]
+        # The 1st call should yield the original value
+        assert zero_factory() is zero
+        # Later calls should result in `SoleValueAlreadyRetrievedError` being
+        # raised.
+        for _ in range(10):
+            with pytest.raises(SoleValueAlreadyRetrievedError) as exp_info:
+                zero_factory()
+
+            assert (
+                exp_info.value.message
+                == "This factory's sole value has already been retrieved."
+            )
+
+    def test_id_modification_with_valid_value_succeeds(self) -> None:
+        """Setting a non-empty string to the :attr:`WorkflowBuilder.id`
+        attribute should succeed.
+        """
+        try:
+            self._instance1.id = "sample_workflow_1"
+            self._instance2.id = "sample_workflow_2"
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.id' attribute with a non-empty "
+                "string SHOULD succeed. However, the following exception was "
+                f"raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert self._instance1.id == "sample_workflow_1"
+        assert self._instance2.id == "sample_workflow_2"
+
+    def test_id_modification_with_a_non_str_value_fails(self) -> None:
+        """Setting a non-string value to the :attr:`WorkflowBuilder.id` should
+        raise a :exc:`TypeError`.
+        """
+        for non_str in (None, 1, 5.2, self._instance2):
+            with pytest.raises(TypeError, match="be a string") as exp_info:
+                self._instance1.id = non_str  # type: ignore
+
+            assert exp_info.value.args[0] == "'id' MUST be a string."
+
+    def test_id_modification_with_an_empty_str_value_fails(self) -> None:
+        """Setting an empty string to the :attr:`WorkflowBuilder.id` attribute
+        should raise a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="an empty string") as exp_info:
+            self._instance2.id = ""
+
+        assert exp_info.value.args[0] == "'id' MUST NOT be an empty string."
+
+    def test_id_return_value(self) -> None:
+        """:attr:`WorkflowBuilder.id` should return the unique identifier of
+        the workflow.
+        """
+        assert self._instance1.id == "test_workflow_1"
+        assert self._instance2.id == "test_workflow_2"
+
+    def test_invoking_instances_as_callable_return_value(self) -> None:
+        """Invoking instances of ``WorkflowBuilder`` as callables should return
+        the same value as invoking :meth:`WorkflowBuilder.build`.
+        """
+        wb: WorkflowBuilder[Any, Any] = self._instance1
+        wb.draw_from(Zero).apply_processor(NOOPProcessor).drain_to(NullSink)
+
+        workflow_def: WorkflowDefinition[Any, Any] = wb()
+        assert isinstance(workflow_def, WorkflowDefinition)
+        assert workflow_def.id == "test_workflow_1"
+        assert workflow_def.name == "Test Workflow One"
+        assert workflow_def.description is None
+        assert workflow_def.source_factory is Zero
+        assert workflow_def.processor_factory is NOOPProcessor
+        assert workflow_def.sink_factory is NullSink
+
+    def test_name_modification_with_valid_value_succeeds(self) -> None:
+        """Setting a non-empty string to the :attr:`WorkflowBuilder.name`
+        attribute should succeed.
+        """
+        try:
+            self._instance1.name = "Sample Workflow One"
+            self._instance2.name = "Sample Workflow Two"
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.name' attribute with a "
+                "non-empty string SHOULD succeed. However, the following "
+                f"exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert self._instance1.name == "Sample Workflow One"
+        assert self._instance2.name == "Sample Workflow Two"
+
+    def test_name_modification_with_a_non_str_value_fails(self) -> None:
+        """Setting a non-string value to the :attr:`WorkflowBuilder.name`
+        should raise a :exc:`TypeError`.
+        """
+        for non_str in (None, 1, 5.2, self._instance2):
+            with pytest.raises(TypeError, match="be a string") as exp_info:
+                self._instance1.name = non_str  # type: ignore
+
+            assert exp_info.value.args[0] == "'name' MUST be a string."
+
+    def test_name_modification_with_an_empty_str_value_fails(self) -> None:
+        """Setting an empty string to the :attr:`WorkflowBuilder.name`
+        attribute should raise a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="an empty string") as exp_info:
+            self._instance2.name = ""
+
+        assert exp_info.value.args[0] == "'name' MUST NOT be an empty string."
+
+    def test_name_return_value(self) -> None:
+        """:attr:`WorkflowBuilder.name` should return the name of the workflow."""  # noqa: E501
+        assert self._instance1.name == "Test Workflow One"
+        assert self._instance2.name == "Test Workflow Two"
+
+    def test_processor_factories_modification_with_valid_value_succeeds(
+        self,
+    ) -> None:
+        """Setting a ``Sequence`` to the
+        :attr:`WorkflowBuilder.processor_factories` attribute should succeed.
+        """
+        try:
+            self._instance1.processor_factories = [NOOPProcessor]
+            self._instance2.processor_factories = ()
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.processor_factories' attribute "
+                "with a Sequence SHOULD succeed. However, the following "
+                f"exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert len(self._instance1.processor_factories) == 1
+        assert len(self._instance2.processor_factories) == 0
+        assert self._instance1.processor_factories[0] is NOOPProcessor
+
+    def test_processor_factories_modification_with_a_non_sequence_value_fails(
+        self,
+    ) -> None:
+        """Setting a non-Sequence value to the
+        :attr:`WorkflowBuilder.processor_factories` should raise a
+        :exc:`TypeError`.
+        """
+        for non_sequence in (None, 1, 5.2, self._instance2, {}):
+            with pytest.raises(TypeError, match="Sequence") as exp_info:
+                self._instance1.processor_factories = non_sequence  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'processor_factories' MUST be a collections.abc.Sequence."
+            )
+
+    def test_processor_factories_return_value(self) -> None:
+        r""":attr:`WorkflowBuilder.processor_factories` should return a
+        ``Sequence`` of the factories registered to create ``Processor``\ s
+        for the assembled ``WorkflowDefinition``\ (s).
+        """
+        assert len(self._instance1.processor_factories) == 0
+        assert len(self._instance2.processor_factories) == 1
+        assert self._instance2.processor_factories[0] == NOOPProcessor
+
+    def test_sink_factories_modification_with_valid_value_succeeds(
+        self,
+    ) -> None:
+        """Setting a ``Sequence`` to the :attr:`WorkflowBuilder.sink_factories`
+        attribute should succeed.
+        """
+        try:
+            self._instance1.sink_factories = [NullSink]
+            self._instance2.sink_factories = ()
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.sink_factories' attribute with "
+                "a Sequence SHOULD succeed. However, the following "
+                f"exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert len(self._instance1.sink_factories) == 1
+        assert len(self._instance2.sink_factories) == 0
+        assert self._instance1.sink_factories[0] is NullSink
+
+    def test_sink_factories_modification_with_a_non_sequence_value_fails(
+        self,
+    ) -> None:
+        """Setting a non-Sequence value to the
+        :attr:`WorkflowBuilder.sink_factories` should raise a :exc:`TypeError`.
+        """
+        for non_sequence in (None, 1, 5.2, self._instance2, {}):
+            with pytest.raises(TypeError, match="Sequence") as exp_info:
+                self._instance1.sink_factories = non_sequence  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'sink_factories' MUST be a collections.abc.Sequence."
+            )
+
+    def test_sink_factories_return_value(self) -> None:
+        r""":attr:`WorkflowBuilder.sink_factories` should return a
+        ``Sequence`` of the factories registered to create ``Sink``\ s
+        for the assembled ``WorkflowDefinition``\ (s).
+        """
+        assert len(self._instance1.sink_factories) == 0
+        assert len(self._instance2.sink_factories) == 1
+        assert self._instance2.sink_factories[0] == NullSink
+
+    def test_source_factories_modification_with_valid_value_succeeds(
+        self,
+    ) -> None:
+        """Setting a ``Sequence`` to the
+        :attr:`WorkflowBuilder.source_factories` attribute should succeed.
+        """
+        try:
+            self._instance1.source_factories = [Zero]
+            self._instance2.source_factories = ()
+        except Exception as exp:  # noqa: BLE001
+            _fail_reason: str = (
+                "Setting the 'WorkflowBuilder.source_factories' attribute "
+                "with a Sequence SHOULD succeed. However, the following "
+                f"exception was raised: '{exp!r}'."
+            )
+            pytest.fail(reason=_fail_reason)
+
+        assert len(self._instance1.source_factories) == 1
+        assert len(self._instance2.source_factories) == 0
+        assert self._instance1.source_factories[0] is Zero
+
+    def test_source_factories_modification_with_a_non_sequence_value_fails(
+        self,
+    ) -> None:
+        """Setting a non-Sequence value to the
+        :attr:`WorkflowBuilder.source_factories` should raise a
+        :exc:`TypeError`.
+        """
+        for non_sequence in (None, 1, 5.2, self._instance2, {}):
+            with pytest.raises(TypeError, match="Sequence") as exp_info:
+                self._instance1.source_factories = non_sequence  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'source_factories' MUST be a collections.abc.Sequence."
+            )
+
+    def test_source_factories_return_value(self) -> None:
+        r""":attr:`WorkflowBuilder.source_factories` should return a
+        ``Sequence`` of the factories registered to create ``Source``\ s
+        for the assembled ``WorkflowDefinition``\ (s).
+        """
+        assert len(self._instance1.source_factories) == 0
+        assert len(self._instance2.source_factories) == 1
+        assert self._instance2.source_factories[0] == Zero
+
+    def test_string_representation(self) -> None:
+        """The :meth:`WorkflowBuilder.__str__` and
+        :meth:`WorkflowBuilder.__repr__` methods should return a string
+        representation of the current ``WorkflowBuilder`` instance.
+        """
+        instance1_str: str = str(self._instance1)
+        instance1_rpr: str = repr(self._instance1)
+        instance2_str: str = str(self._instance2)
+        instance2_rpr: str = repr(self._instance2)
+
+        assert (
+            instance1_str
+            == instance1_rpr
+            == (
+                "WorkflowBuilder(id=test_workflow_1, name=Test Workflow One, "
+                "description=None)"
+            )
+        )
+        assert (
+            instance2_str
+            == instance2_rpr
+            == (
+                "WorkflowBuilder(id=test_workflow_2, name=Test Workflow Two, "
+                "description=A sample ETL workflow.)"
+            )
+        )
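+
+
+# -----------------------------------------------------------------------------
+# ILLUSTRATIVE USAGE (reader aid only; not part of the test suite)
+# -----------------------------------------------------------------------------
+# A minimal sketch of the fluent API exercised by the cases above. It reuses
+# helpers from this module (``Zero``, ``processor``) and assumes, as the
+# fixtures used here suggest, that ``WorkflowBuilder`` accepts ``id``, ``name``
+# and ``description`` keyword arguments:
+#
+#     wb: WorkflowBuilder[int, int] = WorkflowBuilder(
+#         id="example_workflow",
+#         name="Example Workflow",
+#         description="Draw zeros, add 10 to each value, then discard them.",
+#     )
+#
+#     @wb.applies_processor
+#     @processor
+#     def add_10(values: Iterator[int]) -> Iterator[int]:
+#         for value in values:
+#             yield value + 10
+#
+#     workflow_def = wb.draw_from(Zero).drain_to(NullSink).build()
+#     assert isinstance(workflow_def, WorkflowDefinition)
+#
+# When no ``Processor`` or ``Sink`` is registered, ``build()`` falls back to
+# the builder's ``default_processor_factory`` (``NOOPProcessor``) and
+# ``default_sink_factory`` (``NullSink``), as the default-component test above
+# demonstrates.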