Skip to content

Commit

Permalink
Add option to set compression threads priority (#1457)
Browse files Browse the repository at this point in the history
* feat(SequentialCompressionWriter): Added option to set compression thread priority


Signed-off-by: Janosch Machowinski <[email protected]>

* feat: Added compression_threads_priority option

Signed-off-by: Janosch Machowinski <[email protected]>

* test: implemented test case for setting priority value

Signed-off-by: Janosch Machowinski <[email protected]>

* Redesign test for the case when compression writer sets threads priority

Co-authored-by: Janosch Machowinski <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* fix: Fixed inverted check

Signed-off-by: Janosch Machowinski <[email protected]>

* chore: made compression_threads_priority an int32_t

Signed-off-by: Janosch Machowinski <[email protected]>

* Enable thread priority for Windows

Signed-off-by: Michael Orlov <[email protected]>

* Address grammar in log messages

Signed-off-by: Michael Orlov <[email protected]>

* fix: fixed namespace collision warning in test

Signed-off-by: Janosch Machowinski <[email protected]>

* Add compression_threads_priority to the node params parser

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
3 people authored Dec 16, 2023
1 parent 95f78b6 commit cea3fb0
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 13 deletions.
1 change: 1 addition & 0 deletions rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ if(BUILD_TESTING)
test/rosbag2_compression/test_sequential_compression_writer.cpp)
target_link_libraries(test_sequential_compression_writer
${PROJECT_NAME}
fake_plugin
rosbag2_cpp::rosbag2_cpp
rosbag2_storage::rosbag2_storage
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cstdint>
#include <string>
#include <optional>

#include "visibility_control.hpp"

Expand Down Expand Up @@ -60,7 +61,12 @@ struct CompressionOptions
std::string compression_format;
CompressionMode compression_mode;
uint64_t compression_queue_size;
/// \brief // The number of compression threads
uint64_t compression_threads;
/// \brief If set, the compression thread(s) will try to set the given priority for itself
/// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and
/// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value.
std::optional<int32_t> thread_priority;
};

} // namespace rosbag2_compression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <algorithm>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <stdexcept>
Expand All @@ -33,6 +34,12 @@
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"

#include "logging.hpp"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/resource.h>
#endif

namespace rosbag2_compression
{
Expand Down Expand Up @@ -62,6 +69,45 @@ SequentialCompressionWriter::~SequentialCompressionWriter()

void SequentialCompressionWriter::compression_thread_fn()
{
if (compression_options_.thread_priority) {
#ifdef _WIN32
// This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST...
int wanted_thread_priority = *compression_options_.thread_priority;
if (!SetThreadPriority(GetCurrentThread(), wanted_thread_priority)) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set thread priority of compression thread to: " << wanted_thread_priority <<
". Error code: " << GetLastError());
} else {
auto detected_thread_priority = GetThreadPriority(GetCurrentThread());
if (detected_thread_priority == THREAD_PRIORITY_ERROR_RETURN) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Failed to get current thread priority. Error code: " << GetLastError());
} else if (wanted_thread_priority != detected_thread_priority) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set thread priority of compression thread to: " <<
wanted_thread_priority << ". Detected thread priority: " << detected_thread_priority);
}
}
#else
int wanted_nice_value = *compression_options_.thread_priority;

errno = 0;
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value == -1 && errno != 0) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set nice value of compression thread to: " << wanted_nice_value <<
" : Could not determine cur nice value");
} else {
int new_nice_value = nice(wanted_nice_value - cur_nice_value);
if ((new_nice_value == -1 && errno != 0)) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set nice value of compression thread to: " << wanted_nice_value <<
". Error : " << std::strerror(errno));
}
}
#endif
}

// Every thread needs to have its own compression context for thread safety.
auto compressor = compression_factory_->create_compressor(
compression_options_.compression_format);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_

#include <memory>
#include <string>

#include "rosbag2_compression/compression_factory.hpp"
#include "fake_compressor.hpp"

class ROSBAG2_COMPRESSION_EXPORT FakeCompressionFactory
: public rosbag2_compression::CompressionFactory
{
public:
FakeCompressionFactory() = delete;

~FakeCompressionFactory() override = default;

explicit FakeCompressionFactory(int & detected_thread_priority)
: detected_thread_priority_(detected_thread_priority) {}

std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/) override
{
return std::make_shared<FakeCompressor>(detected_thread_priority_);
}

private:
int & detected_thread_priority_;
};

