Skip to content

Commit

Permalink
move Payload/zBuf-related functionality into separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Apr 5, 2024
1 parent 142b752 commit 4932ddf
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 269 deletions.
4 changes: 2 additions & 2 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -2121,13 +2121,13 @@ ZENOHC_API enum z_sample_kind_t z_sample_kind(const struct z_sample_t *sample);
* Note that other samples may have received the same buffer, meaning that mutating this buffer may
* affect the samples received by other subscribers.
*/
ZENOHC_API struct z_owned_buffer_t z_sample_owned_payload(const struct z_sample_t *sample);
ZENOHC_API zc_owned_payload_t z_sample_owned_payload(const struct z_sample_t *sample);
/**
* The sample's data, the return value aliases the sample.
*
* If you need ownership of the buffer, you may use `z_sample_owned_payload`.
*/
ZENOHC_API struct z_buffer_t z_sample_payload(const struct z_sample_t *sample);
ZENOHC_API zc_payload_t z_sample_payload(const struct z_sample_t *sample);
/**
* The qos with which the sample was received.
*/
Expand Down
132 changes: 1 addition & 131 deletions src/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
//

use libc::{c_char, size_t};
use std::ops::Deref;
use std::ptr::NonNull;
use zenoh::{
buffers::{buffer::SplitBuffer, ZBuf},
prelude::ZenohId,
};

use crate::{impl_guarded_transmute, GuardedTransmute};
use zenoh::prelude::ZenohId;

/// A contiguous view of bytes owned by some other entity.
///
Expand Down Expand Up @@ -235,126 +228,3 @@ impl From<&[u8]> for z_bytes_t {
}
}
}

pub use crate::z_owned_buffer_t;
impl_guarded_transmute!(noderefs Option<ZBuf>, z_owned_buffer_t);
impl Default for z_owned_buffer_t {
fn default() -> Self {
z_buffer_null()
}
}
impl From<ZBuf> for z_owned_buffer_t {
fn from(value: ZBuf) -> Self {
Some(value).transmute()
}
}

impl From<Option<ZBuf>> for z_owned_buffer_t {
fn from(value: Option<ZBuf>) -> Self {
match value {
Some(value) => value.into(),
None => z_buffer_null(),
}
}
}
impl core::ops::Deref for z_owned_buffer_t {
type Target = Option<ZBuf>;

fn deref(&self) -> &Self::Target {
unsafe { core::mem::transmute(self) }
}
}
impl core::ops::DerefMut for z_owned_buffer_t {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { core::mem::transmute(self) }
}
}

/// The gravestone value for `z_owned_buffer_t`.
#[no_mangle]
pub extern "C" fn z_buffer_null() -> z_owned_buffer_t {
unsafe { core::mem::transmute(None::<ZBuf>) }
}

/// Decrements the buffer's reference counter, destroying it if applicable.
///
/// `buffer` will be reset to `z_buffer_null`, preventing UB on double-frees.
#[no_mangle]
pub extern "C" fn z_buffer_drop(buffer: &mut z_owned_buffer_t) {
core::mem::drop(buffer.take())
}

/// Returns `true` if the buffer is in a valid state.
#[no_mangle]
pub extern "C" fn z_buffer_check(buffer: &z_owned_buffer_t) -> bool {
buffer.is_some()
}

/// Loans the buffer, allowing you to call functions that only need a loan of it.
#[no_mangle]
pub extern "C" fn z_buffer_loan(buffer: &z_owned_buffer_t) -> z_buffer_t {
buffer.as_ref().into()
}

/// A loan of a `z_owned_buffer_t`.
///
/// As it is a split buffer, it may contain more than one slice. It's number of slices is returned by `z_buffer_slice_count`.
#[repr(C)]
#[derive(Clone, Copy, Default)]
pub struct z_buffer_t {
_inner: Option<NonNull<z_owned_buffer_t>>,
}

