Skip to content

Commit

Permalink
Add options to disable signal handling in Player and Recorder Python API
Browse files Browse the repository at this point in the history
- Also added test coverage for signals handling in Python API

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Jun 12, 2024
1 parent e7e6005 commit a916b2b
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 40 deletions.
95 changes: 67 additions & 28 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,32 @@ class Player
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
play_impl(storage_options, play_options, false);
play_impl(storage_options, play_options, false, 0, true);
}

void play_with_signal_option(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
bool enable_signal_handling)
{
play_impl(storage_options, play_options, false, 0, enable_signal_handling);
}

void burst(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
size_t num_messages)
{
play_impl(storage_options, play_options, true, num_messages);
play_impl(storage_options, play_options, true, num_messages, true);
}

void burst_with_signal_option(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
size_t num_messages,
bool enable_signal_handling)
{
play_impl(storage_options, play_options, true, num_messages, enable_signal_handling);
}

protected:
Expand All @@ -215,13 +232,10 @@ class Player
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
Expand All @@ -243,9 +257,12 @@ class Player
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
bool burst = false,
size_t burst_num_messages = 0)
size_t burst_num_messages = 0,
bool enable_signal_handling = true)
{
install_signal_handlers();
if (enable_signal_handling) {
install_signal_handlers();
}
try {
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
std::shared_ptr<KeyboardHandler> keyboard_handler;
Expand Down Expand Up @@ -297,25 +314,30 @@ class Player
}
exec.remove_node(player);
} catch (...) {
process_deferred_signal();
uninstall_signal_handlers();
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
throw;
}
process_deferred_signal();
uninstall_signal_handlers();
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
}

static std::atomic_bool exit_;
static_assert(std::atomic_bool::is_always_lock_free);
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
static volatile std::sig_atomic_t deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR};
Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR};
int Player::deferred_sig_number_{-1};
volatile std::sig_atomic_t Player::deferred_sig_number_{-1};
std::atomic_bool Player::exit_{false};
std::condition_variable Player::wait_for_exit_cv_{};

Expand All @@ -339,7 +361,18 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
install_signal_handlers();
record_with_signal_option(storage_options, record_options, node_name, true);
}

void record_with_signal_option(
const rosbag2_storage::StorageOptions & storage_options,
RecordOptions & record_options,
std::string & node_name,
bool enable_signal_handling)
{
if (enable_signal_handling) {
install_signal_handlers();
}
try {
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
Expand Down Expand Up @@ -385,12 +418,16 @@ class Recorder
}
exec->remove_node(recorder);
} catch (...) {
process_deferred_signal();
uninstall_signal_handlers();
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
throw;
}
process_deferred_signal();
uninstall_signal_handlers();
if (enable_signal_handling) {
uninstall_signal_handlers();
process_deferred_signal();
}
}

static void cancel()
Expand Down Expand Up @@ -419,13 +456,10 @@ class Recorder
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
Expand All @@ -444,16 +478,17 @@ class Recorder
}

static std::atomic_bool exit_;
static_assert(std::atomic_bool::is_always_lock_free);
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
static volatile std::sig_atomic_t deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR};
Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR};
int Recorder::deferred_sig_number_{-1};
volatile std::sig_atomic_t Recorder::deferred_sig_number_{-1};
std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

Expand Down Expand Up @@ -591,18 +626,22 @@ PYBIND11_MODULE(_transport, m) {
.def(py::init<>())
.def(py::init<const std::string &>())
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def(
"burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"),
.def("play", &rosbag2_py::Player::play_with_signal_option, py::arg("storage_options"),
py::arg("play_options"), py::arg("enable_signal_handling"))
.def("burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"),
py::arg("num_messages"))
.def("burst", &rosbag2_py::Player::burst_with_signal_option, py::arg("storage_options"),
py::arg("play_options"), py::arg("num_messages"), py::arg("enable_signal_handling"))
.def_static("cancel", &rosbag2_py::Player::cancel)
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
.def(py::init<>())
.def(py::init<const std::string &>())
.def(
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
py::arg("node_name") = "rosbag2_recorder")
.def("record", &rosbag2_py::Recorder::record, py::arg("storage_options"),
py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder")
.def("record", &rosbag2_py::Recorder::record_with_signal_option, py::arg("storage_options"),
py::arg("record_options"), py::arg("node_name"), py::arg("enable_signal_handling"))
.def_static("cancel", &rosbag2_py::Recorder::cancel)
;

Expand Down
Loading

0 comments on commit a916b2b

Please sign in to comment.