Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add the possibility to set the base pose in the visualizer and add the 3D point visualization #74

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 109 additions & 11 deletions robot_log_visualizer/file_reader/signal_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import h5py
import numpy as np
from PyQt5.QtCore import pyqtSignal, QThread, QMutex, QMutexLocker
from robot_log_visualizer.utils.utils import PeriodicThreadState
from robot_log_visualizer.utils.utils import PeriodicThreadState, RobotStatePath
import idyntree.swig as idyn


class TextLoggingMsg:
Expand Down Expand Up @@ -41,6 +42,12 @@ def __init__(self, period: float):
self._index = 0
self.index_lock = QMutex()

self._robot_state_path = RobotStatePath()
self.robot_state_path_lock = QMutex()

self._3d_points_path = {}
self._3d_points_path_lock = QMutex()

self.period = period

self.data = {}
Expand Down Expand Up @@ -190,6 +197,17 @@ def index(self, index):
locker = QMutexLocker(self.index_lock)
self._index = index

@property
def robot_state_path(self):
locker = QMutexLocker(self.robot_state_path_lock)
value = self._robot_state_path
return value

@robot_state_path.setter
def robot_state_path(self, robot_state_path):
locker = QMutexLocker(self.robot_state_path_lock)
self._robot_state_path = robot_state_path

def register_update_index(self, slot):
self.update_index_signal.connect(slot)

Expand All @@ -207,18 +225,98 @@ def current_time(self):
value = self._current_time
return value

def get_joints_position(self):
return self.data[self.root_name]["joints_state"]["positions"]["data"]
def get_item_from_path(self, path, default_path=None):
data = self.data[self.root_name]

if not path:
if default_path is None:
return None, None
else:
for key in default_path:
data = data[key]
return data["data"], data["timestamps"]

for key in path:
data = data[key]

return data["data"], data["timestamps"]

def get_item_from_path_at_index(self, path, index, default_path=None):
data, timestamps = self.get_item_from_path(path, default_path)
if data is None:
return None
closest_index = np.argmin(np.abs(timestamps - self.timestamps[index]))
return data[closest_index, :]

def get_robot_state_at_index(self, index):
robot_state = {}

self.robot_state_path_lock.lock()
robot_state["joints_position"] = self.get_item_from_path_at_index(
self._robot_state_path.joints_state_path,
index,
default_path=["joints_state", "positions"],
)

robot_state["base_position"] = self.get_item_from_path_at_index(
self._robot_state_path.base_position_path, index
)

