Skip to content

Commit

Permalink
Add use of generate_parameter_library
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiktor-99 committed Feb 10, 2024
1 parent 98e4946 commit 7a4fd0c
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ repos:
rev: 6.1.0
hooks:
- id: flake8
args: ["--ignore=E501,W503,E203"] # ignore too long line and line break before binary operator
args: ["--ignore=E501,W503,E203", "--exclude=grasp_service_parameters.py"]

- repo: https://github.com/PyCQA/doc8
rev: v1.1.1
Expand Down
2 changes: 2 additions & 0 deletions ros2_grasp_service/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<maintainer email="[email protected]">Wiktor Bajor</maintainer>
<license>Apache-2.0</license>

<depend>generate_parameter_library</depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
Expand Down
134 changes: 41 additions & 93 deletions ros2_grasp_service/ros2_grasp_service/grasp_service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from rclpy import init, spin_once, shutdown, spin
from control_msgs.action import FollowJointTrajectory, GripperCommand
from rclpy.node import Node
from rclpy.parameter import Parameter
from std_srvs.srv import Empty
from rclpy.action import ActionClient
from action_msgs.msg import GoalStatus
from trajectory_msgs.msg import JointTrajectoryPoint, JointTrajectory
from builtin_interfaces.msg import Duration
from time import sleep
from ros2_grasp_service.grasp_service_parameters import grasp_service_parameters


class FollowJointTrajectoryActionClient(Node):
Expand Down Expand Up @@ -69,109 +69,57 @@ def get_result_callback(self, future):
class GraspService(Node):
def __init__(self):
super().__init__("grasp_service")
self.declare_initial_parameters()
self.get_initial_parameters()
self.declare_nested_parameters()
self.get_nested_parameters()
param_listener = grasp_service_parameters.ParamListener(self)
self.params = param_listener.get_params()
self.log_parameters()

self.service = self.create_service(srv_type=Empty, srv_name="grasp_service", callback=self.grasp)
self.joint_trajectory_action_client = FollowJointTrajectoryActionClient(self.joints_controller_name)
self.gripper_action = GripperActionClient(self.gripper_controller_name)
self.joint_trajectory_action_client = FollowJointTrajectoryActionClient(self.params.joints_controller_name)
self.gripper_action = GripperActionClient(self.params.gripper_controller_name)

def declare_nested_parameters(self):
self.declar_times_and_positions_parameters_from_list(
"times_for_positions_to_target", "positions_to_target", self.positions_to_target_list
)
self.declar_times_and_positions_parameters_from_list(
"times_for_positions_to_home", "positions_to_home", self.positions_to_home_list
)

def get_nested_parameters(self):
self.positions_to_target = self.get_nested_double_array_parameters(
"positions_to_target", self.positions_to_target_list
)

self.times_for_positions_to_target = self.get_nested_double_parameters(
"times_for_positions_to_target", self.positions_to_target_list
)
def log_parameters(self):
self.get_logger().info(f"Got joints {self.params.joints_names}")
self.get_logger().info(f"Got positions to target list {self.params.positions_to_target_list}")
for pos_to_target in self.params.positions_to_target_list:
self.get_logger().info(
f"Got {pos_to_target} to target"
f" {self.params.positions_to_target.get_entry(pos_to_target).positions} with time"
f" {self.params.positions_to_target.get_entry(pos_to_target).time_to_target}"
)

self.positions_to_home = self.get_nested_double_array_parameters(
"positions_to_home", self.positions_to_home_list
)
self.get_logger().info(f"Got positions to home list {self.params.positions_to_home_list}")

self.times_for_positions_to_home = self.get_nested_double_parameters(
"times_for_positions_to_home", self.positions_to_home_list
)
for pos_to_home in self.params.positions_to_home_list:
self.get_logger().info(
f"Got {pos_to_home} to home {self.params.positions_to_home.get_entry(pos_to_home).positions} with time"
f" {self.params.positions_to_home.get_entry(pos_to_home).time_to_target}"
)

def get_nested_double_array_parameters(self, primary_key_name, nested_keys_names_list):
return [
self.get_parameter(f"{primary_key_name}.{nested_key_name}").get_parameter_value().double_array_value
for nested_key_name in nested_keys_names_list
]
self.get_logger().info(f"Got time to wait on target {self.params.time_to_wait_on_target}")

def get_nested_double_parameters(self, primary_key_name, nested_keys_names_list):
return [
self.get_parameter(f"{primary_key_name}.{nested_key_name}").get_parameter_value().integer_value
for nested_key_name in nested_keys_names_list
]
self.get_logger().info(f"Got joints_controller_name {self.params.joints_controller_name}")
self.get_logger().info(f"Got gripper_controller_name {self.params.gripper_controller_name}")

