Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't copy contiguous bytes on reception #343

Merged
merged 11 commits into from
Dec 23, 2024
10 changes: 4 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
SubscriptionData::Message::Message(
std::vector<uint8_t> && p,
fuzzypixelz marked this conversation as resolved.
Show resolved Hide resolved
const zenoh::Bytes & p,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
}

Expand Down Expand Up @@ -225,7 +224,7 @@ bool SubscriptionData::init()

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down Expand Up @@ -303,13 +302,12 @@ bool SubscriptionData::init()
"Unable to obtain attachment")
return;
}
auto payload = sample.get_payload().clone();
auto attachment_value = attachment.value();

AttachmentData attachment_data(attachment_value);
sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
payload.as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
std::vector<uint8_t> && p,
fuzzypixelz marked this conversation as resolved.
Show resolved Hide resolved
const zenoh::Bytes & bytes,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();

std::vector<uint8_t> payload;
Payload payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down
51 changes: 51 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,55 @@ int64_t get_system_time_in_ns()
return std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
}

///=============================================================================
Payload::Payload(const zenoh::Bytes & bytes)
{
// NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of
// buffers contains exactly one element, it is not necessary to concatenate the list of buffers.
// In this case, we store a clone of the bytes object to maintain a non-zero reference-count on
// the buffer. This ensures that the slice into said buffer stays valid until we drop our copy
// of the bytes object (at the very least). This case corresponds to the `Contiguous`
// alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local"
// communication.

zenoh::Bytes::SliceIterator slices = bytes.slice_iter();
std::optional<zenoh::Slice> slice = slices.next();
if (!slice.has_value()) {
bytes_ = nullptr;
} else {
if (!slices.next().has_value()) {
bytes_ = Contiguous {slice.value(), bytes.clone()};
} else {
bytes_ = bytes.as_vector();
}
}
}

const uint8_t * Payload::data() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return nullptr;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).data();
} else {
return std::get<Contiguous>(bytes_).slice.data;
}
}

size_t Payload::size() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return 0;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).size();
} else {
return std::get<Contiguous>(bytes_).slice.len;
}
}

bool Payload::empty() const
{
return std::holds_alternative<Empty>(bytes_);
}

} // namespace rmw_zenoh_cpp
29 changes: 29 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <chrono>
#include <functional>
#include <optional>
#include <utility>
#include <variant>
#include <vector>

#include "rmw/types.h"

Expand Down Expand Up @@ -63,6 +66,32 @@ class ZenohQuery final
};

int64_t get_system_time_in_ns();

class Payload
{
public:
explicit Payload(const zenoh::Bytes & bytes);

~Payload() = default;

const uint8_t * data() const;

size_t size() const;

bool empty() const;

private:
struct Contiguous
{
zenoh::Slice slice;
zenoh::Bytes bytes;
};
using NonContiguous = std::vector<uint8_t>;
using Empty = std::nullptr_t;
// Is `std::vector<uint8_t>` in case of a non-contiguous payload
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
std::variant<NonContiguous, Contiguous, Empty> bytes_;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_
Loading