impl From<Option<&ZBuf>> for z_buffer_t {
fn from(value: Option<&ZBuf>) -> Self {
unsafe { core::mem::transmute(value) }
}
}

impl From<z_buffer_t> for Option<&'static ZBuf> {
fn from(value: z_buffer_t) -> Self {
unsafe { core::mem::transmute(value) }
}
}

/// Increments the buffer's reference count, returning an owned version of the buffer.
#[no_mangle]
pub extern "C" fn z_buffer_clone(buffer: z_buffer_t) -> z_owned_buffer_t {
match buffer._inner {
Some(b) => unsafe { b.as_ref().deref().clone().transmute() },
None => ZBuf::empty().into(),
}
}

/// Returns the number of slices in the buffer.
///
/// If the return value is 0 or 1, then the buffer's data is contiguous in memory.
#[no_mangle]
pub extern "C" fn z_buffer_slice_count(buffer: z_buffer_t) -> usize {
match buffer.into() {
None => 0,
Some(buf) => ZBuf::slices(buf).len(),
}
}

/// Returns total number bytes in the buffer.
#[no_mangle]
pub extern "C" fn z_buffer_len(buffer: z_buffer_t) -> usize {
match buffer.into() {
None => 0,
Some(buf) => ZBuf::slices(buf).fold(0, |acc, s| acc + s.len()),
}
}

/// Returns the `index`th slice of the buffer, aliasing it.
///
/// Out of bounds accesses will return `z_bytes_empty`.
#[no_mangle]
pub extern "C" fn z_buffer_slice_at(buffer: z_buffer_t, index: usize) -> z_bytes_t {
match buffer.into() {
None => z_bytes_empty(),
Some(buf) => ZBuf::slices(buf)
.nth(index)
.map_or(z_bytes_empty(), |slice| slice.into()),
}
}
140 changes: 4 additions & 136 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
// ZettaScale Zenoh team, <[email protected]>
//

use std::any::Any;
use std::ops::Deref;
use std::slice;

use crate::collections::*;
use crate::keyexpr::*;
use crate::z_congestion_control_t;
use crate::z_id_t;
use crate::z_priority_t;
use crate::zc_owned_payload_t;
use crate::zc_payload_t;
use crate::{impl_guarded_transmute, GuardedTransmute};
use libc::c_void;
use libc::{c_char, c_ulong};
use zenoh::buffers::buffer::SplitBuffer;
use zenoh::buffers::ZBuf;
use zenoh::buffers::ZSliceBuffer;
use zenoh::prelude::SampleKind;
use zenoh::query::ReplyKeyExpr;
use zenoh::sample::Locality;
Expand Down Expand Up @@ -95,135 +92,6 @@ impl From<Option<&Timestamp>> for z_timestamp_t {
}
}

/// An owned payload, backed by a reference counted owner.
///
/// The `payload` field may be modified, and Zenoh will take the new values into account.
#[allow(non_camel_case_types)]
pub type zc_owned_payload_t = z_owned_buffer_t;

/// Clones the `payload` by incrementing its reference counter.
#[no_mangle]
pub extern "C" fn zc_payload_rcinc(payload: &zc_owned_payload_t) -> zc_owned_payload_t {
z_buffer_clone(z_buffer_loan(payload))
}
/// Returns `false` if `payload` is the gravestone value.
#[no_mangle]
pub extern "C" fn zc_payload_check(payload: &zc_owned_payload_t) -> bool {
z_buffer_check(payload)
}
/// Decrements `payload`'s backing refcount, releasing the memory if appropriate.
#[no_mangle]
pub extern "C" fn zc_payload_drop(payload: &mut zc_owned_payload_t) {
z_buffer_drop(payload)
}
/// Constructs `zc_owned_payload_t`'s gravestone value.
#[no_mangle]
pub extern "C" fn zc_payload_null() -> zc_owned_payload_t {
z_buffer_null()
}

