Updated implementation to use Cyclone DDS API instead of cyclocut where possible. (eclipse-zenoh#133)

Co-authored-by: Julien Enoch <[email protected]>
gmartin82 and JEnoch authored Jun 15, 2023
1 parent 064a09d commit eefc16f
Showing 7 changed files with 158 additions and 674 deletions.
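
At a glance, the commit replaces cyclocut's blob helpers with the Cyclone DDS C API exposed by cyclors, swaps the plugin's local qos module for cyclors::qos, and introduces a DDSRawSample wrapper around the serialized sample. The comment-form summary below condenses the substitutions visible in the diffs that follow; the variable names (dr, zp, si) are the ones used there, and the list is a sketch, not an exhaustive mapping.

```rust
// Old (cyclocut helpers, removed)                     New (Cyclone DDS API via cyclors)
// let mut zp: *mut cdds_ddsi_payload = null_mut();    let mut zp: *mut ddsi_serdata = null_mut();
// cdds_take_blob(dr, &mut zp, si)                     dds_takecdr(dr, &mut zp, 1, si, DDS_ANY_STATE)
// Vec::from_raw_parts((*zp).payload, size, size)      DDSRawSample::create(zp).as_slice()
// cdds_serdata_unref(zp as *mut ddsi_serdata)         ddsi_serdata_unref(zp)
// use crate::qos::{HistoryKind, Qos};                 use cyclors::qos::{HistoryKind, Qos};
```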
9 changes: 6 additions & 3 deletions Cargo.lock

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ async-trait = "0.1.66"
bincode = "1.3.3"
cdr = "0.2.4"
clap = "3.2.23"
cyclors = "0.1.7"
cyclors = { git = "https://github.com/gmartin82/cyclors", branch = "master"}
derivative = "2.2.0"
env_logger = "0.10.0"
flume = "0.10.14"
86 changes: 63 additions & 23 deletions zenoh-plugin-dds/src/dds_mgt.rs
@@ -11,8 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::qos::{HistoryKind, Qos};
use async_std::task;
use cyclors::qos::{HistoryKind, Qos};
use cyclors::*;
use flume::Sender;
use log::{debug, error, warn};
@@ -21,9 +21,9 @@ use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::mem::MaybeUninit;
use std::os::raw;
use std::slice;
use std::sync::Arc;
use std::time::Duration;
use zenoh::buffers::ZBuf;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::Session;
@@ -58,6 +58,41 @@ pub(crate) enum DiscoveryEvent {
UndiscoveredSubscription { key: String },
}

pub(crate) struct DDSRawSample {
sdref: *mut ddsi_serdata,
data: ddsrt_iovec_t,
}

impl DDSRawSample {
pub(crate) fn create(serdata: *const ddsi_serdata) -> DDSRawSample {
unsafe {
let mut data = ddsrt_iovec_t {
iov_base: std::ptr::null_mut(),
iov_len: 0,
};

let size = ddsi_serdata_size(serdata);
let sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as size_t, &mut data);

DDSRawSample { sdref, data }
}
}

pub(crate) fn as_slice(&self) -> &[u8] {
unsafe {
slice::from_raw_parts(self.data.iov_base as *const u8, self.data.iov_len as usize)
}
}
}

impl Drop for DDSRawSample {
fn drop(&mut self) {
unsafe {
ddsi_serdata_to_ser_unref(self.sdref, &self.data);
}
}
}
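
For orientation, a minimal sketch (not part of the diff) of how the pieces above fit together: dds_takecdr hands back a ddsi_serdata, DDSRawSample borrows its serialized form and releases it on Drop, and the payload is forwarded as a plain byte slice. It assumes the `use cyclors::*;` import at the top of this file; the function name and the `forward` closure are illustrative stand-ins for the zenoh publication done by the listeners below.

```rust
unsafe fn take_and_forward(reader: dds_entity_t, forward: impl Fn(&[u8])) {
    let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
    let mut si = std::mem::MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
    // dds_takecdr (plain Cyclone DDS API) replaces cyclocut's cdds_take_blob
    // and yields the serialized sample as a ddsi_serdata.
    while dds_takecdr(
        reader,
        &mut zp,
        1,
        si.as_mut_ptr() as *mut dds_sample_info_t,
        DDS_ANY_STATE,
    ) > 0
    {
        if si.assume_init()[0].valid_data {
            // DDSRawSample wraps the serialized form (ddsi_serdata_to_ser_ref) and
            // releases it in Drop, replacing the manual Vec::from_raw_parts handling.
            let raw_sample = DDSRawSample::create(zp);
            forward(raw_sample.as_slice());
        }
        ddsi_serdata_unref(zp);
    }
}
```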

unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let btx = Box::from_raw(arg as *mut (bool, Sender<DiscoveryEvent>));
let pub_discovery: bool = btx.0;
@@ -201,32 +236,39 @@ pub(crate) fn run_discovery(dp: dds_entity_t, tx: Sender<DiscoveryEvent>) {

unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let pa = arg as *mut (String, KeyExpr, Arc<Session>, CongestionControl);
let mut zp: *mut cdds_ddsi_payload = std::ptr::null_mut();
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while cdds_take_blob(dr, &mut zp, si.as_mut_ptr() as *mut dds_sample_info_t) > 0 {
while dds_takecdr(
dr,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
if si[0].valid_data {
let bs = Vec::from_raw_parts((*zp).payload, (*zp).size as usize, (*zp).size as usize);
let rbuf = ZBuf::from(bs);
let raw_sample = DDSRawSample::create(zp);
let data_in_slice = raw_sample.as_slice();

if *crate::LOG_PAYLOAD {
log::trace!(
"Route data from DDS {} to zenoh key={} - payload: {:?}",
&(*pa).0,
&(*pa).1,
rbuf
data_in_slice
);
} else {
log::trace!("Route data from DDS {} to zenoh key={}", &(*pa).0, &(*pa).1);
}
let _ = (*pa)
.2
.put(&(*pa).1, rbuf)
.put(&(*pa).1, data_in_slice)
.congestion_control((*pa).3)
.res_sync();
(*zp).payload = std::ptr::null_mut();
}
cdds_serdata_unref(zp as *mut ddsi_serdata);
ddsi_serdata_unref(zp);
}
}

@@ -259,8 +301,7 @@ pub fn create_forwarding_dds_reader(
let reader = dds_create_reader(dp, t, qos_native, sub_listener);
Qos::delete_qos_native(qos_native);
if reader >= 0 {
let res =
dds_reader_wait_for_historical_data(reader, crate::qos::DDS_100MS_DURATION);
let res = dds_reader_wait_for_historical_data(reader, qos::DDS_100MS_DURATION);
if res < 0 {
log::error!(
"Error calling dds_reader_wait_for_historical_data(): {}",
@@ -299,13 +340,15 @@ pub fn create_forwarding_dds_reader(
}

async_std::task::sleep(period).await;
let mut zp: *mut cdds_ddsi_payload = std::ptr::null_mut();
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while cdds_take_blob(
while dds_takecdr(
reader,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
Expand All @@ -314,19 +357,16 @@ pub fn create_forwarding_dds_reader(
"Route (periodic) data to zenoh resource with rid={}",
z_key
);
let bs = Vec::from_raw_parts(
(*zp).payload,
(*zp).size as usize,
(*zp).size as usize,
);
let rbuf = ZBuf::from(bs);

let raw_sample = DDSRawSample::create(zp);
let data_in_slice = raw_sample.as_slice();

let _ = z
.put(&z_key, rbuf)
.put(&z_key, data_in_slice)
.congestion_control(congestion_ctrl)
.res_sync();
(*zp).payload = std::ptr::null_mut();
}
cdds_serdata_unref(zp as *mut ddsi_serdata);
ddsi_serdata_unref(zp);
}
}
});
42 changes: 23 additions & 19 deletions zenoh-plugin-dds/src/lib.rs
@@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use async_trait::async_trait;
use cyclors::qos::*;
use cyclors::*;
use flume::{unbounded, Receiver, Sender};
use futures::select;
@@ -31,6 +32,7 @@ use zenoh::buffers::SplitBuffer;
use zenoh::liveliness::LivelinessToken;
use zenoh::plugins::{Plugin, RunningPluginTrait, Runtime, ZenohPlugin};
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::prelude::r#sync::SyncResolve;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::query::{ConsolidationMode, QueryTarget};
@@ -43,13 +45,11 @@ use zenoh_util::{Timed, TimedEvent, Timer};

pub mod config;
mod dds_mgt;
mod qos;
mod ros_discovery;
mod route_dds_zenoh;
mod route_zenoh_dds;
use config::Config;
use dds_mgt::*;
use qos::*;

use crate::ros_discovery::{
NodeEntitiesInfo, ParticipantEntitiesInfo, RosDiscoveryInfoMgr, ROS_DISCOVERY_INFO_TOPIC_NAME,
@@ -154,7 +154,7 @@ pub async fn run(runtime: Runtime, config: Config) {
let zsession = match zenoh::init(runtime)
.aggregated_subscribers(config.generalise_subs.clone())
.aggregated_publishers(config.generalise_pubs.clone())
.res()
.res_async()
.await
{
Ok(session) => Arc::new(session),
@@ -172,7 +172,7 @@ pub async fn run(runtime: Runtime, config: Config) {
let member = match zsession
.liveliness()
.declare_token(*KE_PREFIX_LIVELINESS_GROUP / &member_id)
.res()
.res_async()
.await
{
Ok(member) => member,
Expand Down Expand Up @@ -598,7 +598,11 @@ impl<'a> DdsPluginRuntime<'a> {
// send replies
for (ke, v) in kvs.drain(..) {
let admin_keyexpr = admin_keyexpr_prefix / &ke;
if let Err(e) = query.reply(Ok(Sample::new(admin_keyexpr, v))).res().await {
if let Err(e) = query
.reply(Ok(Sample::new(admin_keyexpr, v)))
.res_async()
.await
{
warn!("Error replying to admin query {:?}: {}", query, e);
}
}
@@ -611,7 +615,7 @@ impl<'a> DdsPluginRuntime<'a> {
.declare_subscriber(*KE_PREFIX_LIVELINESS_GROUP / *KE_ANY_N_SEGMENT)
.querying()
.with(flume::unbounded())
.res()
.res_async()
.await
.expect("Failed to create Liveliness Subscriber");

@@ -627,7 +631,7 @@ impl<'a> DdsPluginRuntime<'a> {
let admin_queryable = self
.zsession
.declare_queryable(admin_keyexpr_expr)
.res()
.res_async()
.await
.expect("Failed to create AdminSpace queryable");

@@ -899,19 +903,19 @@ impl<'a> DdsPluginRuntime<'a> {
let fwd_writers_key_prefix_key = self
.zsession
.declare_keyexpr(fwd_writers_key_prefix)
.res()
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of writers");
let fwd_readers_key_prefix_key = self
.zsession
.declare_keyexpr(fwd_readers_key_prefix)
.res()
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of readers");
let fwd_ros_discovery_key_declared = self
.zsession
.declare_keyexpr(&fwd_ros_discovery_key)
.res()
.res_async()
.await
.expect("Failed to declare key expression for Fwd Discovery of ros_discovery");

@@ -920,7 +924,7 @@ impl<'a> DdsPluginRuntime<'a> {
.zsession
.declare_publication_cache(fwd_declare_publication_cache_key)
.queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers
.res()
.res_async()
.await
.expect("Failed to declare PublicationCache for Fwd Discovery");

@@ -931,7 +935,7 @@ impl<'a> DdsPluginRuntime<'a> {
.querying()
.allowed_origin(Locality::Remote) // Note: ignore my own publications
.query_timeout(self.config.queries_timeout)
.res()
.res_async()
.await
.expect("Failed to declare QueryingSubscriber for Fwd Discovery");

@@ -968,7 +972,7 @@ impl<'a> DdsPluginRuntime<'a> {
Ok(s) => s,
Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
};
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res().await {
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res_async().await {
error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
}

@@ -983,7 +987,7 @@ impl<'a> DdsPluginRuntime<'a> {
if let Some((admin_keyexpr, _)) = self.remove_dds_writer(&key) {
let fwd_ke = &fwd_writers_key_prefix_key / &admin_keyexpr;
// publish its deletion from admin space
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res().await {
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res_async().await {
error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
}
}
@@ -1029,7 +1033,7 @@ impl<'a> DdsPluginRuntime<'a> {
Ok(s) => s,
Err(e) => { error!("INTERNAL ERROR: failed to serialize discovery message for {:?}: {}", entity, e); continue; }
};
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res().await {
if let Err(e) = self.zsession.put(&fwd_ke, ser_msg).congestion_control(CongestionControl::Block).res_async().await {
error!("INTERNAL ERROR: failed to publish discovery message on {}: {}", fwd_ke, e);
}

@@ -1044,7 +1048,7 @@ impl<'a> DdsPluginRuntime<'a> {
if let Some((admin_keyexpr, _)) = self.remove_dds_reader(&key) {
let fwd_ke = &fwd_readers_key_prefix_key / &admin_keyexpr;
// publish its deletion from admin space
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res().await {
if let Err(e) = self.zsession.delete(&fwd_ke).congestion_control(CongestionControl::Block).res_async().await {
error!("INTERNAL ERROR: failed to publish undiscovery message on {:?}: {}", fwd_ke, e);
}
}
@@ -1289,7 +1293,7 @@ impl<'a> DdsPluginRuntime<'a> {
.consolidation(ConsolidationMode::None)
.timeout(self.config.queries_timeout)
.res_sync()
}).res().await
}).res_async().await
{
warn!("Query on {} for discovery messages failed: {}", key, e);
}
@@ -1369,10 +1373,10 @@ impl<'a> DdsPluginRuntime<'a> {
_ = ros_disco_timer_rcv.recv_async() => {
let infos = ros_disco_mgr.read();
for (gid, buf) in infos {
trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, hex::encode(buf.contiguous()));
trace!("Received ros_discovery_info from DDS for {}, forward via zenoh: {}", gid, hex::encode(buf.as_slice()));
// forward the payload on zenoh
let ke = &fwd_ros_discovery_key_declared / ke_for_sure!(&gid);
if let Err(e) = self.zsession.put(ke, buf).res().await {
if let Err(e) = self.zsession.put(ke, buf.as_slice()).res_sync() {
error!("Forward ROS discovery info failed: {}", e);
}
}
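
The lib.rs changes also make the resolve style explicit at each call site: builders are awaited with res_async() inside async code and resolved with res_sync() where blocking is acceptable, as in the ros_discovery forwarding above. A minimal sketch of the two patterns, assuming the zenoh 0.7-era API this plugin builds against; the key expression, payload, and function names are placeholders.

```rust
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::prelude::r#sync::SyncResolve;
use zenoh::Session;

// Await the builder inside async code, as the discovery routes now do.
async fn publish_async(session: &Session) {
    if let Err(e) = session.put("demo/example", "payload").res_async().await {
        log::error!("put failed: {}", e);
    }
}

// Resolve the same builder synchronously from non-async code.
fn publish_sync(session: &Session) {
    if let Err(e) = session.put("demo/example", "payload").res_sync() {
        log::error!("put failed: {}", e);
    }
}
```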
(Diffs for the remaining changed files were not loaded.)
