Skip to content

Commit

Permalink
Enhance composable support (#8)
Browse files Browse the repository at this point in the history
* adjust stack payload for oncoming changes from agent

* move kill diff call to stack apply

* update stack msg for oncoming changes from agent and add error handling

* update model actions

* add container and composable node based action marking
  • Loading branch information
ibrahimsel authored Mar 11, 2024
1 parent 55a5489 commit b62e8af
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 53 deletions.
18 changes: 9 additions & 9 deletions composer/model/composable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Composiv.ai, Eteration A.S. and others
# Copyright (c) 2024 Composiv.ai, Eteration A.S. and others
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -28,20 +28,22 @@ def __init__(self, stack, manifest=None):
self.package = manifest.get('package', '')
self.executable = manifest.get('executable', '')
self.name = manifest.get('name', '')
self.namespace = manifest.get('namespace', os.getenv('MUTONS'))
self.namespace = manifest.get('namespace', os.getenv('MUTONS', default=''))
self.output = manifest.get('output', 'screen')
self.nodes = [node.Node(stack, nDef, self) for nDef in manifest.get('node', [])]
self.remap = manifest.get('remap', [])
self.action = manifest.get('action', '')

def to_manifest(self):
def toManifest(self):
return {
"package": self.package,
"executable": self.executable,
"name": self.name,
"namespace": self.namespace,
"node": [n.toManifest() for n in self.nodes],
"output": self.output,
"remap": self.remap
"remap": self.remap,
"action": self.action
}

def resolve_namespace(self):
Expand All @@ -55,11 +57,9 @@ def __eq__(self, other):
return (self.package == other.package and
self.name == other.name and
self.namespace == other.namespace and
self.executable == other.executable and
len(self.nodes) == len(other.nodes) and
all(n == on for n, on in zip(self.nodes, other.nodes)))
self.executable == other.executable)


def __hash__(self):
node_hash = sum(hash(n) for n in self.nodes)
return hash((self.package, self.name, self.namespace, self.executable, node_hash))
return hash((self.package, self.name, self.namespace, self.executable))

4 changes: 1 addition & 3 deletions composer/model/edge_device.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Composiv.ai, Eteration A.S. and others
# Copyright (c) 2024 Composiv.ai, Eteration A.S. and others
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -69,8 +69,6 @@ def activate(self, current=None):

def apply(self, current=None):
try:
if self.current_stack:
self.current_stack.kill_diff(self.launcher, self.current_stack)
self._update_current_stack(current, ACTIVE)
self.current_stack.apply(self.launcher)
except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions composer/model/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Composiv.ai, Eteration A.S. and others
# Copyright (c) 2024 Composiv.ai, Eteration A.S. and others
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
Expand All @@ -22,7 +22,7 @@
import rclpy

class Node:
def __init__(self, stack, manifest=None, container=None):
def __init__(self, stack, manifest={}, container=None):
if manifest is None:
manifest = {}

Expand All @@ -44,7 +44,7 @@ def __init__(self, stack, manifest=None, container=None):
self.output = manifest.get('output', 'both')
self.iff = manifest.get('if', '')
self.unless = manifest.get('unless', '')
self.action = manifest.get('action', None)
self.action = manifest.get('action', '')
self.ros_params = [{key: value} for p in self.param for key, value in p.value.items()]
self.remap_args = [(stack.resolve_expression(rm['from']), stack.resolve_expression(rm['to'])) for rm in self.remap]

Expand Down
146 changes: 110 additions & 36 deletions composer/model/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import rclpy
from composer.introspection.introspector import Introspector
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_ros.actions import Node, LoadComposableNodes
from launch_ros.actions import ComposableNodeContainer
from launch_ros.descriptions import ComposableNode
from ament_index_python.packages import get_package_share_directory
Expand Down Expand Up @@ -119,13 +119,19 @@ def compare_composable(self, other):
Returns:
tuple: A tuple containing sets of common, different, and added composable nodes.
"""
a = set(self.flatten_composable([]))
b = set(other.flatten_composable([]))
common = a.intersection(b)
difference = a.difference(b)
added = b.difference(a)
return common, difference, added

current_composables = {f"{c.namespace}/{c.name}": c for c in self.flatten_composable([])}
other_composables = {f"{c.namespace}/{c.name}": c for c in other.flatten_composable([])}

common_keys = current_composables.keys() & other_composables.keys()
added_keys = other_composables.keys() - current_composables.keys()
removed_keys = current_composables.keys() - other_composables.keys()

common = [current_composables[key] for key in common_keys]
added = [other_composables[key] for key in added_keys]
removed = [current_composables[key] for key in removed_keys]

return common, added, removed

def flatten_nodes(self, list):
"""Flatten the nested structure of nodes in the stack.
Expand Down Expand Up @@ -256,23 +262,57 @@ def _merge_nodes(self, merged, other):
merged.node = common.union(added).union(difference)