#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
18 changes: 17 additions & 1 deletion rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,27 @@
// limitations under the License.

#include <string>
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/resource.h>
#endif

#include "pluginlib/class_list_macros.hpp"

#include "fake_compressor.hpp"

FakeCompressor::FakeCompressor(int & detected_thread_priority)
{
#ifndef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {
detected_thread_priority = cur_nice_value;
}
#else
detected_thread_priority = GetThreadPriority(GetCurrentThread());
#endif
}

std::string FakeCompressor::compress_uri(const std::string & uri)
{
return uri + "." + get_compression_identifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include "rosbag2_compression/base_compressor_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

class FakeCompressor : public rosbag2_compression::BaseCompressorInterface
class ROSBAG2_COMPRESSION_EXPORT FakeCompressor : public rosbag2_compression::
BaseCompressorInterface
{
public:
FakeCompressor() = default;

explicit FakeCompressor(int & detected_thread_priority);

std::string compress_uri(const std::string & uri) override;

void compress_serialized_bag_message(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
#include "mock_storage_factory.hpp"

#include "mock_compression_factory.hpp"
#include "fake_compression_factory.hpp"

#ifdef _WIN32
#include <windows.h>
#else
#include <sys/time.h>
#include <sys/resource.h>
#endif

using namespace testing; // NOLINT

Expand Down Expand Up @@ -143,13 +151,15 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
const std::optional<int32_t> kDefaultCompressionQueueThreadsPriority = std::nullopt;
};

TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -163,7 +173,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format)
{
rosbag2_compression::CompressionOptions compression_options{
"bad_format", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -175,7 +186,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};

// Set minimum file size greater than max bagfile size option
const uint64_t min_split_file_size = 10;
Expand All @@ -196,7 +208,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
Expand All @@ -211,7 +224,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1);

Expand All @@ -235,7 +249,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -277,7 +292,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -315,7 +331,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -363,7 +380,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand All @@ -384,6 +402,60 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
const uint64_t kCompressionQueueSize = GetParam();
#ifndef _WIN32
const int32_t wanted_thread_priority = 10;
errno = 0;
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
ASSERT_TRUE(!(cur_nice_value == -1 && errno != 0));
#else
const int32_t wanted_thread_priority = THREAD_PRIORITY_LOWEST;
auto current_thread_priority = GetThreadPriority(GetCurrentThread());
ASSERT_NE(current_thread_priority, THREAD_PRIORITY_ERROR_RETURN);
ASSERT_NE(current_thread_priority, wanted_thread_priority);
#endif

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads,
wanted_thread_priority
};

#ifndef _WIN32
// nice values are in the range from -20 to +19, so this value will never be read
int32_t detected_thread_priority = 100;
#else
int32_t detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN;
#endif

initializeFakeFileStorage();
initializeWriter(
compression_options,
std::make_unique<FakeCompressionFactory>(detected_thread_priority));

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", {}, ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void WriterBenchmark::create_writer()
if (!bag_config_.compression_format.empty()) {
rosbag2_compression::CompressionOptions compression_options{
bag_config_.compression_format, rosbag2_compression::CompressionMode::MESSAGE,
bag_config_.compression_queue_size, bag_config_.compression_threads};
bag_config_.compression_queue_size, bag_config_.compression_threads, std::nullopt};

writer_ = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct RecordOptions
std::string compression_format = "";
uint64_t compression_queue_size = 1;
uint64_t compression_threads = 0;
int32_t compression_threads_priority = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool include_unpublished_topics = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ RecordOptions get_record_options_from_node_params(rclcpp::Node & node)
node, "record.compression_threads", 0, std::numeric_limits<int64_t>::max(),
record_options.compression_threads);

record_options.compression_threads_priority = param_utils::declare_integer_node_params<int32_t>(
node, "record.compression_threads_priority", std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max(), record_options.compression_threads_priority);

std::string qos_profile_overrides_path =
node.declare_parameter<std::string>("record.qos_profile_overrides_path", "");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ std::unique_ptr<rosbag2_cpp::Writer> ReaderWriterFactory::make_writer(
record_options.compression_format,
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
record_options.compression_queue_size,
record_options.compression_threads
record_options.compression_threads,
record_options.compression_threads_priority,
};
if (compression_options.compression_threads < 1) {
compression_options.compression_threads = std::thread::hardware_concurrency();
Expand Down
Loading

0 comments on commit cea3fb0

Please sign in to comment.