diff --git a/webots_ros2_tests/test/test_system_driver.py b/webots_ros2_tests/test/test_system_driver.py index 80c1463d2..1bef73e8a 100644 --- a/webots_ros2_tests/test/test_system_driver.py +++ b/webots_ros2_tests/test/test_system_driver.py @@ -22,6 +22,9 @@ import time import pytest import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import QoSProfile, QoSDurabilityPolicy from sensor_msgs.msg import LaserScan, PointCloud2 from std_srvs.srv import Trigger from sensor_msgs.msg import Range, Image, Imu, Illuminance @@ -36,6 +39,28 @@ from webots_ros2_driver.webots_controller import WebotsController from webots_ros2_tests.utils import TestWebots, initialize_webots_test +def wait_for_topic(topic_name, timeout=30.0): + rclpy.init() + node = Node('readiness_checker') + executor = SingleThreadedExecutor() + executor.add_node(node) + + qos = QoSProfile(depth=1) + qos.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL + + try: + node.get_logger().info(f"Waiting for topic '{topic_name}'...") + end_time = time.time() + timeout + while time.time() < end_time: + info = node.get_publishers_info_by_topic(topic_name) + if info: + node.get_logger().info(f"Topic '{topic_name}' is available.") + return + rclpy.spin_once(node, timeout_sec=0.1) + + raise RuntimeError(f"Topic '{topic_name}' not available within timeout of {timeout} seconds.") + finally: + rclpy.shutdown() @pytest.mark.rostest def generate_test_description(): @@ -56,10 +81,13 @@ def generate_test_description(): parameters=[{'robot_description': robot_description_path, 'use_sim_time': True}] ) + clock_ready_action = launch.actions.OpaqueFunction(function=lambda context: wait_for_topic('/clock', timeout=30.0)) + return LaunchDescription([ webots, webots._supervisor, webots_driver, + clock_ready_action, launch_testing.actions.ReadyToTest(), launch.actions.RegisterEventHandler( event_handler=launch.event_handlers.OnProcessExit(