diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0a30e76..4761588 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -57,7 +57,7 @@ pages: - poetry version $VERSION - poetry config repositories.ricos https://pypi.ritc.jp - poetry build -f wheel - # - poetry publish --username ricos --password $RICOS_PYPI_KEY -r ricos --no-ansi -n -v + - poetry publish --username ricos --password $RICOS_PYPI_KEY -r ricos --no-ansi -n -v - poetry publish --username __token__ --password $PYPI_PUBLISH_TOKEN --no-ansi -n -v diff --git a/pyproject.toml b/pyproject.toml index d8cb0a0..d28b033 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dagstream" -version = "0.1.3" +version = "0.1.4" description = "" authors = ["sakamoto "] license = "Apache-2.0" diff --git a/src/dagstream/executor.py b/src/dagstream/executor.py index 7d52c6b..c70ee4c 100644 --- a/src/dagstream/executor.py +++ b/src/dagstream/executor.py @@ -1,6 +1,7 @@ import multiprocessing as multi from typing import Any +from dagstream.dagstream import IFunctionalNode from dagstream.graph_components import FunctionalDag @@ -50,7 +51,7 @@ class StreamParallelExecutor: """Parallel Executor for FunctionalDag Object.""" def __init__(self, functional_dag: FunctionalDag, n_processes: int = 1) -> None: - """Parallel Executor for FunctionalDag Object. + """THIS IS EXPERIMENTAL FEATURE. Parallel Executor for FunctionalDag Object. Parameters ---------- @@ -96,6 +97,9 @@ def run(self, *args, **kwargs) -> dict[str, Any]: all_processes: list[multi.Process] = [] results: dict[str, Any] = {} + _name2nodes: dict[str, IFunctionalNode] = { + node.name: node for node in self._dag._nodes + } while self._dag.is_active: nodes = self._dag.get_ready() @@ -114,8 +118,14 @@ def run(self, *args, **kwargs) -> dict[str, Any]: while not done_queue.empty(): _done_node, _result = done_queue.get() - self._dag.done(_done_node) - results.update({_done_node.name: _result}) + + # HACK: When using multiprocessing, id(IFunctionalNode) + # after running is not the same as one before running. + # This operation is incorporated in the Dagstream object, + # after names of all nodes are guranteed to be unique. + done_node = _name2nodes[_done_node.name] + self._dag.done(done_node) + results.update({done_node.name: _result}) if not self._dag.is_active: for _ in range(self._n_processes): diff --git a/tests/test_executors.py b/tests/test_executors.py index 1cbf095..f6fb763 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -22,6 +22,10 @@ def funcD(): time.sleep(1) +def funcE(): + time.sleep(1) + + def test__parallel_run_is_faster_than_single_run(): stream_parallel = DagStream() # All nodes can be run in parallel @@ -44,4 +48,16 @@ def test__parallel_run_is_faster_than_single_run(): assert abs(elapsed_time - elapsed_time_parallel * 4) < 1.0 +def test__can_run_in_parallel_with_orders(): + stream = DagStream() + A, B, C, D, E = stream.emplace(funcA, funcB, funcC, funcD, funcE) + + A.precede(B, C) + E.succeed(B, C, D) + D.succeed(C) + + executor = StreamParallelExecutor(stream.construct(), n_processes=4) + executor.run() + + # endregion diff --git a/tests/test_version.py b/tests/test_version.py index 6980737..d266d23 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.1.3" + assert __version__ == "0.1.4"