Skip to content

Commit

Permalink
removed 'from_parameter' node assignments (#24); fixed correct depend…
Browse files Browse the repository at this point in the history
…ency structure of nodes by introducing "callback", "process" and "data" edges.; implemented new topological sorting algorithm
  • Loading branch information
cnavacch committed Oct 17, 2020
1 parent a701bd7 commit c615221
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 159 deletions.
145 changes: 135 additions & 10 deletions src/openeo_pg_parser/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,49 @@
from pprint import pformat
from collections import OrderedDict
import igraph as ig

from openeo_pg_parser.utils import find_node_inputs
from openeo_pg_parser.definitions import OpenEOProcess
from openeo_pg_parser.definitions import OpenEOParameter


# def rec_sort_process_graph(process_graph, nodes, depth=0, list_idx=0):
# while True:
#
# current_node = nodes[list_idx]
# pg_lower_dep = process_graph.lineage(current_node, link="callback", ancestors=True,
# include_node=False)
# if len(pg_lower_dep) == 0:
# return
# else:
# depth rec_sort_process_graph(process_graph, sorted_nodes, depth=depth)
def create_edge(node_from, node_to, name="data", hidden=False):
    """
    Creates a directed edge of type `graph.Edge` between the nodes `node_from` and `node_to`
    and registers it on both nodes.

    Parameters
    ----------
    node_from : graph.Node
        Start node of the edge.
    node_to : graph.Node
        End node of the edge.
    name : str, optional
        Name of the edge (default is "data").
    hidden : bool, optional
        True if edge should be ignored, e.g. for sorting (defaults to False).

    Returns
    -------
    graph.Edge
        Created directed edge consisting of the two given nodes.

    """
    edge_nodes = [node_from, node_to]
    # the edge ID is built from both node IDs, start node first
    edge_id = "_".join([edge_node.id for edge_node in edge_nodes])
    edge = Edge(id=edge_id, name=name, nodes=edge_nodes, hidden=hidden)
    # the same edge object is attached to both of its end points
    node_to.add_edge(edge)
    node_from.add_edge(edge)

    # hand the edge back to the caller, as documented above
    return edge

class Node:
"""
A node of a graph, containing information about its edges, an ID, a name and a sub-graph/dictionary.
Expand Down Expand Up @@ -469,6 +508,77 @@ def find_partners(self, node, link=None, include_node=True):

return Graph.from_list(nodes)

def _linear_sorting(self, use_in_nodes=True):
    """
    Internal sorting method for ordering the node IDs in a linear manner corresponding to their call order.
    A callback node (calling/embedding a sub-process graph) is passed at least two times, once for
    passing its own output to the sub-process (`use_in_nodes=True`) and once for passing the output
    from the sub-process to the next process (`use_in_nodes=False`). Internally, these two passes are
    represented by two vertices, named `<node id>_in` and `<node id>_out`.

    Parameters
    ----------
    use_in_nodes : bool, optional
        If true, the node is put into the order, when it is passed/called the first time passing output to the
        sub-process.
        If false, the node is put into the order, when it is passed/called the second time passing output of
        the sub-process to the next process.

    Returns
    -------
    ordered_node_ids_filt : list
        List of node IDs sorted by their call order.

    """
    # collect every non-callback edge once (an edge may be registered on both of its nodes)
    edges = []
    for node in self.nodes:
        for edge in node.edges:
            if edge not in edges and edge.name != "callback":
                edges.append(edge)

    # translate the edges into (from, to) vertex names, tagging the "in"/"out" pass where needed
    tuple_edges = []
    for edge in edges:
        first_node = edge.nodes[0]
        sec_node = edge.nodes[1]
        if first_node.depth < sec_node.depth:
            # going one level down: only "data" edges are considered
            if edge.name == "data":
                if sec_node.uses_callback:
                    tuple_edges.append((first_node.id + "_in", sec_node.id + "_in"))
                else:
                    tuple_edges.append((first_node.id + "_in", sec_node.id))
        elif first_node.depth > sec_node.depth:
            # coming back up: only "process" edges are considered
            if edge.name == "process":
                if first_node.uses_callback:
                    tuple_edges.append((first_node.id + "_out", sec_node.id + "_out"))
                else:
                    tuple_edges.append((first_node.id, sec_node.id + "_out"))
        else:
            # same level: only "process" edges are considered
            if edge.name == "process":
                if first_node.uses_callback and sec_node.uses_callback:
                    tuple_edges.append((first_node.id + "_out", sec_node.id + "_in"))
                elif not first_node.uses_callback and sec_node.uses_callback:
                    tuple_edges.append((first_node.id, sec_node.id + "_in"))
                elif first_node.uses_callback and not sec_node.uses_callback:
                    tuple_edges.append((first_node.id + "_out", sec_node.id))
                else:
                    tuple_edges.append((first_node.id, sec_node.id))

    # without any edge there are no vertices to sort
    if not tuple_edges:
        return []

    ig_graph = ig.Graph.TupleList(tuple_edges, directed=True)
    vertex_names = ig_graph.vs['name']
    ordered_node_ids = [vertex_names[vertex_idx] for vertex_idx in ig_graph.topological_sorting()]

    suffix_this = "_in" if use_in_nodes else "_out"
    suffix_other = "_out" if use_in_nodes else "_in"
    ordered_node_ids_filt = []
    for node_id in ordered_node_ids:
        # Match the tag as a real suffix: a substring test would also hit IDs like "min_1", and
        # `str.rstrip` removes a set of characters rather than a suffix (e.g. "min_in" -> "m").
        # NOTE(review): an untagged node whose own ID ends with "_in"/"_out" is still ambiguous here.
        if node_id.endswith(suffix_this):
            ordered_node_ids_filt.append(node_id[:-len(suffix_this)])
        elif node_id.endswith(suffix_other):
            continue
        else:
            ordered_node_ids_filt.append(node_id)

    return ordered_node_ids_filt

def sort(self, by='dependency'):
"""
Sorts graph according to sorting strategy.
Expand All @@ -479,18 +589,18 @@ def sort(self, by='dependency'):
Sorting strategy:
- 'dependency': Sorts graph by each node dependency,
i.e., nodes being dependent on another node come after this node.
- 'depth': Sorts graph by the depth level of the nodes, from lower to higher depth.
Returns
-------
graph.Graph
Sorted graph.
"""

if by == "dependency":
# use igraph for topological sorting
ig = self.to_igraph()
node_order = ig.topological_sorting()
ordered_node_ids = np.array(ig.vs['name'])[node_order]
# use internal algo and igraph for topological sorting
ordered_node_ids = self._linear_sorting()
nodes_ordered = [self[ordered_node_id] for ordered_node_id in ordered_node_ids]
elif by == "depth":
depths = [node.depth for node in self.nodes]
Expand Down Expand Up @@ -570,7 +680,7 @@ def plot(self, layout="kamada_kawai", margin=100, bbox=(0, 0, 600, 600), node_si

return ig.plot(ig_graph, layout=ig_layout, margin=margin, bbox=bbox, vertex_size=node_size)

def to_igraph(self):
def to_igraph(self, edge_name=None):
"""
Converts a graph.Graph into an ig.Graph.
The ig.Graph object contains nodes with their ID's and edges with their respective names.
Expand All @@ -586,16 +696,20 @@ def to_igraph(self):
"""
edges = []
available_edge_names = ["process", "data"]
ignore_edge_names = ["callback"]
if edge_name is not None and edge_name in available_edge_names:
ignore_edge_names.extend(available_edge_names.remove(edge_name))
for node in self.nodes:
for edge in node.edges:
if not edge.hidden and edge not in edges:
if edge not in edges:
# ignore nodes, which are not contained in the graph
if edge.nodes[0].id not in self.ids or edge.nodes[1].id not in self.ids:
continue
# ignore non-result, callback nodes
if edge.name == "callback" and not edge.nodes[0].is_result:
if edge.name in ignore_edge_names:
continue
edges.append(edge)

if edges:
tuple_edges = [(edge.nodes[0].id, edge.nodes[1].id) for edge in edges]
ig_graph = ig.Graph.TupleList(tuple_edges, directed=True)
Expand Down Expand Up @@ -710,7 +824,7 @@ def parent_process(self):
Parent process node or None, if there is no parent process.
"""
parents = self.descendants("callback")
parents = self.descendants("process")
if len(parents) > 1:
err_msg = "Only one parent process is allowed."
raise Exception(err_msg)
Expand All @@ -731,7 +845,7 @@ def child_processes(self):
"""

return self.ancestors("callback")
return self.ancestors("process")

@property
def result_process(self):
Expand Down Expand Up @@ -772,6 +886,17 @@ def is_reducer(self):
""" bool : Checks if the current process is a reducer or not. """
return self.process.is_reducer

@property
def uses_callback(self):
    """ bool : True if this node is the second (end) node of one of its edges whose name contains "callback". """
    end_nodes = []
    for edge in self.edges:
        if "callback" in edge.name:
            end_nodes.append(edge.nodes[1])
    return self in end_nodes

@property
def expects_parent_input(self):
    """
    bool : True if `find_node_inputs` returns at least one entry for the key "from_parameter" on this node.
    (Presumably these are the inputs handed down by a parent process -- confirm against `find_node_inputs`.)
    """
    parameter_inputs = find_node_inputs(self, "from_parameter")
    has_parameter_inputs = len(parameter_inputs) > 0
    return has_parameter_inputs

@property
def dimension(self):
""" str : Returns the dimension over which is reduced if the process is a reducer. """
Expand Down
Loading

0 comments on commit c615221

Please sign in to comment.