def get_initial_parameters(self):
self.joints_controller_name = self.get_parameter("joints_controller_name").get_parameter_value().string_value
self.joints_names = self.get_parameter("joints_names").get_parameter_value().string_array_value
self.time_to_wait_on_target = self.get_parameter("time_to_wait_on_target").get_parameter_value().integer_value
self.positions_to_target_list = (
self.get_parameter("positions_to_target_list").get_parameter_value().string_array_value
)
self.positions_to_home_list = (
self.get_parameter("positions_to_home_list").get_parameter_value().string_array_value
)
self.gripper_close_position = self.get_parameter("gripper_close").get_parameter_value().double_value
self.gripper_open_position = self.get_parameter("gripper_open").get_parameter_value().double_value
self.gripper_controller_name = self.get_parameter("gripper_controller_name").get_parameter_value().string_value

def declare_initial_parameters(self):
self.declare_parameters(
namespace="",
parameters=[
("joints_controller_name", Parameter.Type.STRING),
("joints_names", Parameter.Type.STRING_ARRAY),
("positions_to_target_list", Parameter.Type.STRING_ARRAY),
("positions_to_target_list", Parameter.Type.STRING_ARRAY),
("positions_to_home_list", Parameter.Type.STRING_ARRAY),
("time_to_wait_on_target", Parameter.Type.INTEGER),
("gripper_close", Parameter.Type.DOUBLE),
("gripper_open", Parameter.Type.DOUBLE),
("gripper_controller_name", Parameter.Type.STRING),
],
)
self.get_logger().info(f"Got gripper_close {self.params.gripper_close}")
self.get_logger().info(f"Got gripper_open {self.params.gripper_open}")

def log_parameters(self):
self.get_logger().info(f"Got joint {self.joints_names}")
self.get_logger().info(f"Got positions {self.positions_to_target_list}")
self.get_logger().info(f"Got positions to target {self.positions_to_target}")
self.get_logger().info(f"Got times for positions to target {self.times_for_positions_to_target}")
self.get_logger().info(f"Got positions to home {self.positions_to_home}")
self.get_logger().info(f"Got times for positions to home {self.times_for_positions_to_home}")
self.get_logger().info(f"Got time to wait on target {self.time_to_wait_on_target}")

def declar_times_and_positions_parameters_from_list(self, times_prefix, position_prefix, list_of_parameters):
for position in list_of_parameters:
self.get_logger().info(f"{position_prefix}.{position}")
self.declare_parameter(f"{times_prefix}.{position}", Parameter.Type.INTEGER)
self.declare_parameter(f"{position_prefix}.{position}", Parameter.Type.DOUBLE_ARRAY)

def create_trajectory_goal(self, array_of_points, times_in_sec):
def create_trajectory_goal(self, list_of_position, map_of_positions):
trajectory = JointTrajectory()
trajectory.joint_names = self.joints_names
trajectory.joint_names = self.params.joints_names
trajectory.points = [
JointTrajectoryPoint(positions=points, time_from_start=Duration(sec=time_from_sec))
for points, time_from_sec in zip(array_of_points, times_in_sec)
JointTrajectoryPoint(
positions=map_of_positions.get_entry(positions).positions,
time_from_start=Duration(sec=map_of_positions.get_entry(positions).time_to_target),
)
for positions in list_of_position
]

trajectory.points.append(
JointTrajectoryPoint(
positions=array_of_points[-1], time_from_start=Duration(sec=times_in_sec[-1], nanosec=1)
positions=map_of_positions.get_entry(list_of_position[-1]).positions,
time_from_start=Duration(
sec=map_of_positions.get_entry(list_of_position[-1]).time_to_target, nanosec=1
),
)
)
goal_msg = FollowJointTrajectory.Goal()
Expand All @@ -197,13 +145,13 @@ def set_gripper_postion(self, gripper_postion):
def grasp(self, request, response):
self.get_logger().error("Trigger received. Grasp sequence will be started.")

self.go_to_pose(self.positions_to_target, self.times_for_positions_to_target)
self.set_gripper_postion(self.gripper_close_position)
self.go_to_pose(self.params.positions_to_target_list, self.params.positions_to_target)
self.set_gripper_postion(self.params.gripper_close)

sleep(self.time_to_wait_on_target)
sleep(self.params.time_to_wait_on_target)

self.go_to_pose(self.positions_to_home, self.times_for_positions_to_home)
self.set_gripper_postion(self.gripper_open_position)
self.go_to_pose(self.params.positions_to_home_list, self.params.positions_to_home)
self.set_gripper_postion(self.params.gripper_open)

return response

Expand Down
Loading

0 comments on commit 7a4fd0c

Please sign in to comment.