def _merge_composables(self, merged, other):
common_composables, different_composables, added_composables = self.compare_composable(
other)
merged.composable = []

current_containers = {(c.namespace, c.name): c for c in self.composable}
other_containers = {(c.namespace, c.name): c for c in other.composable}

# Process added and removed containers
for key, container in other_containers.items():
if key not in current_containers:
# Mark all nodes within new containers as STARTACTION
for node in container.nodes:
node.action = STARTACTION
merged.composable.append(container)
else:
# For existing containers, compare nodes within and mark actions
current_container = current_containers[key]
self.compare_and_mark_nodes(current_container, container, merged)

for container in common_composables:
for node in container.nodes:
node.action = NOACTION
for key, container in current_containers.items():
if key not in other_containers:
# Mark all nodes within removed containers as STOPACTION
for node in container.nodes:
node.action = STOPACTION
merged.composable.append(container)

return merged

def compare_and_mark_nodes(self, current_container, other_container, merged):
current_nodes = {(n.namespace, n.name): n for n in current_container.nodes}
other_nodes = {(n.namespace, n.name): n for n in other_container.nodes}

for container in added_composables:
for node in container.nodes:
for key, node in other_nodes.items():
if key not in current_nodes:
node.action = STARTACTION
else:
node.action = NOACTION

for container in different_composables:
for node in container.nodes:
for key, node in current_nodes.items():
if key not in other_nodes:
node.action = STOPACTION
else:

if node.action != STARTACTION:
node.action = NOACTION

# Add processed nodes back into their respective containers
processed_container = other_container if other_container in merged.composable else current_container
processed_container.nodes = list(current_nodes.values()) + [n for n in other_nodes.values() if n.action == STARTACTION]
if processed_container not in merged.composable:
merged.composable.append(processed_container)


merged.composable = common_composables.union(
added_composables).union(different_composables)

def _merge_params(self, merged, other):
other_params = {param.name: param.value for param in other.param}
Expand Down Expand Up @@ -380,7 +420,7 @@ def kill_diff(self, launcher, stack):

def change_params_at_runtime(self, param_differences):
"""Change parameters at runtime based on differences.
### TODO: replace with set param service call
Args:
param_differences (dict): Dictionary containing parameter differences.
"""
Expand Down Expand Up @@ -420,7 +460,7 @@ def toManifest(self):
for n in self.node:
manifest["node"].append(n.toManifest())
for c in self.composable:
manifest["composable"].append(c.to_manifest())
manifest["composable"].append(c.toManifest())
return manifest

