
Commit

[fix] fix problem where parallel execution did not proceed to the next function
riku-sakamoto committed Dec 17, 2023
1 parent edcc888 commit 3debd82
Showing 5 changed files with 32 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
@@ -57,7 +57,7 @@ pages:
- poetry version $VERSION
- poetry config repositories.ricos https://pypi.ritc.jp
- poetry build -f wheel
# - poetry publish --username ricos --password $RICOS_PYPI_KEY -r ricos --no-ansi -n -v
- poetry publish --username ricos --password $RICOS_PYPI_KEY -r ricos --no-ansi -n -v
- poetry publish --username __token__ --password $PYPI_PUBLISH_TOKEN --no-ansi -n -v


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dagstream"
version = "0.1.3"
version = "0.1.4"
description = ""
authors = ["sakamoto <[email protected]>"]
license = "Apache-2.0"
16 changes: 13 additions & 3 deletions src/dagstream/executor.py
@@ -1,6 +1,7 @@
import multiprocessing as multi
from typing import Any

from dagstream.dagstream import IFunctionalNode
from dagstream.graph_components import FunctionalDag


@@ -50,7 +51,7 @@ class StreamParallelExecutor:
"""Parallel Executor for FunctionalDag Object."""

def __init__(self, functional_dag: FunctionalDag, n_processes: int = 1) -> None:
"""Parallel Executor for FunctionalDag Object.
"""THIS IS EXPERIMENTAL FEATURE. Parallel Executor for FunctionalDag Object.
Parameters
----------
@@ -96,6 +97,9 @@ def run(self, *args, **kwargs) -> dict[str, Any]:
        all_processes: list[multi.Process] = []

        results: dict[str, Any] = {}
        _name2nodes: dict[str, IFunctionalNode] = {
            node.name: node for node in self._dag._nodes
        }

        while self._dag.is_active:
            nodes = self._dag.get_ready()
@@ -114,8 +118,14 @@ def run(self, *args, **kwargs) -> dict[str, Any]:

            while not done_queue.empty():
                _done_node, _result = done_queue.get()
                self._dag.done(_done_node)
                results.update({_done_node.name: _result})

                # HACK: When using multiprocessing, the id(IFunctionalNode)
                # seen after running is not the same as the one before running.
                # This lookup should be incorporated into the Dagstream object
                # once the names of all nodes are guaranteed to be unique.
                done_node = _name2nodes[_done_node.name]
                self._dag.done(done_node)
                results.update({done_node.name: _result})

            if not self._dag.is_active:
                for _ in range(self._n_processes):
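For context, here is a minimal standalone sketch (not part of the commit; the Node class and queue names are made up for illustration) of why the name-based lookup above is needed: an object handed to a worker through a multiprocessing queue is pickled and rebuilt, so the instance that comes back has a different id() than the one the dag still holds, while its name survives the round trip.

import multiprocessing as multi


class Node:
    """Stand-in for an IFunctionalNode carrying only a name."""

    def __init__(self, name):
        self.name = name


def worker(task_queue, done_queue):
    # The child receives a pickled copy of the node and sends another pickled copy back.
    node = task_queue.get()
    done_queue.put(node)


if __name__ == "__main__":
    task_queue = multi.Queue()
    done_queue = multi.Queue()
    original = Node("funcA")

    process = multi.Process(target=worker, args=(task_queue, done_queue))
    process.start()
    task_queue.put(original)
    returned = done_queue.get()
    process.join()

    print(returned is original)             # False: identity is lost across processes
    print(returned.name == original.name)   # True: the name still matches

This is why run() now resolves the finished node by name via _name2nodes before calling self._dag.done(), instead of passing the object taken off the queue.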
16 changes: 16 additions & 0 deletions tests/test_executors.py
@@ -22,6 +22,10 @@ def funcD():
    time.sleep(1)


def funcE():
    time.sleep(1)


def test__parallel_run_is_faster_than_single_run():
    stream_parallel = DagStream()
    # All nodes can be run in parallel
@@ -44,4 +48,16 @@ def test__parallel_run_is_faster_than_single_run():
    assert abs(elapsed_time - elapsed_time_parallel * 4) < 1.0


def test__can_run_in_parallel_with_orders():
    stream = DagStream()
    A, B, C, D, E = stream.emplace(funcA, funcB, funcC, funcD, funcE)

    A.precede(B, C)
    E.succeed(B, C, D)
    D.succeed(C)

    executor = StreamParallelExecutor(stream.construct(), n_processes=4)
    executor.run()


# endregion
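As a reading aid, the dependency structure the new test declares can be written out as a plain prerequisite map. This is a hypothetical illustration only, assuming precede(X) means "run before X" and succeed(X) means "run after X", as the test names suggest:

# Prerequisites implied by the edges declared in test__can_run_in_parallel_with_orders.
prerequisites = {
    "funcA": [],                           # no prerequisites, ready immediately
    "funcB": ["funcA"],                    # from A.precede(B, C)
    "funcC": ["funcA"],
    "funcD": ["funcC"],                    # from D.succeed(C)
    "funcE": ["funcB", "funcC", "funcD"],  # from E.succeed(B, C, D)
}
# One valid schedule: funcA, then funcB and funcC in parallel, then funcD, then funcE.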
2 changes: 1 addition & 1 deletion tests/test_version.py
@@ -2,4 +2,4 @@


def test_version():
    assert __version__ == "0.1.3"
    assert __version__ == "0.1.4"
