diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 6999fdc962..32ce755e5d 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -18,16 +18,20 @@ rosidl_generate_interfaces(${PROJECT_NAME} "msg/WriteSplitEvent.msg" "srv/Burst.srv" "srv/GetRate.srv" + "srv/IsDiscoveryStopped.srv" "srv/IsPaused.srv" "srv/Pause.srv" "srv/Play.srv" "srv/PlayNext.srv" + "srv/Record.srv" "srv/Resume.srv" "srv/Seek.srv" "srv/SetRate.srv" "srv/Snapshot.srv" "srv/SplitBagfile.srv" + "srv/StartDiscovery.srv" "srv/Stop.srv" + "srv/StopDiscovery.srv" "srv/TogglePaused.srv" DEPENDENCIES builtin_interfaces ADD_LINTER_TESTS diff --git a/rosbag2_interfaces/srv/IsDiscoveryStopped.srv b/rosbag2_interfaces/srv/IsDiscoveryStopped.srv new file mode 100644 index 0000000000..91a705f4f0 --- /dev/null +++ b/rosbag2_interfaces/srv/IsDiscoveryStopped.srv @@ -0,0 +1,2 @@ +--- +bool stopped diff --git a/rosbag2_interfaces/srv/Record.srv b/rosbag2_interfaces/srv/Record.srv new file mode 100644 index 0000000000..ed97d539c0 --- /dev/null +++ b/rosbag2_interfaces/srv/Record.srv @@ -0,0 +1 @@ +--- diff --git a/rosbag2_interfaces/srv/StartDiscovery.srv b/rosbag2_interfaces/srv/StartDiscovery.srv new file mode 100644 index 0000000000..ed97d539c0 --- /dev/null +++ b/rosbag2_interfaces/srv/StartDiscovery.srv @@ -0,0 +1 @@ +--- diff --git a/rosbag2_interfaces/srv/StopDiscovery.srv b/rosbag2_interfaces/srv/StopDiscovery.srv new file mode 100644 index 0000000000..ed97d539c0 --- /dev/null +++ b/rosbag2_interfaces/srv/StopDiscovery.srv @@ -0,0 +1 @@ +--- diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 6755fd5ff6..c3b5830c86 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -30,11 +30,16 @@ #include "rosbag2_cpp/writer.hpp" +#include "rosbag2_interfaces/srv/is_discovery_stopped.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/record.hpp" #include "rosbag2_interfaces/srv/resume.hpp" #include "rosbag2_interfaces/srv/snapshot.hpp" #include "rosbag2_interfaces/srv/split_bagfile.hpp" +#include "rosbag2_interfaces/srv/start_discovery.hpp" +#include "rosbag2_interfaces/srv/stop.hpp" +#include "rosbag2_interfaces/srv/stop_discovery.hpp" #include "rosbag2_interfaces/msg/write_split_event.hpp" @@ -154,6 +159,10 @@ class Recorder : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC bool is_paused(); + /// Return the current discovery state + ROSBAG2_TRANSPORT_PUBLIC + bool is_discovery_stopped(); + inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE; protected: diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d5d1f0b0e5..e0caf8a477 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -86,6 +86,9 @@ class RecorderImpl /// Stop discovery void stop_discovery(); + /// Return the current discovery state. + bool is_discovery_stopped(); + std::unordered_map get_requested_or_available_topics(); /// Public members for access by wrapper @@ -135,15 +138,20 @@ class RecorderImpl std::string serialization_format_; std::unordered_map topic_qos_profile_overrides_; std::unordered_set topic_unknown_types_; + rclcpp::Service::SharedPtr srv_is_discovery_stopped_; rclcpp::Service::SharedPtr srv_is_paused_; rclcpp::Service::SharedPtr srv_pause_; + rclcpp::Service::SharedPtr srv_record_; rclcpp::Service::SharedPtr srv_resume_; rclcpp::Service::SharedPtr srv_snapshot_; rclcpp::Service::SharedPtr srv_split_bagfile_; + rclcpp::Service::SharedPtr srv_start_discovery_; + rclcpp::Service::SharedPtr srv_stop_; + rclcpp::Service::SharedPtr srv_stop_discovery_; std::mutex start_stop_transition_mutex_; std::mutex discovery_mutex_; - std::atomic stop_discovery_ = false; + std::atomic stop_discovery_ = true; std::atomic_uchar paused_ = 0; std::atomic in_recording_ = false; std::shared_ptr keyboard_handler_; @@ -294,6 +302,56 @@ void RecorderImpl::record() writer_->split_bagfile(); }); + srv_start_discovery_ = node->create_service( + "~/start_discovery", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + start_discovery(); + }); + + srv_stop_discovery_ = node->create_service( + "~/stop_discovery", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + stop_discovery(); + }); + + srv_is_discovery_stopped_ = node->create_service( + "~/is_discovery_stopped", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr response) + { + response->stopped = is_discovery_stopped(); + }); + + srv_record_ = node->create_service( + "~/record", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + record(); + }); + + srv_stop_ = node->create_service( + "~/stop", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + stop(); + }); + srv_pause_ = node->create_service( "~/pause", [this]( @@ -328,6 +386,10 @@ void RecorderImpl::record() node->create_publisher("events/write_split", 1); // Start the thread that will publish events + { + std::lock_guard lock(event_publisher_thread_mutex_); + event_publisher_thread_should_exit_ = false; + } event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; @@ -436,7 +498,7 @@ bool RecorderImpl::is_paused() void RecorderImpl::start_discovery() { std::lock_guard state_lock(discovery_mutex_); - if (stop_discovery_.exchange(false)) { + if (!stop_discovery_.exchange(false)) { RCLCPP_DEBUG(node->get_logger(), "Recorder topic discovery is already running."); } else { discovery_future_ = @@ -464,6 +526,11 @@ void RecorderImpl::stop_discovery() } } +bool RecorderImpl::is_discovery_stopped() +{ + return stop_discovery_.load(); +} + void RecorderImpl::topics_discovery() { // If using sim time - wait until /clock topic received before even creating subscriptions @@ -849,6 +916,12 @@ Recorder::is_paused() return pimpl_->is_paused(); } +bool +Recorder::is_discovery_stopped() +{ + return pimpl_->is_discovery_stopped(); +} + std::unordered_map Recorder::get_requested_or_available_topics() { diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index fba48d4987..763acf7617 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -38,6 +38,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn void close() override { + topics_.clear(); writer_close_called_ = true; } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp index f585549bca..3513e67b5e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp @@ -21,11 +21,16 @@ #include "rclcpp/rclcpp.hpp" +#include "rosbag2_interfaces/srv/is_discovery_stopped.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/record.hpp" #include "rosbag2_interfaces/srv/resume.hpp" #include "rosbag2_interfaces/srv/snapshot.hpp" #include "rosbag2_interfaces/srv/split_bagfile.hpp" +#include "rosbag2_interfaces/srv/start_discovery.hpp" +#include "rosbag2_interfaces/srv/stop.hpp" +#include "rosbag2_interfaces/srv/stop_discovery.hpp" #include "rosbag2_transport/recorder.hpp" #include "rosbag2_test_common/publication_manager.hpp" @@ -39,11 +44,16 @@ using namespace ::testing; // NOLINT class RecordSrvsTest : public RecordIntegrationTestFixture { public: + using IsDiscoveryStopped = rosbag2_interfaces::srv::IsDiscoveryStopped; using IsPaused = rosbag2_interfaces::srv::IsPaused; using Pause = rosbag2_interfaces::srv::Pause; + using Record = rosbag2_interfaces::srv::Record; using Resume = rosbag2_interfaces::srv::Resume; using Snapshot = rosbag2_interfaces::srv::Snapshot; using SplitBagfile = rosbag2_interfaces::srv::SplitBagfile; + using StartDiscovery = rosbag2_interfaces::srv::StartDiscovery; + using Stop = rosbag2_interfaces::srv::Stop; + using StopDiscovery = rosbag2_interfaces::srv::StopDiscovery; explicit RecordSrvsTest(bool snapshot_mode = false) : RecordIntegrationTestFixture(), @@ -80,11 +90,17 @@ class RecordSrvsTest : public RecordIntegrationTestFixture pub_manager.setup_publisher(test_topic_, string_message, 10); const std::string ns = "/" + recorder_name_; + cli_is_discovery_stopped_ = client_node_->create_client(ns + + "/is_discovery_stopped"); cli_is_paused_ = client_node_->create_client(ns + "/is_paused"); cli_pause_ = client_node_->create_client(ns + "/pause"); + cli_record_ = client_node_->create_client(ns + "/record"); cli_resume_ = client_node_->create_client(ns + "/resume"); cli_snapshot_ = client_node_->create_client(ns + "/snapshot"); cli_split_bagfile_ = client_node_->create_client(ns + "/split_bagfile"); + cli_start_discovery_ = client_node_->create_client(ns + "/start_discovery"); + cli_stop_ = client_node_->create_client(ns + "/stop"); + cli_stop_discovery_ = client_node_->create_client(ns + "/stop_discovery"); exec_ = std::make_shared(); exec_->add_node(recorder_); @@ -141,11 +157,16 @@ class RecordSrvsTest : public RecordIntegrationTestFixture // Service clients rclcpp::Node::SharedPtr client_node_; + rclcpp::Client::SharedPtr cli_is_discovery_stopped_; rclcpp::Client::SharedPtr cli_is_paused_; rclcpp::Client::SharedPtr cli_pause_; + rclcpp::Client::SharedPtr cli_record_; rclcpp::Client::SharedPtr cli_resume_; rclcpp::Client::SharedPtr cli_snapshot_; rclcpp::Client::SharedPtr cli_split_bagfile_; + rclcpp::Client::SharedPtr cli_start_discovery_; + rclcpp::Client::SharedPtr cli_stop_; + rclcpp::Client::SharedPtr cli_stop_discovery_; bool snapshot_mode_; }; @@ -226,3 +247,37 @@ TEST_F(RecordSrvsTest, pause_resume) is_paused_response = successful_service_request(cli_is_paused_); EXPECT_FALSE(is_paused_response->paused); } + +TEST_F(RecordSrvsTest, stop_start_discovery) +{ + EXPECT_FALSE(recorder_->is_discovery_stopped()); + auto is_discovery_stopped_response = + successful_service_request(cli_is_discovery_stopped_); + EXPECT_FALSE(is_discovery_stopped_response->stopped); + + successful_service_request(cli_stop_discovery_); + EXPECT_TRUE(recorder_->is_discovery_stopped()); + is_discovery_stopped_response = + successful_service_request(cli_is_discovery_stopped_); + EXPECT_TRUE(is_discovery_stopped_response->stopped); + + successful_service_request(cli_start_discovery_); + EXPECT_FALSE(recorder_->is_discovery_stopped()); + is_discovery_stopped_response = + successful_service_request(cli_is_discovery_stopped_); + EXPECT_FALSE(is_discovery_stopped_response->stopped); +} + +TEST_F(RecordSrvsTest, record_stop) +{ + auto & writer = recorder_->get_writer_handle(); + auto & mock_writer = dynamic_cast(writer.get_implementation_handle()); + + EXPECT_FALSE(mock_writer.closed_was_called()); + + successful_service_request(cli_stop_); + EXPECT_TRUE(mock_writer.closed_was_called()); + + successful_service_request(cli_record_); + EXPECT_FALSE(mock_writer.closed_was_called()); +}