def get_joints_position_at_index(self, index):
joints_position_timestamps = self.data[self.root_name]["joints_state"][
"positions"
]["timestamps"]
# given the index find the closest timestamp
closest_index = np.argmin(
np.abs(joints_position_timestamps - self.timestamps[index])
robot_state["base_orientation"] = self.get_item_from_path_at_index(
self._robot_state_path.base_orientation_path, index
)
return self.get_joints_position()[closest_index, :]
self.robot_state_path_lock.unlock()

if robot_state["base_position"] is None:
robot_state["base_position"] = np.zeros(3)

if robot_state["base_orientation"] is None:
robot_state["base_orientation"] = np.zeros(3)

# check the size of the base_orientation if 3 we assume is store ad rpy otherwise as a quaternion we need to convert it in a rotation matrix
if robot_state["base_orientation"].shape[0] == 3:
robot_state["base_orientation"] = idyn.Rotation.RPY(
robot_state["base_orientation"][0],
robot_state["base_orientation"][1],
robot_state["base_orientation"][2],
).toNumPy()
if robot_state["base_orientation"].shape[0] == 4:
# convert the x y z w quaternion into w x y z
tmp_quat = robot_state["base_orientation"]
tmp_quat = np.array([tmp_quat[3], tmp_quat[0], tmp_quat[1], tmp_quat[2]])

robot_state["base_orientation"] = idyn.Rotation.RotationFromQuaternion(
tmp_quat
).toNumPy()

return robot_state

def get_3d_point_at_index(self, index):
points = {}

self._3d_points_path_lock.lock()

for key, value in self._3d_points_path.items():
# force the size of the points to be 3 if less than 3 we assume that the point is a 2d point and we add a 0 as z coordinate
points[key] = self.get_item_from_path_at_index(value, index)
if points[key].shape[0] < 3:
points[key] = np.concatenate(
(points[key], np.zeros(3 - points[key].shape[0]))
)

self._3d_points_path_lock.unlock()

return points

def register_3d_point(self, key, points_path):
self._3d_points_path_lock.lock()
self._3d_points_path[key] = points_path
self._3d_points_path_lock.unlock()

def unregister_3d_point(self, key):
self._3d_points_path_lock.lock()
del self._3d_points_path[key]
self._3d_points_path_lock.unlock()

def run(self):
while True:
Expand Down
50 changes: 40 additions & 10 deletions robot_log_visualizer/robot_visualizer/meshcat_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ def __init__(self, signal_provider, period):
self.state_lock = QMutex()

self._period = period
self.meshcat_visualizer = MeshcatVisualizer()
self._meshcat_visualizer = MeshcatVisualizer()
self.meshcat_visualizer_mutex = QMutex()

self._is_model_loaded = False
self._signal_provider = signal_provider

self.custom_model_path = ""
self.custom_package_dir = ""
self.env_list = ["GAZEBO_MODEL_PATH", "ROS_PACKAGE_PATH", "AMENT_PREFIX_PATH"]
self._registered_3d_points = set()

@property
def state(self):
Expand All @@ -44,6 +47,19 @@ def state(self, new_state: PeriodicThreadState):
locker = QMutexLocker(self.state_lock)
self._state = new_state

def register_3d_point(self, point_path, color):
radius = 0.02
locker = QMutexLocker(self.meshcat_visualizer_mutex)
self._registered_3d_points.add(point_path)
self._meshcat_visualizer.load_sphere(
radius=radius, color=color, shape_name=point_path
)

def unregister_3d_point(self, point_path):
locker = QMutexLocker(self.meshcat_visualizer_mutex)
self._registered_3d_points.remove(point_path)
self._meshcat_visualizer.delete(shape_name=point_path)

def load_model(self, considered_joints, model_name):
def get_model_path_from_envs(env_list):
return [
Expand Down Expand Up @@ -132,7 +148,7 @@ def find_model_joints(model_name, considered_joints):
if not model_loader.isValid():
return False

self.meshcat_visualizer.load_model(
self._meshcat_visualizer.load_model(
model_loader.model(), model_name="robot", color=0.8
)

Expand All @@ -141,23 +157,37 @@ def find_model_joints(model_name, considered_joints):
return True

def run(self):
base_rotation = np.eye(3)
base_position = np.array([0.0, 0.0, 0.0])
identity = np.eye(3)

while True:
start = time.time()

if self.state == PeriodicThreadState.running and self._is_model_loaded:
robot_state = self._signal_provider.get_robot_state_at_index(
self._signal_provider.index
)

self.meshcat_visualizer_mutex.lock()
# These are the robot measured joint positions in radians
self.meshcat_visualizer.set_multibody_system_state(
base_position,
base_rotation,
joint_value=self._signal_provider.get_joints_position_at_index(
self._signal_provider.index
)[self.model_joints_index],
self._meshcat_visualizer.set_multibody_system_state(
base_position=robot_state["base_position"],
base_rotation=robot_state["base_orientation"],
joint_value=robot_state["joints_position"][self.model_joints_index],
model_name="robot",
)

for points_path, points in self._signal_provider.get_3d_point_at_index(
self._signal_provider.index
).items():
if points_path not in self._registered_3d_points:
continue

self._meshcat_visualizer.set_primitive_geometry_transform(
position=points, rotation=identity, shape_name=points_path
)

self.meshcat_visualizer_mutex.unlock()

sleep_time = self._period - (time.time() - start)
if sleep_time > 0:
time.sleep(sleep_time)
Expand Down
Loading
Loading