diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..156fa782 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,74 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include + +#include "rcutils/allocator.h" + +// FIXME(fuzzypixelz): indeed, we leak all allocated buffers ;) +class BufferPool +{ +public: + struct Buffer + { + uint8_t *data; + size_t size; + }; + + BufferPool() = default; + + Buffer allocate(rcutils_allocator_t *allocator, size_t size) + { + std::lock_guard guard(mutex_); + + if (buffers_.empty()) { + uint8_t *data = static_cast(allocator->allocate(size, allocator->state)); + if (data == nullptr) { + return {}; + } + return Buffer {data, size}; + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + uint8_t *data = static_cast(allocator->reallocate( + buffer.data, size, allocator->state)); + if (data == nullptr) { + return {}; + } + buffer.data = data; + buffer.size = size; + } + return buffer; + } + } + + void + deallocate(Buffer buffer) + { + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); + } + +private: + std::vector buffers_; + std::mutex mutex_; +}; + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a2fdaf5e..17932867 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 7d83eec6..e3595b08 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -210,24 +210,21 @@ rmw_ret_t PublisherData::publish( type_support_impl_); // To store serialized message byte array. - char * msg_bytes = nullptr; + uint8_t * msg_bytes = nullptr; rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; - auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator]() { - if (msg_bytes) { - allocator->deallocate(msg_bytes, allocator->state); - } - }); + rmw_context_impl_s *context_impl = static_cast(rmw_node_->data); - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + // Get memory from the serialization buffer pool. + BufferPool::Buffer serialization_buffer = + context_impl->serialization_buffer_pool.allocate(allocator, max_data_length); + msg_bytes = serialization_buffer.data; RMW_CHECK_FOR_NULL_WITH_MSG( msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); @@ -251,11 +248,14 @@ rmw_ret_t PublisherData::publish( sequence_number_++, entity_->copy_gid()); + auto delete_bytes = [buffer_pool = &context_impl->serialization_buffer_pool, + buffer = serialization_buffer](uint8_t * data){ + assert(buffer.data == data); + buffer_pool->deallocate(buffer); + }; + // TODO(ahcorde): shmbuf - std::vector raw_data( - reinterpret_cast(msg_bytes), - reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(std::move(raw_data)); + zenoh::Bytes payload(msg_bytes, data_length, delete_bytes); pub_.put(std::move(payload), std::move(options), &result); if (result != Z_OK) {