From 7ca544b556c2622f2314457e51d93e6a420d1c3c Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 16 Dec 2024 12:22:58 +0100 Subject: [PATCH] Recycle serialization buffers on transmission Adds a LIFO buffer pool in the context to reuse buffers allocated on serialization. The aim is not (only) to avoid the overhead of dynamic allocation but rather to enhance the cache locality of serialization buffers. --- rmw_zenoh_cpp/src/detail/buffer_pool.hpp | 74 ++++ .../src/detail/rmw_context_impl_s.hpp | 4 + .../src/detail/rmw_publisher_data.cpp | 26 +- .../src/detail/rmw_subscription_data.cpp | 370 +++++++++--------- 4 files changed, 272 insertions(+), 202 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/buffer_pool.hpp diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..156fa782 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,74 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include + +#include "rcutils/allocator.h" + +// FIXME(fuzzypixelz): indeed, we leak all allocated buffers ;) +class BufferPool +{ +public: + struct Buffer + { + uint8_t *data; + size_t size; + }; + + BufferPool() = default; + + Buffer allocate(rcutils_allocator_t *allocator, size_t size) + { + std::lock_guard guard(mutex_); + + if (buffers_.empty()) { + uint8_t *data = static_cast(allocator->allocate(size, allocator->state)); + if (data == nullptr) { + return {}; + } + return Buffer {data, size}; + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + uint8_t *data = static_cast(allocator->reallocate( + buffer.data, size, allocator->state)); + if (data == nullptr) { + return {}; + } + buffer.data = data; + buffer.size = size; + } + return buffer; + } + } + + void + deallocate(Buffer buffer) + { + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); + } + +private: + std::vector buffers_; + std::mutex mutex_; +}; + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a2fdaf5e..17932867 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 7d83eec6..641f162d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -214,20 +214,17 @@ rmw_ret_t PublisherData::publish( rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; - auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator]() { - if (msg_bytes) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); + rmw_context_impl_s *context_impl = static_cast(rmw_node_->data); - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + // Get memory from the serialization buffer pool. + BufferPool::Buffer serialization_buffer = + context_impl->serialization_buffer_pool.allocate(allocator, max_data_length); + msg_bytes = serialization_buffer.data; RMW_CHECK_FOR_NULL_WITH_MSG( msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); @@ -251,11 +248,14 @@ rmw_ret_t PublisherData::publish( sequence_number_++, entity_->copy_gid()); + auto delete_bytes = [buffer_pool = &context_impl->serialization_buffer_pool, + buffer = serialization_buffer](uint8_t * data){ + assert(buffer.data == data); + buffer_pool->deallocate(buffer); + }; + // TODO(ahcorde): shmbuf - std::vector raw_data( - reinterpret_cast(msg_bytes), - reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(std::move(raw_data)); + zenoh::Bytes payload(msg_bytes, data_length, delete_bytes); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 5acbd909..9a49f11d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -42,7 +42,7 @@ namespace rmw_zenoh_cpp { -///============================================================================= + ///============================================================================= SubscriptionData::Message::Message( std::vector && p, uint64_t recv_ts, @@ -51,100 +51,99 @@ SubscriptionData::Message::Message( { } -///============================================================================= + ///============================================================================= SubscriptionData::Message::~Message() { } -///============================================================================= + ///============================================================================= std::shared_ptr SubscriptionData::make( std::shared_ptr session, std::shared_ptr graph_cache, - const rmw_node_t * const node, + const rmw_node_t *const node, liveliness::NodeInfo node_info, std::size_t node_id, std::size_t subscription_id, const std::string & topic_name, - const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rosidl_message_type_support_t *type_support, + const rmw_qos_profile_t *qos_profile) { rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( - node, topic_name.c_str(), &adapted_qos_profile, rmw_get_publishers_info_by_topic); + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_publishers_info_by_topic); if (RMW_RET_OK != ret) { return nullptr; } - rcutils_allocator_t * allocator = &node->context->options.allocator; + rcutils_allocator_t *allocator = &node->context->options.allocator; - const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + const rosidl_type_hash_t *type_hash = type_support->get_type_hash_func(type_support); auto callbacks = static_cast(type_support->data); auto message_type_support = std::make_unique(callbacks); - // Convert the type hash to a string so that it can be included in the keyexpr. - char * type_hash_c_str = nullptr; + // Convert the type hash to a string so that it can be included in the keyexpr. + char *type_hash_c_str = nullptr; rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - type_hash, - *allocator, - &type_hash_c_str); + type_hash, + *allocator, + &type_hash_c_str); if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); return nullptr; } auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { + [&allocator, &type_hash_c_str]() + { allocator->deallocate(type_hash_c_str, allocator->state); - }); + }); - // Everything above succeeded and is setup properly. Now declare a subscriber - // with Zenoh; after this, callbacks may come in at any time. + // Everything above succeeded and is setup properly. Now declare a subscriber + // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - session->get_zid(), - std::to_string(node_id), - std::to_string(subscription_id), - liveliness::EntityType::Subscription, - std::move(node_info), - liveliness::TopicInfo{ + session->get_zid(), + std::to_string(node_id), + std::to_string(subscription_id), + liveliness::EntityType::Subscription, + std::move(node_info), + liveliness::TopicInfo{ std::move(domain_id), topic_name, message_type_support->get_name(), type_hash_c_str, - adapted_qos_profile} - ); + adapted_qos_profile}); if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the subscription %s.", - topic_name.c_str()); + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription %s.", + topic_name.c_str()); return nullptr; } auto sub_data = std::shared_ptr( - new SubscriptionData{ + new SubscriptionData{ node, graph_cache, std::move(entity), std::move(session), type_support->data, - std::move(message_type_support) - }); + std::move(message_type_support)}); if (!sub_data->init()) { - // init() already set the error + // init() already set the error return nullptr; } return sub_data; } -///============================================================================= + ///============================================================================= SubscriptionData::SubscriptionData( - const rmw_node_t * rmw_node, + const rmw_node_t *rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, std::shared_ptr session, - const void * type_support_impl, + const void *type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), @@ -161,8 +160,8 @@ SubscriptionData::SubscriptionData( events_mgr_ = std::make_shared(); } -// We have to use an "init" function here, rather than do this in the constructor, because we use -// enable_shared_from_this, which is not available in constructors. + // We have to use an "init" function here, rather than do this in the constructor, because we use + // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() { zenoh::ZResult result; @@ -172,30 +171,30 @@ bool SubscriptionData::init() return false; } - rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + rmw_context_impl_t *context_impl = static_cast(rmw_node_->context->impl); sess_ = context_impl->session(); - // Instantiate the subscription with suitable options depending on the - // adapted_qos_profile. - // TODO(Yadunund): Rely on a separate function to return the sub - // as we start supporting more qos settings. + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { zenoh::ext::SessionExt::QueryingSubscriberOptions sub_options = zenoh::ext::SessionExt::QueryingSubscriberOptions::create_default(); const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; zenoh::KeyExpr selector_ke(selector); sub_options.query_keyexpr = std::move(selector_ke); - // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. - // By default a query accepts only replies that matches its query selector. - // This allows us to selectively query certain PublicationCaches when defining the - // set_querying_subscriber_callback below. + // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. + // By default a query accepts only replies that matches its query selector. + // This allows us to selectively query certain PublicationCaches when defining the + // set_querying_subscriber_callback below. sub_options.query_accept_replies = ZC_REPLY_KEYEXPR_ANY; - // As this initial query is now using a "*", the query target is not COMPLETE. + // As this initial query is now using a "*", the query target is not COMPLETE. sub_options.query_target = Z_QUERY_TARGET_ALL; - // We set consolidation to none as we need to receive transient local messages - // from a number of publishers. Eg: To receive TF data published over /tf_static - // by various publishers. + // We set consolidation to none as we need to receive transient local messages + // from a number of publishers. Eg: To receive TF data published over /tf_static + // by various publishers. sub_options.query_consolidation = zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); @@ -206,17 +205,17 @@ bool SubscriptionData::init() auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - sample.get_keyexpr().as_string_view()); + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + sample.get_keyexpr().as_string_view()); return; } auto attachment = sample.get_attachment(); if (!attachment.has_value()) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") + "rmw_zenoh_cpp", + "Unable to obtain attachment") return; } @@ -224,34 +223,33 @@ bool SubscriptionData::init() AttachmentData attachment_data(attachment_value); sub_data->add_new_message( - std::make_unique( - sample.get_payload().as_vector(), - std::chrono::system_clock::now().time_since_epoch().count(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); + std::make_unique( + sample.get_payload().as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } sub_ = std::move(sub); - // Register the querying subscriber with the graph cache to get latest - // messages from publishers that were discovered after their first publication. + // Register the querying subscriber with the graph cache to get latest + // messages from publishers that were discovered after their first publication. graph_cache_->set_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash(), + entity_->topic_info().value().topic_keyexpr_, + entity_->keyexpr_hash(), [data_wp](const std::string & queryable_prefix) -> void { auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback."); return; } std::lock_guard lock(sub_data->mutex_); @@ -260,47 +258,43 @@ bool SubscriptionData::init() "/" + sub_data->entity_->topic_info().value().topic_keyexpr_; RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "QueryingSubscriberCallback triggered over %s.", - selector.c_str() - ); + "rmw_zenoh_cpp", + "QueryingSubscriberCallback triggered over %s.", + selector.c_str()); zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); opts.timeout_ms = std::numeric_limits::max(); opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; zenoh::ZResult result; - std::get>(sub_data->sub_.value()).get( - zenoh::KeyExpr(selector), - std::move(opts), - &result); + std::get>(sub_data->sub_.value()).get(zenoh::KeyExpr( + selector), std::move(opts), &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to get querying subscriber."); return; } - } - ); + }); } else { zenoh::Session::SubscriberOptions sub_options = zenoh::Session::SubscriberOptions::create_default(); std::weak_ptr data_wp = shared_from_this(); zenoh::Subscriber sub = context_impl->session()->declare_subscriber( - sub_ke, - [data_wp](const zenoh::Sample & sample) { + sub_ke, + [data_wp](const zenoh::Sample & sample) + { auto sub_data = data_wp.lock(); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to lock weak_ptr within querying subscription callback." - ); + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback."); return; } auto attachment = sample.get_attachment(); if (!attachment.has_value()) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain attachment") + "rmw_zenoh_cpp", + "Unable to obtain attachment") return; } auto payload = sample.get_payload().clone(); @@ -308,15 +302,15 @@ bool SubscriptionData::init() AttachmentData attachment_data(attachment_value); sub_data->add_new_message( - std::make_unique( - payload.as_vector(), - std::chrono::system_clock::now().time_since_epoch().count(), - std::move(attachment_data)), - std::string(sample.get_keyexpr().as_string_view())); - }, - zenoh::closures::none, - std::move(sub_options), - &result); + std::make_unique( + payload.as_vector(), + std::chrono::system_clock::now().time_since_epoch().count(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &result); if (result != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; @@ -324,16 +318,16 @@ bool SubscriptionData::init() sub_ = std::move(sub); } - // Publish to the graph that a new subscription is in town. + // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); token_ = context_impl->session()->liveliness_declare_token( - zenoh::KeyExpr(liveliness_keyexpr), - zenoh::Session::LivelinessDeclarationOptions::create_default(), - &result); + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the subscription."); + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); return false; } @@ -342,51 +336,50 @@ bool SubscriptionData::init() return true; } -///============================================================================= + ///============================================================================= std::size_t SubscriptionData::keyexpr_hash() const { std::lock_guard lock(mutex_); return entity_->keyexpr_hash(); } -///============================================================================= + ///============================================================================= liveliness::TopicInfo SubscriptionData::topic_info() const { std::lock_guard lock(mutex_); return entity_->topic_info().value(); } -///============================================================================= + ///============================================================================= bool SubscriptionData::liveliness_is_valid() const { std::lock_guard lock(mutex_); - // The z_check function is now internal in zenoh-1.0.0 so we assume - // the liveliness token is still initialized as long as this entity has - // not been shutdown. + // The z_check function is now internal in zenoh-1.0.0 so we assume + // the liveliness token is still initialized as long as this entity has + // not been shutdown. return !is_shutdown_; } -///============================================================================= + ///============================================================================= std::shared_ptr SubscriptionData::events_mgr() const { std::lock_guard lock(mutex_); return events_mgr_; } -///============================================================================= + ///============================================================================= SubscriptionData::~SubscriptionData() { const rmw_ret_t ret = this->shutdown(); if (ret != RMW_RET_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Error destructing publisher /%s.", - entity_->topic_info().value().name_.c_str() - ); + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str()); } } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::shutdown() { rmw_ret_t ret = RMW_RET_OK; @@ -395,43 +388,42 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } - // Remove the registered callback from the GraphCache if any. + // Remove the registered callback from the GraphCache if any. graph_cache_->remove_querying_subscriber_callback( - entity_->topic_info().value().topic_keyexpr_, - entity_->keyexpr_hash() - ); - // Remove any event callbacks registered to this subscription. + entity_->topic_info().value().topic_keyexpr_, + entity_->keyexpr_hash()); + // Remove any event callbacks registered to this subscription. graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); - // Unregister this subscription from the ROS graph. + // Unregister this subscription from the ROS graph. zenoh::ZResult result; std::move(token_).value().undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to undeclare liveliness token"); + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); return RMW_RET_ERROR; } if (sub_.has_value()) { - zenoh::Subscriber * sub = std::get_if>(&sub_.value()); + zenoh::Subscriber *sub = std::get_if>(&sub_.value()); if (sub != nullptr) { std::move(*sub).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare sub."); + "rmw_zenoh_cpp", + "failed to undeclare sub."); return RMW_RET_ERROR; } } else { - zenoh::ext::QueryingSubscriber * sub = + zenoh::ext::QueryingSubscriber *sub = std::get_if>(&sub_.value()); if (sub != nullptr) { std::move(*sub).undeclare(&result); if (result != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "failed to undeclare querying sub."); + "rmw_zenoh_cpp", + "failed to undeclare querying sub."); return RMW_RET_ERROR; } } @@ -444,16 +436,16 @@ rmw_ret_t SubscriptionData::shutdown() return ret; } -///============================================================================= + ///============================================================================= bool SubscriptionData::is_shutdown() const { std::lock_guard lock(mutex_); return is_shutdown_; } -///============================================================================= + ///============================================================================= bool SubscriptionData::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) + rmw_wait_set_data_t *wait_set_data) { std::lock_guard lock(mutex_); if (!message_queue_.empty()) { @@ -465,7 +457,7 @@ bool SubscriptionData::queue_has_data_and_attach_condition_if_not( return false; } -///============================================================================= + ///============================================================================= bool SubscriptionData::detach_condition_and_queue_is_empty() { std::lock_guard lock(mutex_); @@ -474,17 +466,17 @@ bool SubscriptionData::detach_condition_and_queue_is_empty() return message_queue_.empty(); } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::take_one_message( - void * ros_message, - rmw_message_info_t * message_info, - bool * taken) + void *ros_message, + rmw_message_info_t *message_info, + bool *taken) { *taken = false; std::lock_guard lock(mutex_); if (is_shutdown_ || message_queue_.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. + // This tells rcl that the check for a new message was done, but no messages have come in yet. return RMW_RET_OK; } std::unique_ptr msg_data = std::move(message_queue_.front()); @@ -494,21 +486,21 @@ rmw_ret_t SubscriptionData::take_one_message( if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "SubscriptionData not able to get slice data"); + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); return RMW_RET_ERROR; } - // Object that manages the raw buffer + // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( reinterpret_cast(const_cast(payload_data.data())), payload_data.size()); - // Object that serializes the data + // Object that serializes the data rmw_zenoh_cpp::Cdr deser(fastbuffer); if (!type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_message, - type_support_impl_)) + deser.get_cdr(), + ros_message, + type_support_impl_)) { RMW_SET_ERROR_MSG("could not deserialize ROS message"); return RMW_RET_ERROR; @@ -518,13 +510,13 @@ rmw_ret_t SubscriptionData::take_one_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number + // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( - message_info->publisher_gid.data, - msg_data->attachment.copy_gid().data(), - RMW_GID_STORAGE_SIZE); + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } *taken = true; @@ -532,17 +524,17 @@ rmw_ret_t SubscriptionData::take_one_message( return RMW_RET_OK; } -///============================================================================= + ///============================================================================= rmw_ret_t SubscriptionData::take_serialized_message( - rmw_serialized_message_t * serialized_message, - bool * taken, - rmw_message_info_t * message_info) + rmw_serialized_message_t *serialized_message, + bool *taken, + rmw_message_info_t *message_info) { *taken = false; std::lock_guard lock(mutex_); if (is_shutdown_ || message_queue_.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. + // This tells rcl that the check for a new message was done, but no messages have come in yet. return RMW_RET_OK; } std::unique_ptr msg_data = std::move(message_queue_.front()); @@ -552,22 +544,22 @@ rmw_ret_t SubscriptionData::take_serialized_message( if (payload_data.empty()) { RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "SubscriptionData not able to get slice data"); + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); return RMW_RET_ERROR; } if (serialized_message->buffer_capacity < payload_data.size()) { rmw_ret_t ret = rmw_serialized_message_resize(serialized_message, payload_data.size()); if (ret != RMW_RET_OK) { - return ret; // Error message already set + return ret; // Error message already set } } serialized_message->buffer_length = payload_data.size(); memcpy( - serialized_message->buffer, - reinterpret_cast(const_cast(payload_data.data())), - payload_data.size()); + serialized_message->buffer, + reinterpret_cast(const_cast(payload_data.data())), + payload_data.size()); *taken = true; @@ -575,20 +567,20 @@ rmw_ret_t SubscriptionData::take_serialized_message( message_info->source_timestamp = msg_data->attachment.source_timestamp(); message_info->received_timestamp = msg_data->recv_timestamp; message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number + // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; memcpy( - message_info->publisher_gid.data, - msg_data->attachment.copy_gid().data(), - RMW_GID_STORAGE_SIZE); + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } return RMW_RET_OK; } -///============================================================================= + ///============================================================================= void SubscriptionData::add_new_message( std::unique_ptr msg, const std::string & topic_name) { @@ -600,31 +592,31 @@ void SubscriptionData::add_new_message( if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && message_queue_.size() >= adapted_qos_profile.depth) { - // Log warning if message is discarded due to hitting the queue depth + // Log warning if message is discarded due to hitting the queue depth RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "Message queue depth of %ld reached, discarding oldest message " - "for subscription for %s", - adapted_qos_profile.depth, - topic_name.c_str()); - - // If the adapted_qos_profile.depth is 0, the std::move command below will result - // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 - // in rmw_create_subscription() but to be safe, we only attempt to discard from the - // queue if it is non-empty. + "rmw_zenoh_cpp", + "Message queue depth of %ld reached, discarding oldest message " + "for subscription for %s", + adapted_qos_profile.depth, + topic_name.c_str()); + + // If the adapted_qos_profile.depth is 0, the std::move command below will result + // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 + // in rmw_create_subscription() but to be safe, we only attempt to discard from the + // queue if it is non-empty. if (!message_queue_.empty()) { std::unique_ptr old = std::move(message_queue_.front()); message_queue_.pop_front(); } } - // Check for messages lost if the new sequence number is not monotonically increasing. + // Check for messages lost if the new sequence number is not monotonically increasing. const size_t gid_hash = hash_gid(msg->attachment.copy_gid()); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( - msg->attachment.sequence_number() - - last_known_pub_it->second); + msg->attachment.sequence_number() - + last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; total_messages_lost_ += num_msg_lost; @@ -632,16 +624,16 @@ void SubscriptionData::add_new_message( event_status->total_count_change = num_msg_lost; event_status->total_count = total_messages_lost_; events_mgr_->add_new_event( - ZENOH_EVENT_MESSAGE_LOST, - std::move(event_status)); + ZENOH_EVENT_MESSAGE_LOST, + std::move(event_status)); } } - // Always update the last known sequence number for the publisher. + // Always update the last known sequence number for the publisher. last_known_published_msg_[gid_hash] = msg->attachment.sequence_number(); message_queue_.emplace_back(std::move(msg)); - // Since we added new data, trigger user callback and guard condition if they are available + // Since we added new data, trigger user callback and guard condition if they are available data_callback_mgr_.trigger_callback(); if (wait_set_data_ != nullptr) { wait_set_data_->triggered = true; @@ -649,20 +641,20 @@ void SubscriptionData::add_new_message( } } -//============================================================================== + //============================================================================== void SubscriptionData::set_on_new_message_callback( rmw_event_callback_t callback, - const void * user_data) + const void *user_data) { std::lock_guard lock(mutex_); data_callback_mgr_.set_callback(user_data, std::move(callback)); } -//============================================================================== + //============================================================================== std::shared_ptr SubscriptionData::graph_cache() const { std::lock_guard lock(mutex_); return graph_cache_; } -} // namespace rmw_zenoh_cpp +} // namespace rmw_zenoh_cpp