/// Returns a :c:type:`zc_payload_t` loaned from `payload`.
#[no_mangle]
pub extern "C" fn zc_payload_loan(payload: &zc_owned_payload_t) -> zc_payload_t {
z_buffer_loan(payload)
}

#[allow(non_camel_case_types)]
pub type zc_payload_t = z_buffer_t;

/// Increments internal payload reference count, returning owned payload.
#[no_mangle]
pub extern "C" fn zc_payload_clone(payload: zc_payload_t) -> zc_owned_payload_t {
z_buffer_clone(payload)
}

/// Decodes payload into null-terminated string
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn zc_payload_decode_into_string(
payload: zc_payload_t,
cstr: &mut z_owned_str_t,
) -> i8 {
let payload: Option<&ZBuf> = payload.into();
if payload.is_none() {
*cstr = z_str_null();
return 0;
}
*cstr = z_owned_str_t::preallocate(zc_payload_len(payload.into()));
let payload = payload.unwrap();

let mut pos = 0;
for s in payload.slices() {
cstr.insert_unchecked(pos, s);
pos += s.len();
}
0
}

/// Decodes payload into null-terminated string
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn zc_payload_decode_into_bytes(
payload: zc_payload_t,
b: &mut z_owned_bytes_t,
) -> i8 {
let payload: Option<&ZBuf> = payload.into();
if payload.is_none() {
*b = z_bytes_null();
return 0;
}
*b = z_owned_bytes_t::preallocate(zc_payload_len(payload.into()));
let payload = payload.unwrap();

let mut pos = 0;
for s in payload.slices() {
b.insert_unchecked(pos, s);
pos += s.len();
}
0
}

unsafe impl Send for z_bytes_t {}
unsafe impl Sync for z_bytes_t {}

impl ZSliceBuffer for z_bytes_t {
fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.start, self.len) }
}
fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.start as *mut u8, self.len) }
}
fn as_any(&self) -> &dyn Any {
self
}
}

/// Encodes byte sequence by aliasing.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn zc_payload_encode_from_bytes(bytes: z_bytes_t) -> zc_owned_payload_t {
ZBuf::from(bytes).into()
}

/// Encodes a null-terminated string by aliasing.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn zc_payload_encode_from_string(
cstr: *const libc::c_char,
) -> zc_owned_payload_t {
let bytes = z_bytes_t {
start: cstr as *const u8,
len: libc::strlen(cstr),
};
zc_payload_encode_from_bytes(bytes)
}

/// Returns total number bytes in the payload.
#[no_mangle]
pub extern "C" fn zc_payload_len(payload: zc_payload_t) -> usize {
z_buffer_len(payload)
}

/// QoS settings of zenoh message.
///
#[repr(C)]
Expand Down Expand Up @@ -290,15 +158,15 @@ pub extern "C" fn z_sample_encoding(sample: &z_sample_t) -> z_encoding_t {
///
/// If you need ownership of the buffer, you may use `z_sample_owned_payload`.
#[no_mangle]
pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> z_buffer_t {
pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> zc_payload_t {
Some(&sample.payload).into()
}
/// Returns the sample's payload after incrementing its internal reference count.
///
/// Note that other samples may have received the same buffer, meaning that mutating this buffer may
/// affect the samples received by other subscribers.
#[no_mangle]
pub extern "C" fn z_sample_owned_payload(sample: &z_sample_t) -> z_owned_buffer_t {
pub extern "C" fn z_sample_owned_payload(sample: &z_sample_t) -> zc_owned_payload_t {
sample.payload.clone().into()
}
/// The sample's kind (put or delete).
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod config;
pub use crate::config::*;
mod commons;
pub use crate::commons::*;
mod payload;
pub use crate::payload::*;
mod keyexpr;
pub use crate::keyexpr::*;
mod info;
Expand Down
Loading

0 comments on commit 4932ddf

Please sign in to comment.