From 4fcbd2bc869205b43c0f6cd1dba02cc1bc9048a7 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 16 Dec 2024 14:08:51 +0100 Subject: [PATCH 01/11] Don't copy contiguous bytes on reception This uses the slices iterator API of zenoh-cpp to avoid unecessarily copying bytes into a vecotr, if and only if the bytes is made up of exactly one slice. --- rmw_zenoh_cpp/src/detail/payload.hpp | 88 +++++++++++++++++++ .../src/detail/rmw_subscription_data.cpp | 9 +- .../src/detail/rmw_subscription_data.hpp | 5 +- 3 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/payload.hpp diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp new file mode 100644 index 00000000..0afac726 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -0,0 +1,88 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__PAYLOAD_HPP_ +#define DETAIL__PAYLOAD_HPP_ + +#include + +#include +#include +#include + +namespace rmw_zenoh_cpp +{ +///============================================================================= +class Payload +{ +public: + explicit Payload(const zenoh::Bytes & bytes) + { + auto slices = bytes.slice_iter(); + auto slice = slices.next(); + if (!slice.has_value()) { + bytes_ = nullptr; + } else { + if (!slices.next().has_value()) { + bytes_ = Contiguous {slice.value(), bytes.clone()}; + } else { + bytes_ = bytes.as_vector(); + } + } + } + + ~Payload() = default; + + const uint8_t * data() + { + if (std::holds_alternative(bytes_)) { + return nullptr; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).data(); + } else { + return std::get(bytes_).slice.data; + } + } + + size_t size() + { + if (std::holds_alternative(bytes_)) { + return 0; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).size(); + } else { + return std::get(bytes_).slice.len; + } + } + + bool empty() + { + return std::holds_alternative(bytes_); + } + +private: + struct Contiguous + { + zenoh::Slice slice; + zenoh::Bytes bytes; + }; + using NonContiguous = std::vector; + using Empty = std::nullptr_t; + // Is `std::vector` in case of a non-contiguous payload + // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. + std::variant bytes_; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__PAYLOAD_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index cce67b33..895b0f26 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -44,10 +44,10 @@ namespace rmw_zenoh_cpp { ///============================================================================= SubscriptionData::Message::Message( - std::vector && p, + const zenoh::Bytes & p, uint64_t recv_ts, AttachmentData && attachment_) -: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } @@ -225,7 +225,7 @@ bool SubscriptionData::init() sub_data->add_new_message( std::make_unique( - sample.get_payload().as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); @@ -303,13 +303,12 @@ bool SubscriptionData::init() "Unable to obtain attachment") return; } - auto payload = sample.get_payload().clone(); auto attachment_value = attachment.value(); AttachmentData attachment_data(attachment_value); sub_data->add_new_message( std::make_unique( - payload.as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index a3fab3f9..0855a5fb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -36,6 +36,7 @@ #include "attachment_helpers.hpp" #include "type_support_common.hpp" #include "zenoh_utils.hpp" +#include "payload.hpp" #include "rcutils/allocator.h" @@ -51,13 +52,13 @@ class SubscriptionData final : public std::enable_shared_from_this && p, + const zenoh::Bytes & bytes, uint64_t recv_ts, AttachmentData && attachment); ~Message(); - std::vector payload; + Payload payload; uint64_t recv_timestamp; AttachmentData attachment; }; From f7797f581376623ed3b2e620d3449cbbcb15f3e3 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 18 Dec 2024 16:47:09 +0100 Subject: [PATCH 02/11] Sort includes Co-authored-by: Chris Lalancette Signed-off-by: Mahmoud Mazouz --- rmw_zenoh_cpp/src/detail/payload.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index 0afac726..23e98eed 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -17,8 +17,8 @@ #include -#include #include +#include #include namespace rmw_zenoh_cpp From 02aa50dc3bf2af65c437cf883e96b85c74608dd9 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 18 Dec 2024 16:47:35 +0100 Subject: [PATCH 03/11] Fix zenoh-cpp include Co-authored-by: Chris Lalancette Signed-off-by: Mahmoud Mazouz --- rmw_zenoh_cpp/src/detail/payload.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index 23e98eed..4a521c58 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -15,7 +15,7 @@ #ifndef DETAIL__PAYLOAD_HPP_ #define DETAIL__PAYLOAD_HPP_ -#include +#include #include #include From 5643382487e417e400995bedd4556652c831dcdd Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 18 Dec 2024 17:20:02 +0100 Subject: [PATCH 04/11] Don't use auto type specifiers --- rmw_zenoh_cpp/src/detail/payload.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index 4a521c58..8ddcf7be 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -29,8 +29,8 @@ class Payload public: explicit Payload(const zenoh::Bytes & bytes) { - auto slices = bytes.slice_iter(); - auto slice = slices.next(); + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); + std::optional slice = slices.next(); if (!slice.has_value()) { bytes_ = nullptr; } else { From b173652db233fdb2f81ae4acc388cfbb0ec6c19b Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 18 Dec 2024 17:33:43 +0100 Subject: [PATCH 05/11] Remove unused `` includes --- rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp | 1 - rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp | 1 - 2 files changed, 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 895b0f26..4133965b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include "attachment_helpers.hpp" #include "cdr.hpp" diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 0855a5fb..bb033255 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -25,7 +25,6 @@ #include #include #include -#include #include From 0704c35405cf5e00445aa1aacab280a0731f7d15 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 19 Dec 2024 10:56:22 +0100 Subject: [PATCH 06/11] Explain lifetime of `Contiguous::slice` --- rmw_zenoh_cpp/src/detail/payload.hpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index 8ddcf7be..34e069ef 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -29,6 +29,14 @@ class Payload public: explicit Payload(const zenoh::Bytes & bytes) { + // NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of + // buffers contains exactly one element, it is not necessary to concatenate the list of buffers. + // In this case, we store a clone of the bytes object to maintain a non-zero reference-count on + // the buffer. This ensures that the slice into said buffer stays valid until we drop our copy + // of the bytes object (at the very least). This case corresponds to the `Contiguous` + // alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local" + // communication. + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); std::optional slice = slices.next(); if (!slice.has_value()) { From 1bcc0ceecf7aa50818a83f1a05a0d03fcb614679 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 19 Dec 2024 15:51:20 +0100 Subject: [PATCH 07/11] Add missing includes --- rmw_zenoh_cpp/src/detail/payload.hpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index 34e069ef..e8f661fd 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -17,6 +17,8 @@ #include +#include +#include #include #include #include @@ -29,14 +31,6 @@ class Payload public: explicit Payload(const zenoh::Bytes & bytes) { - // NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of - // buffers contains exactly one element, it is not necessary to concatenate the list of buffers. - // In this case, we store a clone of the bytes object to maintain a non-zero reference-count on - // the buffer. This ensures that the slice into said buffer stays valid until we drop our copy - // of the bytes object (at the very least). This case corresponds to the `Contiguous` - // alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local" - // communication. - zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); std::optional slice = slices.next(); if (!slice.has_value()) { From 2422c5e71ac02d43b5136652ee6f1e0432963385 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 19 Dec 2024 15:53:50 +0100 Subject: [PATCH 08/11] Fix include order --- rmw_zenoh_cpp/src/detail/payload.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp index e8f661fd..7743b43c 100644 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -15,14 +15,14 @@ #ifndef DETAIL__PAYLOAD_HPP_ #define DETAIL__PAYLOAD_HPP_ -#include - #include #include #include #include #include +#include + namespace rmw_zenoh_cpp { ///============================================================================= From 5569bb45a10ad6ae68415d78041ffe1bd45b33aa Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Thu, 19 Dec 2024 16:44:20 +0100 Subject: [PATCH 09/11] Move `Payload` into `zenoh_utils` --- rmw_zenoh_cpp/src/detail/payload.hpp | 90 ------------------- .../src/detail/rmw_subscription_data.hpp | 1 - rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 43 +++++++++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 29 ++++++ 4 files changed, 72 insertions(+), 91 deletions(-) delete mode 100644 rmw_zenoh_cpp/src/detail/payload.hpp diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp deleted file mode 100644 index 7743b43c..00000000 --- a/rmw_zenoh_cpp/src/detail/payload.hpp +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2024 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__PAYLOAD_HPP_ -#define DETAIL__PAYLOAD_HPP_ - -#include -#include -#include -#include -#include - -#include - -namespace rmw_zenoh_cpp -{ -///============================================================================= -class Payload -{ -public: - explicit Payload(const zenoh::Bytes & bytes) - { - zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); - std::optional slice = slices.next(); - if (!slice.has_value()) { - bytes_ = nullptr; - } else { - if (!slices.next().has_value()) { - bytes_ = Contiguous {slice.value(), bytes.clone()}; - } else { - bytes_ = bytes.as_vector(); - } - } - } - - ~Payload() = default; - - const uint8_t * data() - { - if (std::holds_alternative(bytes_)) { - return nullptr; - } else if (std::holds_alternative(bytes_)) { - return std::get(bytes_).data(); - } else { - return std::get(bytes_).slice.data; - } - } - - size_t size() - { - if (std::holds_alternative(bytes_)) { - return 0; - } else if (std::holds_alternative(bytes_)) { - return std::get(bytes_).size(); - } else { - return std::get(bytes_).slice.len; - } - } - - bool empty() - { - return std::holds_alternative(bytes_); - } - -private: - struct Contiguous - { - zenoh::Slice slice; - zenoh::Bytes bytes; - }; - using NonContiguous = std::vector; - using Empty = std::nullptr_t; - // Is `std::vector` in case of a non-contiguous payload - // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. - std::variant bytes_; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__PAYLOAD_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index bb033255..37ab0dba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -35,7 +35,6 @@ #include "attachment_helpers.hpp" #include "type_support_common.hpp" #include "zenoh_utils.hpp" -#include "payload.hpp" #include "rcutils/allocator.h" diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 061ceb0f..5fe1fbcd 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -77,4 +77,47 @@ int64_t get_system_time_in_ns() return std::chrono::duration_cast(now).count(); } +///============================================================================= +Payload::Payload(const zenoh::Bytes & bytes) +{ + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); + std::optional slice = slices.next(); + if (!slice.has_value()) { + bytes_ = nullptr; + } else { + if (!slices.next().has_value()) { + bytes_ = Contiguous {slice.value(), bytes.clone()}; + } else { + bytes_ = bytes.as_vector(); + } + } +} + +const uint8_t * Payload::data() +{ + if (std::holds_alternative(bytes_)) { + return nullptr; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).data(); + } else { + return std::get(bytes_).slice.data; + } +} + +size_t Payload::size() +{ + if (std::holds_alternative(bytes_)) { + return 0; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).size(); + } else { + return std::get(bytes_).slice.len; + } +} + +bool Payload::empty() +{ + return std::holds_alternative(bytes_); +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index a88b132e..31a6d77c 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include "rmw/types.h" @@ -63,6 +66,32 @@ class ZenohQuery final }; int64_t get_system_time_in_ns(); + +class Payload +{ +public: + explicit Payload(const zenoh::Bytes & bytes); + + ~Payload() = default; + + const uint8_t * data() const; + + size_t size() const; + + bool empty() const; + +private: + struct Contiguous + { + zenoh::Slice slice; + zenoh::Bytes bytes; + }; + using NonContiguous = std::vector; + using Empty = std::nullptr_t; + // Is `std::vector` in case of a non-contiguous payload + // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. + std::variant bytes_; +} } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_ From 4ce7535cbd2bfef4caeb68a407ba754563a48b9d Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 20 Dec 2024 09:08:43 +0100 Subject: [PATCH 10/11] Restore lost comment --- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 5fe1fbcd..8145ef7c 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -80,6 +80,14 @@ int64_t get_system_time_in_ns() ///============================================================================= Payload::Payload(const zenoh::Bytes & bytes) { + // NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of + // buffers contains exactly one element, it is not necessary to concatenate the list of buffers. + // In this case, we store a clone of the bytes object to maintain a non-zero reference-count on + // the buffer. This ensures that the slice into said buffer stays valid until we drop our copy + // of the bytes object (at the very least). This case corresponds to the `Contiguous` + // alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local" + // communication. + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); std::optional slice = slices.next(); if (!slice.has_value()) { From e289a48d9f0546108f5ac8d4972dc162c6a40c49 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 20 Dec 2024 12:58:54 +0000 Subject: [PATCH 11/11] Fix build errors --- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 6 +++--- rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 8145ef7c..5770718b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -101,7 +101,7 @@ Payload::Payload(const zenoh::Bytes & bytes) } } -const uint8_t * Payload::data() +const uint8_t * Payload::data() const { if (std::holds_alternative(bytes_)) { return nullptr; @@ -112,7 +112,7 @@ const uint8_t * Payload::data() } } -size_t Payload::size() +size_t Payload::size() const { if (std::holds_alternative(bytes_)) { return 0; @@ -123,7 +123,7 @@ size_t Payload::size() } } -bool Payload::empty() +bool Payload::empty() const { return std::holds_alternative(bytes_); } diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 31a6d77c..dd7cff72 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -91,7 +91,7 @@ class Payload // Is `std::vector` in case of a non-contiguous payload // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. std::variant bytes_; -} +}; } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_