Skip to content

Commit

Permalink
Merge pull request #47 from pyiron/add_pull
Browse files Browse the repository at this point in the history
  • Loading branch information
liamhuber authored Oct 24, 2023
2 parents 696ff14 + dd830e3 commit 091abf1
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 290 deletions.
224 changes: 99 additions & 125 deletions notebooks/workflow_example.ipynb

Large diffs are not rendered by default.

73 changes: 5 additions & 68 deletions pyiron_workflow/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
from typing import Literal, Optional, TYPE_CHECKING

from bidict import bidict
from toposort import toposort_flatten, CircularDependencyError

from pyiron_workflow.interfaces import Creator, Wrappers
from pyiron_workflow.io import Outputs, Inputs
from pyiron_workflow.node import Node
from pyiron_workflow.node_package import NodePackage
from pyiron_workflow.topology import set_run_connections_according_to_linear_dag
from pyiron_workflow.util import logger, DotDict, SeabornColors

if TYPE_CHECKING:
Expand Down Expand Up @@ -189,74 +189,11 @@ def disconnect_run(self) -> list[tuple[Channel, Channel]]:
def set_run_signals_to_dag_execution(self):
"""
Disconnects all `signals.input.run` connections among children and attempts to
reconnect these according to the DAG flow of the data.
Raises:
ValueError: When the data connections do not form a DAG.
"""
self.disconnect_run()
self._set_run_connections_and_starting_nodes_according_to_linear_dag()
# TODO: Replace this linear setup with something more powerful

def _set_run_connections_and_starting_nodes_according_to_linear_dag(self):
# This is the most primitive sort of topological exploitation we can do
# It is not efficient if the nodes have executors and can run in parallel
try:
# Topological sorting ensures that all input dependencies have been
# executed before the node depending on them gets run
# The flattened part is just that we don't care about topological
# generations that are mutually independent (inefficient but easier for now)
execution_order = toposort_flatten(self.get_data_digraph())
except CircularDependencyError as e:
raise ValueError(
f"Detected a cycle in the data flow topology, unable to automate the "
f"execution of non-DAGs: cycles found among {e.data}"
)

for i, label in enumerate(execution_order[:-1]):
next_node = execution_order[i + 1]
self.nodes[label] > self.nodes[next_node]
self.starting_nodes = [self.nodes[execution_order[0]]]

def get_data_digraph(self) -> dict[str, set[str]]:
"""
Builds a directed graph of node labels based on data connections between nodes
directly owned by this composite -- i.e. does not worry about data connections
which are entirely internal to an owned sub-graph.
Returns:
dict[str, set[str]]: A dictionary of nodes and the nodes they depend on for
data.
Raises:
ValueError: When a node appears in its own input.
reconnect these according to the DAG flow of the data. On success, sets the
starting nodes to just be the upstream-most node in this linear DAG flow.
"""
digraph = {}

for node in self.nodes.values():
node_dependencies = []
for channel in node.inputs:
locally_scoped_dependencies = []
for upstream in channel.connections:
if upstream.node.parent is self:
locally_scoped_dependencies.append(upstream.node.label)
elif channel.node.get_first_shared_parent(upstream.node) is self:
locally_scoped_dependencies.append(
upstream.node.get_parent_proximate_to(self).label
)
node_dependencies.extend(locally_scoped_dependencies)
node_dependencies = set(node_dependencies)
if node.label in node_dependencies:
# the toposort library has a
# [known issue](https://gitlab.com/ericvsmith/toposort/-/issues/3)
# That self-dependency isn't caught, so we catch it manually here.
raise ValueError(
f"Detected a cycle in the data flow topology, unable to automate "
f"the execution of non-DAGs: {node.label} appears in its own input."
)
digraph[node.label] = node_dependencies

return digraph
_, upstream_most_node = set_run_connections_according_to_linear_dag(self.nodes)
self.starting_nodes = [upstream_most_node]

