diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a92dd3684..6143e6d1b 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -184,7 +184,15 @@ class Player const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) { - play_impl(storage_options, play_options, false); + play_impl(storage_options, play_options, false, 0, true); + } + + void play_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + bool enable_signal_handling) + { + play_impl(storage_options, play_options, false, 0, enable_signal_handling); } void burst( @@ -192,7 +200,16 @@ class Player PlayOptions & play_options, size_t num_messages) { - play_impl(storage_options, play_options, true, num_messages); + play_impl(storage_options, play_options, true, num_messages, true); + } + + void burst_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + size_t num_messages, + bool enable_signal_handling) + { + play_impl(storage_options, play_options, true, num_messages, enable_signal_handling); } protected: @@ -215,13 +232,10 @@ class Player { if (old_sigterm_handler_ != SIG_ERR) { std::signal(SIGTERM, old_sigterm_handler_); - old_sigterm_handler_ = SIG_ERR; } if (old_sigint_handler_ != SIG_ERR) { std::signal(SIGINT, old_sigint_handler_); - old_sigint_handler_ = SIG_ERR; } - deferred_sig_number_ = -1; } static void process_deferred_signal() @@ -243,9 +257,12 @@ class Player const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options, bool burst = false, - size_t burst_num_messages = 0) + size_t burst_num_messages = 0, + bool enable_signal_handling = true) { - install_signal_handlers(); + if (enable_signal_handling) { + install_signal_handlers(); + } try { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); std::shared_ptr keyboard_handler; @@ -297,25 +314,30 @@ class Player } exec.remove_node(player); } catch (...) { - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } throw; } - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } } static std::atomic_bool exit_; + static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; static SignalHandlerType old_sigint_handler_; static SignalHandlerType old_sigterm_handler_; - static int deferred_sig_number_; + static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; -int Player::deferred_sig_number_{-1}; +volatile std::sig_atomic_t Player::deferred_sig_number_{-1}; std::atomic_bool Player::exit_{false}; std::condition_variable Player::wait_for_exit_cv_{}; @@ -339,7 +361,18 @@ class Recorder RecordOptions & record_options, std::string & node_name) { - install_signal_handlers(); + record_with_signal_option(storage_options, record_options, node_name, true); + } + + void record_with_signal_option( + const rosbag2_storage::StorageOptions & storage_options, + RecordOptions & record_options, + std::string & node_name, + bool enable_signal_handling) + { + if (enable_signal_handling) { + install_signal_handlers(); + } try { exit_ = false; auto exec = std::make_unique(); @@ -385,12 +418,16 @@ class Recorder } exec->remove_node(recorder); } catch (...) { - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } throw; } - process_deferred_signal(); - uninstall_signal_handlers(); + if (enable_signal_handling) { + uninstall_signal_handlers(); + process_deferred_signal(); + } } static void cancel() @@ -419,13 +456,10 @@ class Recorder { if (old_sigterm_handler_ != SIG_ERR) { std::signal(SIGTERM, old_sigterm_handler_); - old_sigterm_handler_ = SIG_ERR; } if (old_sigint_handler_ != SIG_ERR) { std::signal(SIGINT, old_sigint_handler_); - old_sigint_handler_ = SIG_ERR; } - deferred_sig_number_ = -1; } static void process_deferred_signal() @@ -444,16 +478,17 @@ class Recorder } static std::atomic_bool exit_; + static_assert(std::atomic_bool::is_always_lock_free); static std::condition_variable wait_for_exit_cv_; static SignalHandlerType old_sigint_handler_; static SignalHandlerType old_sigterm_handler_; - static int deferred_sig_number_; + static volatile std::sig_atomic_t deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; -int Recorder::deferred_sig_number_{-1}; +volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{}; @@ -591,18 +626,22 @@ PYBIND11_MODULE(_transport, m) { .def(py::init<>()) .def(py::init()) .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) - .def( - "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), + .def("play", &rosbag2_py::Player::play_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("enable_signal_handling")) + .def("burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), py::arg("num_messages")) + .def("burst", &rosbag2_py::Player::burst_with_signal_option, py::arg("storage_options"), + py::arg("play_options"), py::arg("num_messages"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Player::cancel) ; py::class_(m, "Recorder") .def(py::init<>()) .def(py::init()) - .def( - "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), - py::arg("node_name") = "rosbag2_recorder") + .def("record", &rosbag2_py::Recorder::record, py::arg("storage_options"), + py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder") + .def("record", &rosbag2_py::Recorder::record_with_signal_option, py::arg("storage_options"), + py::arg("record_options"), py::arg("node_name"), py::arg("enable_signal_handling")) .def_static("cancel", &rosbag2_py::Recorder::cancel) ; diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index dabc96b1f..0700c3ad4 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -15,6 +15,7 @@ import datetime from pathlib import Path import re +import signal import threading from common import get_rosbag_options, wait_for @@ -30,6 +31,23 @@ RESOURCES_PATH = Path(__file__).parent / 'resources' PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING = r'\[rosbag2_player]: Playback until timestamp: -1' +RECORDING_STARTED_REGEX_STRING = r'\[rosbag2_recorder]: Recording...' + + +def check_record_start_output(cumulative_err, capfd): + out, err = capfd.readouterr() + cumulative_err += err + expected_string_regex = re.compile(RECORDING_STARTED_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None + + +def check_playback_start_output(cumulative_err, capfd): + out, err = capfd.readouterr() + cumulative_err += err + expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None def test_options_qos_conversion(): @@ -67,6 +85,166 @@ def test_recoder_log_level(): rosbag2_py.Recorder(invalid_log_level) +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_process_sigint_in_recorder(tmp_path, storage_id, capfd): + bag_path = tmp_path / 'test_process_sigint_in_recorder' + storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) + + recorder = rosbag2_py.Recorder() + + record_options = rosbag2_py.RecordOptions() + record_options.all_topics = True + + record_thread = threading.Thread( + target=recorder.record, + args=(storage_options, record_options), + daemon=True) + record_thread.start() + + captured_err = '' + assert wait_for(lambda: check_record_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + # Expected to call deferred signal handler no later than exit from the record_thread + record_thread.join() + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + if record_thread.is_alive(): + record_thread.join(3) + assert not record_thread.is_alive() + + metadata_io = rosbag2_py.MetadataIo() + assert wait_for(lambda: metadata_io.metadata_file_exists(str(bag_path)), + timeout=rclpy.duration.Duration(seconds=3)) + metadata = metadata_io.read_metadata(str(bag_path)) + assert len(metadata.relative_file_paths) + storage_path = bag_path / metadata.relative_file_paths[0] + assert wait_for(lambda: storage_path.is_file(), + timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_record_process_sigint_in_python_handler(tmp_path, storage_id, capfd): + bag_path = tmp_path / 'test_record_sigint_in_python' + storage_options, converter_options = get_rosbag_options(str(bag_path), storage_id) + + recorder = rosbag2_py.Recorder() + + record_options = rosbag2_py.RecordOptions() + record_options.all_topics = True + + record_thread = threading.Thread( + target=recorder.record, + args=(storage_options, record_options, 'rosbag2_recorder', False), + daemon=True) + record_thread.start() + + captured_err = '' + assert wait_for(lambda: check_record_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + # The recorder hasn't been stopped by signal need to call cancel() method to stop it. + recorder.cancel() + if record_thread.is_alive(): + record_thread.join(3) + assert not record_thread.is_alive() + + metadata_io = rosbag2_py.MetadataIo() + assert wait_for(lambda: metadata_io.metadata_file_exists(str(bag_path)), + timeout=rclpy.duration.Duration(seconds=3)) + metadata = metadata_io.read_metadata(str(bag_path)) + assert len(metadata.relative_file_paths) + storage_path = bag_path / metadata.relative_file_paths[0] + assert wait_for(lambda: storage_path.is_file(), + timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_process_sigint_in_player(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options), + daemon=True) + player_thread.start() + + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + # Expected to call deferred signal handler no later than exit from the player_thread + player_thread.join() + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + if player_thread.is_alive(): + player_thread.join(3) + assert not player_thread.is_alive() + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_play_process_sigint_in_python_handler(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options, False), + daemon=True) + player_thread.start() + + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), + timeout=rclpy.duration.Duration(seconds=3)) + + sigint_triggered = False + try: + signal.raise_signal(signal.SIGINT) + except KeyboardInterrupt: + sigint_triggered = True + pass + finally: + assert sigint_triggered + # The player hasn't been stopped by signal need to call cancel() method to stop it. + player.cancel() + if player_thread.is_alive(): + player_thread.join(3) + assert not player_thread.is_alive() + + @pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) def test_record_cancel(tmp_path, storage_id): bag_path = tmp_path / 'test_record_cancel' @@ -77,7 +255,7 @@ def test_record_cancel(tmp_path, storage_id): record_options = rosbag2_py.RecordOptions() record_options.all_topics = True record_options.is_discovery_disabled = False - record_options.topic_polling_interval = datetime.timedelta(milliseconds=100) + record_options.topic_polling_interval = datetime.timedelta(milliseconds=10) ctx = rclpy.Context() ctx.init() @@ -95,7 +273,8 @@ def test_record_cancel(tmp_path, storage_id): i = 0 msg = String() - while rclpy.ok() and i < 10: + assert rclpy.ok(context=ctx) + while rclpy.ok(context=ctx) and i < 10: msg.data = 'Hello World: {0}'.format(i) i += 1 pub.publish(msg) @@ -131,17 +310,8 @@ def test_play_cancel(storage_id, capfd): daemon=True) player_thread.start() - def check_playback_start_output(cumulative_out, cumulative_err): - out, err = capfd.readouterr() - cumulative_err += err - cumulative_out += out - expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) - matches = expected_string_regex.search(cumulative_err) - return matches is not None - - captured_out = '' captured_err = '' - assert wait_for(lambda: check_playback_start_output(captured_out, captured_err), + assert wait_for(lambda: check_playback_start_output(captured_err, capfd), timeout=rclpy.duration.Duration(seconds=3)) player.cancel()