Skip to content

Commit

Permalink
Adding support for container nodes and composable nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Naci Dai committed Jan 17, 2024
1 parent 449264e commit 7da9a41
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 14 deletions.
161 changes: 161 additions & 0 deletions composer/composable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#
# Copyright (c) 2023 Composiv.ai, Eteration A.S. and others
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Composiv.ai, Eteration A.S. - initial API and implementation
#
#

import os
import composer.node as node


class Container(object):
def __init__(self, stack, manifest={}):
self.stack = stack
self._manifest = manifest
self._pkg = manifest.get('package', '')
self._exec = manifest.get('executable', '')
self._name = manifest.get('name', '')
muto_ns = os.getenv('MUTONS')
self._namespace = manifest.get('namespace', muto_ns)
self._output = manifest.get('output', 'log')
self.initialize()

def toManifest(self):
manifest = {
"package": self._pkg,
"executable": self._exec,
"name": self._name,
"namespace": self._namespace,
"node": [],
"output": self._output
}
for n in self._node:
manifest["node"].append(n.toManifest())
return manifest

def initialize(self):
self._node = []
for nDef in self.manifest.get('node', []):
sn = node.Node(self.stack, nDef, self)
self._node.append(sn)

def launch(self):
parameters = []
for p in self.param:
parameters.append({p.name: p.value})

def resolve_namespace(self):
ns = self.namespace
if not ns.startswith('/'):
ns = '/' + ns
if ns.endswith('/'):
return ns + self.name + '/'
return ns + '/' + self.name + '/'

def __eq__(self, other):
"""Overrides the default implementation"""
if isinstance(other, Container):
if self.package != other.package:
return False
if self.name != other.name:
return False
if self.namespace != other.namespace:
return False
if self.executable != other.executable:
return False
if len(self.node) != len(other.node):
return False
for n in self.node:
fail = True
for on in other.node:
if n == on:
fail = False
if fail:
return False
return True
return False

def __ne__(self, other):
"""Overrides the default implementation (unnecessary in Python 3)"""
return not self.__eq__(other)

def __hash__(self):
h = 7
if self.package is not None:
h = 31 * h + hash(self.package)
if self.name is not None:
h = 31 * h + hash(self.name)
if self.namespace is not None:
h = 31 * h + hash(self.namespace)
if self.executable is not None:
h = 31 * h + hash(self.executable)
for n in self.node:
h = 31 * h + hash(n)
return h

@property
def node(self):
return self._node

@property
def package(self):
return self._pkg

@package.setter
def package(self, n):
self._pkg = n

@property
def executable(self):
return self._exec

@executable.setter
def executable(self, n):
self._exec = n

@property
def namespace(self):
return self._namespace

@namespace.setter
def namespace(self, n):
self._namespace = n

@property
def name(self):
return self._name

@name.setter
def name(self, n):
self._name = n

@property
def output(self):
return self._output

@output.setter
def output(self, n):
self._output = n

@property
def action(self):
return self._action

@action.setter
def action(self, n):
self._action = n

@property
def manifest(self):
return self._manifest
14 changes: 12 additions & 2 deletions composer/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,26 @@ def shutdown(self):
"The process did not terminate gracefully and was terminated forcefully.")

def on_process_start(self, event, context):
name = event.process_name
if not bool(name):
name = event.action.node_name
details = event.action.process_details
if not bool(name):
name = details['name']
with self._lock:
self._active_nodes.append({details['name']: details['pid']})
self._active_nodes.append({name: details['pid']})
print(f"Active Nodes after start: {self._active_nodes}")

def on_process_exit(self, event, context):
name = event.process_name
if not bool(name):
name = event.action.node_name
details = event.action.process_details
if not bool(name):
name = details['name']
with self._lock:
for node in self._active_nodes[:]: # make a copy of the list
if node.get(details['name']) == details['pid']:
if node.get(name) == details['pid']:
self._active_nodes.remove(node)
break
print(f"Active Nodes after exit: {self._active_nodes}")
Expand Down
19 changes: 18 additions & 1 deletion composer/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@


class Node(object):
def __init__(self, stack, manifest={}):
def __init__(self, stack, manifest={}, container=None):
self.stack = stack
self._container = container
self._manifest = manifest
self._env = manifest.get('env', [])
self._param = manifest.get('param', [])
self._remap = manifest.get('remap', [])
self._pkg = manifest.get('pkg', '')
self._exec = manifest.get('exec', '')
self._plugin = manifest.get('plugin', '')
self._name = manifest.get('name', '')
self._ros_args = manifest.get('ros_args', '')
self._remap_args = []
Expand Down Expand Up @@ -71,6 +73,7 @@ def toManifest(self):
"remap": self._remap,
"pkg": self._pkg,
"exec": self._exec,
"plugin": self._plugin,
"name": self._name,
"ros_args": self._ros_args,
"args": self._args,
Expand Down Expand Up @@ -113,8 +116,11 @@ def __eq__(self, other):
return False
if self.exectbl != other.exectbl:
return False
if self.plugin != other.plugin:
return False
if self.args != other.args:
return False

return True
return False

Expand All @@ -132,6 +138,8 @@ def __hash__(self):
h = 31 * h + hash(self.namespace)
if self.exectbl is not None:
h = 31 * h + hash(self.exectbl)
if self.plugin is not None:
h = 31 * h + hash(self.plugin)
# if self.args is not None:
# h = 31 * h + hash(self.args)
return h
Expand Down Expand Up @@ -192,6 +200,15 @@ def ros_args(self):
def ros_args(self, n):
self._ros_args = n


@property
def plugin(self):
return self._plugin

@plugin.setter
def plugin(self, n):
self._plugin = n

@property
def args(self):
return self._args
Expand Down
2 changes: 1 addition & 1 deletion composer/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def executeStep(self, plan, step):
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response = future.result()
if response.output.result.result_code == 1000:
if response.output.result.result_code != 0:
print(f'Step failed: {step}')
return response
else:
Expand Down
Loading

0 comments on commit 7da9a41

Please sign in to comment.