diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index fe2b58959..7e6060df7 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -2121,13 +2121,13 @@ ZENOHC_API enum z_sample_kind_t z_sample_kind(const struct z_sample_t *sample); * Note that other samples may have received the same buffer, meaning that mutating this buffer may * affect the samples received by other subscribers. */ -ZENOHC_API struct z_owned_buffer_t z_sample_owned_payload(const struct z_sample_t *sample); +ZENOHC_API zc_owned_payload_t z_sample_owned_payload(const struct z_sample_t *sample); /** * The sample's data, the return value aliases the sample. * * If you need ownership of the buffer, you may use `z_sample_owned_payload`. */ -ZENOHC_API struct z_buffer_t z_sample_payload(const struct z_sample_t *sample); +ZENOHC_API zc_payload_t z_sample_payload(const struct z_sample_t *sample); /** * The qos with which the sample was received. */ diff --git a/src/collections.rs b/src/collections.rs index 363c30a89..d55a9fa1b 100644 --- a/src/collections.rs +++ b/src/collections.rs @@ -13,14 +13,7 @@ // use libc::{c_char, size_t}; -use std::ops::Deref; -use std::ptr::NonNull; -use zenoh::{ - buffers::{buffer::SplitBuffer, ZBuf}, - prelude::ZenohId, -}; - -use crate::{impl_guarded_transmute, GuardedTransmute}; +use zenoh::prelude::ZenohId; /// A contiguous view of bytes owned by some other entity. /// @@ -235,126 +228,3 @@ impl From<&[u8]> for z_bytes_t { } } } - -pub use crate::z_owned_buffer_t; -impl_guarded_transmute!(noderefs Option, z_owned_buffer_t); -impl Default for z_owned_buffer_t { - fn default() -> Self { - z_buffer_null() - } -} -impl From for z_owned_buffer_t { - fn from(value: ZBuf) -> Self { - Some(value).transmute() - } -} - -impl From> for z_owned_buffer_t { - fn from(value: Option) -> Self { - match value { - Some(value) => value.into(), - None => z_buffer_null(), - } - } -} -impl core::ops::Deref for z_owned_buffer_t { - type Target = Option; - - fn deref(&self) -> &Self::Target { - unsafe { core::mem::transmute(self) } - } -} -impl core::ops::DerefMut for z_owned_buffer_t { - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { core::mem::transmute(self) } - } -} - -/// The gravestone value for `z_owned_buffer_t`. -#[no_mangle] -pub extern "C" fn z_buffer_null() -> z_owned_buffer_t { - unsafe { core::mem::transmute(None::) } -} - -/// Decrements the buffer's reference counter, destroying it if applicable. -/// -/// `buffer` will be reset to `z_buffer_null`, preventing UB on double-frees. -#[no_mangle] -pub extern "C" fn z_buffer_drop(buffer: &mut z_owned_buffer_t) { - core::mem::drop(buffer.take()) -} - -/// Returns `true` if the buffer is in a valid state. -#[no_mangle] -pub extern "C" fn z_buffer_check(buffer: &z_owned_buffer_t) -> bool { - buffer.is_some() -} - -/// Loans the buffer, allowing you to call functions that only need a loan of it. -#[no_mangle] -pub extern "C" fn z_buffer_loan(buffer: &z_owned_buffer_t) -> z_buffer_t { - buffer.as_ref().into() -} - -/// A loan of a `z_owned_buffer_t`. -/// -/// As it is a split buffer, it may contain more than one slice. It's number of slices is returned by `z_buffer_slice_count`. -#[repr(C)] -#[derive(Clone, Copy, Default)] -pub struct z_buffer_t { - _inner: Option>, -} - -impl From> for z_buffer_t { - fn from(value: Option<&ZBuf>) -> Self { - unsafe { core::mem::transmute(value) } - } -} - -impl From for Option<&'static ZBuf> { - fn from(value: z_buffer_t) -> Self { - unsafe { core::mem::transmute(value) } - } -} - -/// Increments the buffer's reference count, returning an owned version of the buffer. -#[no_mangle] -pub extern "C" fn z_buffer_clone(buffer: z_buffer_t) -> z_owned_buffer_t { - match buffer._inner { - Some(b) => unsafe { b.as_ref().deref().clone().transmute() }, - None => ZBuf::empty().into(), - } -} - -/// Returns the number of slices in the buffer. -/// -/// If the return value is 0 or 1, then the buffer's data is contiguous in memory. -#[no_mangle] -pub extern "C" fn z_buffer_slice_count(buffer: z_buffer_t) -> usize { - match buffer.into() { - None => 0, - Some(buf) => ZBuf::slices(buf).len(), - } -} - -/// Returns total number bytes in the buffer. -#[no_mangle] -pub extern "C" fn z_buffer_len(buffer: z_buffer_t) -> usize { - match buffer.into() { - None => 0, - Some(buf) => ZBuf::slices(buf).fold(0, |acc, s| acc + s.len()), - } -} - -/// Returns the `index`th slice of the buffer, aliasing it. -/// -/// Out of bounds accesses will return `z_bytes_empty`. -#[no_mangle] -pub extern "C" fn z_buffer_slice_at(buffer: z_buffer_t, index: usize) -> z_bytes_t { - match buffer.into() { - None => z_bytes_empty(), - Some(buf) => ZBuf::slices(buf) - .nth(index) - .map_or(z_bytes_empty(), |slice| slice.into()), - } -} diff --git a/src/commons.rs b/src/commons.rs index 5f8ede970..ed0f9dbf0 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -12,21 +12,18 @@ // ZettaScale Zenoh team, // -use std::any::Any; use std::ops::Deref; -use std::slice; use crate::collections::*; use crate::keyexpr::*; use crate::z_congestion_control_t; use crate::z_id_t; use crate::z_priority_t; +use crate::zc_owned_payload_t; +use crate::zc_payload_t; use crate::{impl_guarded_transmute, GuardedTransmute}; use libc::c_void; use libc::{c_char, c_ulong}; -use zenoh::buffers::buffer::SplitBuffer; -use zenoh::buffers::ZBuf; -use zenoh::buffers::ZSliceBuffer; use zenoh::prelude::SampleKind; use zenoh::query::ReplyKeyExpr; use zenoh::sample::Locality; @@ -95,135 +92,6 @@ impl From> for z_timestamp_t { } } -/// An owned payload, backed by a reference counted owner. -/// -/// The `payload` field may be modified, and Zenoh will take the new values into account. -#[allow(non_camel_case_types)] -pub type zc_owned_payload_t = z_owned_buffer_t; - -/// Clones the `payload` by incrementing its reference counter. -#[no_mangle] -pub extern "C" fn zc_payload_rcinc(payload: &zc_owned_payload_t) -> zc_owned_payload_t { - z_buffer_clone(z_buffer_loan(payload)) -} -/// Returns `false` if `payload` is the gravestone value. -#[no_mangle] -pub extern "C" fn zc_payload_check(payload: &zc_owned_payload_t) -> bool { - z_buffer_check(payload) -} -/// Decrements `payload`'s backing refcount, releasing the memory if appropriate. -#[no_mangle] -pub extern "C" fn zc_payload_drop(payload: &mut zc_owned_payload_t) { - z_buffer_drop(payload) -} -/// Constructs `zc_owned_payload_t`'s gravestone value. -#[no_mangle] -pub extern "C" fn zc_payload_null() -> zc_owned_payload_t { - z_buffer_null() -} - -/// Returns a :c:type:`zc_payload_t` loaned from `payload`. -#[no_mangle] -pub extern "C" fn zc_payload_loan(payload: &zc_owned_payload_t) -> zc_payload_t { - z_buffer_loan(payload) -} - -#[allow(non_camel_case_types)] -pub type zc_payload_t = z_buffer_t; - -/// Increments internal payload reference count, returning owned payload. -#[no_mangle] -pub extern "C" fn zc_payload_clone(payload: zc_payload_t) -> zc_owned_payload_t { - z_buffer_clone(payload) -} - -/// Decodes payload into null-terminated string -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn zc_payload_decode_into_string( - payload: zc_payload_t, - cstr: &mut z_owned_str_t, -) -> i8 { - let payload: Option<&ZBuf> = payload.into(); - if payload.is_none() { - *cstr = z_str_null(); - return 0; - } - *cstr = z_owned_str_t::preallocate(zc_payload_len(payload.into())); - let payload = payload.unwrap(); - - let mut pos = 0; - for s in payload.slices() { - cstr.insert_unchecked(pos, s); - pos += s.len(); - } - 0 -} - -/// Decodes payload into null-terminated string -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn zc_payload_decode_into_bytes( - payload: zc_payload_t, - b: &mut z_owned_bytes_t, -) -> i8 { - let payload: Option<&ZBuf> = payload.into(); - if payload.is_none() { - *b = z_bytes_null(); - return 0; - } - *b = z_owned_bytes_t::preallocate(zc_payload_len(payload.into())); - let payload = payload.unwrap(); - - let mut pos = 0; - for s in payload.slices() { - b.insert_unchecked(pos, s); - pos += s.len(); - } - 0 -} - -unsafe impl Send for z_bytes_t {} -unsafe impl Sync for z_bytes_t {} - -impl ZSliceBuffer for z_bytes_t { - fn as_slice(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.start, self.len) } - } - fn as_mut_slice(&mut self) -> &mut [u8] { - unsafe { slice::from_raw_parts_mut(self.start as *mut u8, self.len) } - } - fn as_any(&self) -> &dyn Any { - self - } -} - -/// Encodes byte sequence by aliasing. -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn zc_payload_encode_from_bytes(bytes: z_bytes_t) -> zc_owned_payload_t { - ZBuf::from(bytes).into() -} - -/// Encodes a null-terminated string by aliasing. -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn zc_payload_encode_from_string( - cstr: *const libc::c_char, -) -> zc_owned_payload_t { - let bytes = z_bytes_t { - start: cstr as *const u8, - len: libc::strlen(cstr), - }; - zc_payload_encode_from_bytes(bytes) -} - -/// Returns total number bytes in the payload. -#[no_mangle] -pub extern "C" fn zc_payload_len(payload: zc_payload_t) -> usize { - z_buffer_len(payload) -} - /// QoS settings of zenoh message. /// #[repr(C)] @@ -290,7 +158,7 @@ pub extern "C" fn z_sample_encoding(sample: &z_sample_t) -> z_encoding_t { /// /// If you need ownership of the buffer, you may use `z_sample_owned_payload`. #[no_mangle] -pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> z_buffer_t { +pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> zc_payload_t { Some(&sample.payload).into() } /// Returns the sample's payload after incrementing its internal reference count. @@ -298,7 +166,7 @@ pub extern "C" fn z_sample_payload(sample: &z_sample_t) -> z_buffer_t { /// Note that other samples may have received the same buffer, meaning that mutating this buffer may /// affect the samples received by other subscribers. #[no_mangle] -pub extern "C" fn z_sample_owned_payload(sample: &z_sample_t) -> z_owned_buffer_t { +pub extern "C" fn z_sample_owned_payload(sample: &z_sample_t) -> zc_owned_payload_t { sample.payload.clone().into() } /// The sample's kind (put or delete). diff --git a/src/lib.rs b/src/lib.rs index 96cc4bd76..20f766352 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,8 @@ mod config; pub use crate::config::*; mod commons; pub use crate::commons::*; +mod payload; +pub use crate::payload::*; mod keyexpr; pub use crate::keyexpr::*; mod info; diff --git a/src/payload.rs b/src/payload.rs new file mode 100644 index 000000000..ba28eb3e4 --- /dev/null +++ b/src/payload.rs @@ -0,0 +1,261 @@ +use core::slice; +use std::{any::Any, ops::Deref, ptr::NonNull}; + +use zenoh::buffers::{buffer::SplitBuffer, ZBuf, ZSliceBuffer}; + +use crate::{ + impl_guarded_transmute, z_bytes_empty, z_bytes_null, z_bytes_t, z_owned_bytes_t, z_owned_str_t, + z_str_null, GuardedTransmute, +}; + +pub use crate::z_owned_buffer_t; +impl_guarded_transmute!(noderefs Option, z_owned_buffer_t); +impl Default for z_owned_buffer_t { + fn default() -> Self { + z_buffer_null() + } +} +impl From for z_owned_buffer_t { + fn from(value: ZBuf) -> Self { + Some(value).transmute() + } +} + +impl From> for z_owned_buffer_t { + fn from(value: Option) -> Self { + match value { + Some(value) => value.into(), + None => z_buffer_null(), + } + } +} +impl core::ops::Deref for z_owned_buffer_t { + type Target = Option; + + fn deref(&self) -> &Self::Target { + unsafe { core::mem::transmute(self) } + } +} +impl core::ops::DerefMut for z_owned_buffer_t { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { core::mem::transmute(self) } + } +} + +/// The gravestone value for `z_owned_buffer_t`. +#[no_mangle] +pub extern "C" fn z_buffer_null() -> z_owned_buffer_t { + unsafe { core::mem::transmute(None::) } +} + +/// Decrements the buffer's reference counter, destroying it if applicable. +/// +/// `buffer` will be reset to `z_buffer_null`, preventing UB on double-frees. +#[no_mangle] +pub extern "C" fn z_buffer_drop(buffer: &mut z_owned_buffer_t) { + core::mem::drop(buffer.take()) +} + +/// Returns `true` if the buffer is in a valid state. +#[no_mangle] +pub extern "C" fn z_buffer_check(buffer: &z_owned_buffer_t) -> bool { + buffer.is_some() +} + +/// Loans the buffer, allowing you to call functions that only need a loan of it. +#[no_mangle] +pub extern "C" fn z_buffer_loan(buffer: &z_owned_buffer_t) -> z_buffer_t { + buffer.as_ref().into() +} + +/// A loan of a `z_owned_buffer_t`. +/// +/// As it is a split buffer, it may contain more than one slice. It's number of slices is returned by `z_buffer_slice_count`. +#[repr(C)] +#[derive(Clone, Copy, Default)] +pub struct z_buffer_t { + _inner: Option>, +} + +impl From> for z_buffer_t { + fn from(value: Option<&ZBuf>) -> Self { + unsafe { core::mem::transmute(value) } + } +} + +impl From for Option<&'static ZBuf> { + fn from(value: z_buffer_t) -> Self { + unsafe { core::mem::transmute(value) } + } +} + +/// Increments the buffer's reference count, returning an owned version of the buffer. +#[no_mangle] +pub extern "C" fn z_buffer_clone(buffer: z_buffer_t) -> z_owned_buffer_t { + match buffer._inner { + Some(b) => unsafe { b.as_ref().deref().clone().transmute() }, + None => ZBuf::empty().into(), + } +} + +/// Returns the number of slices in the buffer. +/// +/// If the return value is 0 or 1, then the buffer's data is contiguous in memory. +#[no_mangle] +pub extern "C" fn z_buffer_slice_count(buffer: z_buffer_t) -> usize { + match buffer.into() { + None => 0, + Some(buf) => ZBuf::slices(buf).len(), + } +} + +/// Returns total number bytes in the buffer. +#[no_mangle] +pub extern "C" fn z_buffer_len(buffer: z_buffer_t) -> usize { + match buffer.into() { + None => 0, + Some(buf) => ZBuf::slices(buf).fold(0, |acc, s| acc + s.len()), + } +} + +/// Returns the `index`th slice of the buffer, aliasing it. +/// +/// Out of bounds accesses will return `z_bytes_empty`. +#[no_mangle] +pub extern "C" fn z_buffer_slice_at(buffer: z_buffer_t, index: usize) -> z_bytes_t { + match buffer.into() { + None => z_bytes_empty(), + Some(buf) => ZBuf::slices(buf) + .nth(index) + .map_or(z_bytes_empty(), |slice| slice.into()), + } +} + +/// An owned payload, backed by a reference counted owner. +/// +/// The `payload` field may be modified, and Zenoh will take the new values into account. +#[allow(non_camel_case_types)] +pub type zc_owned_payload_t = z_owned_buffer_t; + +/// Clones the `payload` by incrementing its reference counter. +#[no_mangle] +pub extern "C" fn zc_payload_rcinc(payload: &zc_owned_payload_t) -> zc_owned_payload_t { + z_buffer_clone(z_buffer_loan(payload)) +} +/// Returns `false` if `payload` is the gravestone value. +#[no_mangle] +pub extern "C" fn zc_payload_check(payload: &zc_owned_payload_t) -> bool { + z_buffer_check(payload) +} +/// Decrements `payload`'s backing refcount, releasing the memory if appropriate. +#[no_mangle] +pub extern "C" fn zc_payload_drop(payload: &mut zc_owned_payload_t) { + z_buffer_drop(payload) +} +/// Constructs `zc_owned_payload_t`'s gravestone value. +#[no_mangle] +pub extern "C" fn zc_payload_null() -> zc_owned_payload_t { + z_buffer_null() +} + +/// Returns a :c:type:`zc_payload_t` loaned from `payload`. +#[no_mangle] +pub extern "C" fn zc_payload_loan(payload: &zc_owned_payload_t) -> zc_payload_t { + z_buffer_loan(payload) +} + +#[allow(non_camel_case_types)] +pub type zc_payload_t = z_buffer_t; + +/// Increments internal payload reference count, returning owned payload. +#[no_mangle] +pub extern "C" fn zc_payload_clone(payload: zc_payload_t) -> zc_owned_payload_t { + z_buffer_clone(payload) +} + +/// Decodes payload into null-terminated string +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn zc_payload_decode_into_string( + payload: zc_payload_t, + cstr: &mut z_owned_str_t, +) -> i8 { + let payload: Option<&ZBuf> = payload.into(); + if payload.is_none() { + *cstr = z_str_null(); + return 0; + } + *cstr = z_owned_str_t::preallocate(zc_payload_len(payload.into())); + let payload = payload.unwrap(); + + let mut pos = 0; + for s in payload.slices() { + cstr.insert_unchecked(pos, s); + pos += s.len(); + } + 0 +} + +/// Decodes payload into null-terminated string +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn zc_payload_decode_into_bytes( + payload: zc_payload_t, + b: &mut z_owned_bytes_t, +) -> i8 { + let payload: Option<&ZBuf> = payload.into(); + if payload.is_none() { + *b = z_bytes_null(); + return 0; + } + *b = z_owned_bytes_t::preallocate(zc_payload_len(payload.into())); + let payload = payload.unwrap(); + + let mut pos = 0; + for s in payload.slices() { + b.insert_unchecked(pos, s); + pos += s.len(); + } + 0 +} + +unsafe impl Send for z_bytes_t {} +unsafe impl Sync for z_bytes_t {} + +impl ZSliceBuffer for z_bytes_t { + fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.start, self.len) } + } + fn as_mut_slice(&mut self) -> &mut [u8] { + unsafe { slice::from_raw_parts_mut(self.start as *mut u8, self.len) } + } + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Encodes byte sequence by aliasing. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn zc_payload_encode_from_bytes(bytes: z_bytes_t) -> zc_owned_payload_t { + ZBuf::from(bytes).into() +} + +/// Encodes a null-terminated string by aliasing. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn zc_payload_encode_from_string( + cstr: *const libc::c_char, +) -> zc_owned_payload_t { + let bytes = z_bytes_t { + start: cstr as *const u8, + len: libc::strlen(cstr), + }; + zc_payload_encode_from_bytes(bytes) +} + +/// Returns total number bytes in the payload. +#[no_mangle] +pub extern "C" fn zc_payload_len(payload: zc_payload_t) -> usize { + z_buffer_len(payload) +} diff --git a/src/put.rs b/src/put.rs index 5c8b8d807..5e2d59aec 100644 --- a/src/put.rs +++ b/src/put.rs @@ -14,6 +14,7 @@ use crate::commons::*; use crate::keyexpr::*; use crate::session::*; +use crate::zc_owned_payload_t; use crate::LOG_INVALID_SESSION; use libc::c_void; use zenoh::prelude::{sync::SyncResolve, Priority, SampleKind};