-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
enhance composable node support & code refactor (#5)
Merging the refactoring, please use issue #6 for lifecycle support
- Loading branch information
1 parent
7da9a41
commit 8207868
Showing
24 changed files
with
1,357 additions
and
1,654 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# | ||
# Copyright (c) 2023 Composiv.ai, Eteration A.S. and others | ||
# Copyright (c) 2024 Composiv.ai, Eteration A.S. and others | ||
# | ||
# All rights reserved. This program and the accompanying materials | ||
# are made available under the terms of the Eclipse Public License v2.0 | ||
|
@@ -14,93 +14,94 @@ | |
# Composiv.ai, Eteration A.S. - initial API and implementation | ||
# | ||
|
||
import os | ||
import yaml | ||
import json | ||
import rclpy | ||
from rclpy.node import Node | ||
|
||
from ament_index_python.packages import get_package_share_directory | ||
from composer.stack import Stack | ||
from composer.model.stack import Stack | ||
from composer.twin import Twin | ||
from composer.edge_device import EdgeDevice | ||
from composer.model.edge_device import EdgeDevice | ||
from muto_msgs.srv import ComposePlugin | ||
from muto_msgs.msg import PluginResponse, StackManifest, PlanManifest | ||
|
||
|
||
class MutoDefaultComposePlugin(Node): | ||
""" | ||
Default composition plugin node for handling stack compositions | ||
""" | ||
def __init__(self): | ||
super().__init__("compose_plugin") | ||
|
||
self.declare_parameter("name", "example-01") | ||
self.declare_parameter("namespace", "org.eclipse.muto.sandbox") | ||
self.declare_parameter("stack_topic", "stack") | ||
self.declare_parameter("twin_topic", "twin") | ||
self.declare_parameter("anonymous", False) | ||
self.declare_parameter( | ||
"twin_url", "http://ditto:[email protected]") | ||
|
||
self.name = self.get_parameter("name").value | ||
self.namespace = self.get_parameter("namespace").value | ||
self.stack_topic = self.get_parameter("stack_topic").value | ||
self.twin_topic = self.get_parameter("twin_topic").value | ||
self.twin_url = self.get_parameter("twin_url").value | ||
self.anonymous = self.get_parameter("anonymous").value | ||
|
||
self.muto = { | ||
"name": self.name, | ||
"namespace": self.namespace, | ||
"stack_topic": self.stack_topic, | ||
"twin_topic": self.twin_topic, | ||
"twin_url": self.twin_url, | ||
"anonymous": self.anonymous | ||
self._init_parameters() | ||
self._init_services() | ||
|
||
def _init_parameters(self): | ||
params = { | ||
"name": "example-01", | ||
"namespace": "org.eclipse.muto.sandbox", | ||
"stack_topic": "stack", | ||
"twin_topic": "twin", | ||
"anonymous": False, | ||
"twin_url": "http://ditto:[email protected]" | ||
} | ||
for param, value in params.items(): | ||
self.declare_parameter(param, value) | ||
self.muto = {param: self.get_parameter(param).value for param in params} | ||
|
||
self.srv = self.create_service( | ||
ComposePlugin, "muto_compose", self.handle_compose | ||
) | ||
|
||
self.twin = Twin(node='muto_compose_plugin', | ||
config=self.muto, publisher=None) | ||
self.twin = Twin(node='muto_compose_plugin', config=self.muto, publisher=None) | ||
self.edge_device = EdgeDevice(twin=self.twin) | ||
|
||
def _init_services(self): | ||
self.create_service(ComposePlugin, "muto_compose", self.handle_compose) | ||
|
||
def handle_compose(self, req, res): | ||
plan = req.input | ||
""" | ||
Handles composition requests, merging current and next stack definitions. | ||
st = json.loads(plan.current.stack) | ||
current_stack = Stack(self.edge_device, st, None) | ||
st = json.loads(plan.next.stack) | ||
:param req: The service request containing the composition plan. | ||
:param res: The service response to be filled with the result of the composition. | ||
:return: The service response with the composition result. | ||
""" | ||
plan = req.input | ||
current_stack = Stack(self.edge_device, json.loads(plan.current.stack), None) | ||
next_stack_manifest = json.loads(plan.next.stack) | ||
|
||
if st.get('stackId', None) is not None: | ||
manifest = self.twin.stack(st['stackId']) | ||
nextStack = Stack(self.edge_device, manifest, None) | ||
else: | ||
nextStack = Stack(self.edge_device, st, None) | ||
next_stack = self._get_next_stack(next_stack_manifest) | ||
merged = current_stack.merge(next_stack) | ||
|
||
merged = current_stack.merge(nextStack) | ||
res.output = self._create_plan_manifest(plan, merged.manifest) | ||
return res | ||
|
||
res.output = PlanManifest( | ||
def _get_next_stack(self, manifest): | ||
""" | ||
Retrieves the next stack based on its manifest, handling direct definitions or references by ID. | ||
:param manifest: The manifest dictionary of the next stack. | ||
:return: An instance of the Stack class representing the next stack. | ||
""" | ||
if 'stackId' in manifest: | ||
manifest = self.twin.stack(manifest['stackId']) | ||
return Stack(self.edge_device, manifest, None) | ||
|
||
def _create_plan_manifest(self, plan, merged_manifest): | ||
""" | ||
Creates a PlanManifest message from the current and next plans, and the merged manifest. | ||
:param plan: The current composition plan. | ||
:param merged_manifest: The merged stack manifest. | ||
:return: A PlanManifest instance ready to be sent as a service response. | ||
""" | ||
return PlanManifest( | ||
current=plan.current, | ||
next=plan.next, | ||
pipeline=plan.pipeline, | ||
planned=StackManifest( | ||
type="json", | ||
stack=json.dumps(merged.manifest) | ||
), | ||
result=PluginResponse( | ||
result_code=0, error_message="", error_description="" | ||
) | ||
planned=StackManifest(type="json", stack=json.dumps(merged_manifest)), | ||
result=PluginResponse(result_code=0, error_message="", error_description="") | ||
) | ||
return res | ||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
def main(): | ||
rclpy.init() | ||
node = MutoDefaultComposePlugin() | ||
rclpy.spin(node) | ||
node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() | ||
main() |
Oops, something went wrong.