def process_remaps(self, remaps_config):
Expand All @@ -434,32 +474,62 @@ def process_remaps(self, remaps_config):
"""
return [(rmp['from'], rmp['to']) for rmp in remaps_config] if remaps_config else []

def should_node_run(self, node_name, node_namespace):
def should_node_run(self, node):
"""Check if a node should run.
This method clears the situation where a
node has NOACTION but it isn't running (Happens at when Muto first starts running)
node has NOACTION but it isn't running
NOACTION is meant to keep the common processes alive when switching stacks
Args:
node_name (str): Name of the node.
node_namespace (str): Namespace of the node.
node (object): The node object.
Returns:
bool: True if the node should run, False otherwise.
"""
active_nodes = [(active[1] if active[1] != '/' else '') +
'/' + active[0] for active in self.get_active_nodes()]
return f'/{node_namespace}/{node_name}' not in active_nodes

for i in active_nodes:
print("ACTIVE NODES: ", i)

should_node_run = f'/{node.namespace}/{node.name}' not in active_nodes
print(f'/{node.namespace}/{node.name} should run: {should_node_run}')
return should_node_run

def load_common_composables(self, container, launch_description: LaunchDescription):
"""If there are common containers in stack composables, load them onto the existing container
Args:
container (object): The container object
"""
node_desc = []
for cn in container.nodes:
if cn.action == LOADACTION:
print(f"LOADING {cn.namespace}/{cn.name}")
node_desc.append(ComposableNode(
package=cn.pkg,
name=cn.name,
namespace=cn.namespace,
plugin=cn.plugin
))
print(f'Node DESC: {node_desc}')

def handle_composable_nodes(self, composable_nodes, launch_description, launcher):
if node_desc:
load_action = LoadComposableNodes(
target_container=f'{container.namespace}/{container.name}',
composable_node_descriptions=[node_desc],
)
launch_description.add_action(load_action)

def handle_composable_nodes(self, composable_containers, launch_description, launcher):
"""Handle composable nodes during stack launching.
Args:
composable_nodes (list): List of composable nodes.
composable_containers (list): List of composable nodes.
launch_description (object): The launch description object.
"""
for c in composable_nodes:
node_desc = [ComposableNode(package=cn.pkg, plugin=cn.plugin, name=cn.name, namespace= cn.namespace, parameters=cn.ros_params, remappings=self.process_remaps(cn.remap))
for cn in c.nodes if cn.action == STARTACTION or (cn.action == NOACTION and self.should_node_run(cn.name, cn.namespace))]
for c in composable_containers:
node_desc = [ComposableNode(package=cn.pkg, plugin=cn.plugin, name=cn.name, namespace=cn.namespace, parameters=cn.ros_params, remappings=self.process_remaps(cn.remap))
for cn in c.nodes if cn.action == STARTACTION or (cn.action == NOACTION and self.should_node_run(cn))]

if node_desc: # If node_desc is not empty
container = ComposableNodeContainer(
Expand All @@ -472,6 +542,8 @@ def handle_composable_nodes(self, composable_nodes, launch_description, launcher
)
launch_description.add_action(container)

# self.load_common_composables(c, launch_description)

def handle_regular_nodes(self, nodes, launch_description, launcher):
"""Handle regular nodes during stack launching.
Expand All @@ -480,7 +552,7 @@ def handle_regular_nodes(self, nodes, launch_description, launcher):
launch_description (object): The launch description object.
"""
for n in nodes:
if n.action == STARTACTION or (n.action == NOACTION and self.should_node_run(n.name, n.namespace)):
if n.action == STARTACTION:
launch_description.add_action(Node(
package=n.pkg,
executable=n.exec,
Expand All @@ -491,7 +563,6 @@ def handle_regular_nodes(self, nodes, launch_description, launcher):
arguments=n.args.split(),
remappings=self.process_remaps(n.remap)
))


def handle_managed_nodes(self, nodes, verb):
"""Handle regular nodes during stack launching.
Expand All @@ -514,24 +585,26 @@ def launch(self, launcher):
launch_description = LaunchDescription()

try:
self.handle_composable_nodes(self.composable, launch_description, launcher)
self.handle_composable_nodes(
self.composable, launch_description, launcher)
self.handle_regular_nodes(self.node, launch_description, launcher)

except Exception as e:
print(f'Stack launching ended with exception: {e}')

launcher.start(launch_description)
all_nodes = self.node + [cn for c in self.composable for cn in c.nodes]
self.handle_managed_nodes(all_nodes, verb='start')

# After nodes are launched, take care of managed node actions
self.handle_managed_nodes(all_nodes, verb='start')

def apply(self, launcher):
"""Apply the stack.
Args:
launcher (object): The launcher object.
"""

self.kill_diff(launcher, self)
self.launch(launcher)

def resolve_expression(self, value=""):
Expand Down Expand Up @@ -605,3 +678,4 @@ def resolve_args(self, array=[]):
self.arg[name] = p

return result

9 changes: 7 additions & 2 deletions composer/muto_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ def on_stack_callback(self, msg):
:param msg: The MutoAction message containing the stack action and payload.
"""
if msg:
stack = json.loads(msg.payload)
self.router.route(msg.method, stack)
try:
stack = json.loads(msg.payload)["value"]
self.router.route(msg.method, stack)
except KeyError as k:
self.get_logger().error(f"Payload is not in the expected format: {k}")
except Exception as e:
self.get_logger().error(f"Invalid payload coming to muto composer: {e}")

def main(args=None):
rclpy.init(args=args)
Expand Down

0 comments on commit b62e8af

Please sign in to comment.