def _build_io(
self,
Expand Down
32 changes: 17 additions & 15 deletions pyiron_workflow/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class Function(Node):
Further, functions with multiple return branches that return different types or
numbers of return values may or may not work smoothly, depending on the details.
Output is updated in the `process_run_result` inside the parent class `finish_run`
call, such that output data gets pushed after the node stops running but before
the `ran` signal fires: run, process and push result, ran.
Output is updated according to `process_run_result` -- which gets invoked by the
post-run callbacks defined in `Node` -- such that run results are used to populate
the output channels.
After a node is instantiated, its input can be updated as `*args` and/or `**kwargs`
on call.
Expand Down Expand Up @@ -103,7 +103,7 @@ class Function(Node):
run: Parse and process the input, execute the engine, process the results and
update the output.
disconnect: Disconnect all data and signal IO connections.
update_input: Allows input channels' values to be updated without any running.
set_input_values: Allows input channels' values to be updated without any running.
Examples:
At the most basic level, to use nodes all we need to do is provide the
Expand Down Expand Up @@ -173,9 +173,7 @@ class Function(Node):
using good variable names and returning those variables instead of using
`output_labels`.
If we force the node to `run()` (or call it) with bad types, it will raise an
error.
But, if we use the gentler `update()`, it will check types first and simply
return `None` if the input is not all `ready`.
error:
>>> from typing import Union
>>>
>>> def hinted_example(
Expand All @@ -186,13 +184,17 @@ class Function(Node):
... return p1, m1
>>>
>>> plus_minus_1 = Function(hinted_example, x="not an int")
>>> plus_minus_1.update()
>>> plus_minus_1.outputs.to_value_dict()
{'p1': <class 'pyiron_workflow.channels.NotData'>,
'm1': <class 'pyiron_workflow.channels.NotData'>}
>>> plus_minus_1.run()
ValueError: hinted_example received a run command but is not ready. The node
should be neither running nor failed, and all input values should conform to
type hints:
running: False
failed: False
x ready: False
y ready: True
Here, even though all the input has data, the node sees that some of it is the
wrong type and so the automatic updates don't proceed all the way to a run.
wrong type and so (by default) the run raises an error right away.
Note that the type hinting doesn't actually prevent us from assigning bad values
directly to the channel (although it will, by default, prevent connections
_between_ type-hinted channels with incompatible hints), but it _does_ stop the
Expand Down Expand Up @@ -333,7 +335,7 @@ def __init__(
# TODO: Parse output labels from the node function in case output_labels is None

self.signals = self._build_signal_channels()
self.update_input(*args, **kwargs)
self.set_input_values(*args, **kwargs)

def _get_output_labels(self, output_labels: str | list[str] | tuple[str] | None):
"""
Expand Down Expand Up @@ -516,7 +518,7 @@ def _convert_input_args_and_kwargs_to_input_kwargs(self, *args, **kwargs):

return kwargs

def update_input(self, *args, **kwargs) -> None:
def set_input_values(self, *args, **kwargs) -> None:
"""
Match positional and keyword arguments to input channels and update input
values.
Expand All @@ -527,7 +529,7 @@ def update_input(self, *args, **kwargs) -> None:
pairs.
"""
kwargs = self._convert_input_args_and_kwargs_to_input_kwargs(*args, **kwargs)
return super().update_input(**kwargs)
return super().set_input_values(**kwargs)

def __call__(self, *args, **kwargs) -> None:
kwargs = self._convert_input_args_and_kwargs_to_input_kwargs(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion pyiron_workflow/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def __init__(
self._inputs: Inputs = self._build_inputs()
self._outputs: Outputs = self._build_outputs()

self.update_input(**kwargs)
self.set_input_values(**kwargs)

def _get_linking_channel(
self,
Expand Down
Loading

0 comments on commit 091abf1

Please sign in to comment.