Skip to content

Commit

Permalink
Add CacheConfig replies_qos option
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Dec 5, 2024
1 parent dbd7d22 commit 7537985
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 7 deletions.
78 changes: 72 additions & 6 deletions zenoh-ext/src/advanced_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ use futures::{select, FutureExt};
use tokio::task;
use zenoh::{
handlers::FifoChannelHandler,
internal::bail,
internal::{bail, traits::QoSBuilderTrait},
key_expr::{
format::{ke, kedefine},
keyexpr, KeyExpr, OwnedKeyExpr,
},
liveliness::LivelinessToken,
pubsub::Subscriber,
qos::{CongestionControl, Priority},
query::{Query, Queryable, ZenohParameters},
sample::{Locality, Sample},
sample::{Locality, Sample, SampleBuilder},
Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_STARSTAR,
};

Expand All @@ -40,18 +41,58 @@ kedefine!(
pub(crate) ke_liveliness: "@adv/${zid:*}/${eid:*}/${meta:**}/@/${remaining:**}",
);

#[derive(Clone, Debug)]
pub struct QoS {
priority: Priority,
congestion_control: CongestionControl,
is_express: bool,
}

impl Default for QoS {
fn default() -> Self {
Self {
priority: Priority::Data,
congestion_control: CongestionControl::Block,
is_express: false,
}
}
}

#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for QoS {
#[allow(unused_mut)]
fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
self.congestion_control = congestion_control;
self
}

#[allow(unused_mut)]
fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}

#[allow(unused_mut)]
fn express(mut self, is_express: bool) -> Self {
self.is_express = is_express;
self
}
}

#[derive(Debug, Clone)]
/// Configure an [`AdvancedPublisher`](crate::AdvancedPublisher) cache.
pub struct CacheConfig {
sample_depth: usize,
resources_limit: Option<usize>,
replies_qos: QoS,
}

impl Default for CacheConfig {
fn default() -> Self {
Self {
sample_depth: 1,
resources_limit: None,
replies_qos: QoS::default(),
}
}
}
Expand All @@ -70,6 +111,12 @@ impl CacheConfig {
self.resources_limit = Some(limit);
self
}

/// The QoS to apply to replies.
pub fn replies_qos(mut self, qos: QoS) -> Self {
self.replies_qos = qos;
self
}
}

/// The builder of AdvancedCache, allowing to configure it.
Expand Down Expand Up @@ -219,6 +266,7 @@ impl AdvancedCache {
start: Option<u32>,
end: Option<u32>,
max: Option<u32>,
qos: &QoS,
) {
if let Some(max) = max {
let mut samples = VecDeque::new();
Expand All @@ -236,7 +284,16 @@ impl AdvancedCache {
}
}
for sample in samples.drain(..).rev() {
if let Err(e) = query.reply_sample(sample.clone()).await {
if let Err(e) = query
.reply_sample(
SampleBuilder::from(sample.clone())
.congestion_control(qos.congestion_control)
.priority(qos.priority)
.express(qos.is_express)
.into(),
)
.await
{
tracing::warn!("Error replying to query: {}", e);
}
}
Expand All @@ -250,7 +307,16 @@ impl AdvancedCache {
continue;
}
}
if let Err(e) = query.reply_sample(sample.clone()).await {
if let Err(e) = query
.reply_sample(
SampleBuilder::from(sample.clone())
.congestion_control(qos.congestion_control)
.priority(qos.priority)
.express(qos.is_express)
.into(),
)
.await
{
tracing::warn!("Error replying to query: {}", e);
}
}
Expand Down Expand Up @@ -296,12 +362,12 @@ impl AdvancedCache {
let max = query.parameters().get("_max").and_then(|s| s.parse::<u32>().ok());
if !query.selector().key_expr().as_str().contains('*') {
if let Some(queue) = cache.get(query.selector().key_expr().as_keyexpr()) {
process_queue(queue, &query, start, end, max).await;
process_queue(queue, &query, start, end, max, &history.replies_qos).await;
}
} else {
for (key_expr, queue) in cache.iter() {
if query.selector().key_expr().intersects(key_expr.borrow()) {
process_queue(queue, &query, start, end, max).await;
process_queue(queue, &query, start, end, max, &history.replies_qos).await;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use crate::serialization::{
#[cfg(feature = "unstable")]
#[allow(deprecated)]
pub use crate::{
advanced_cache::CacheConfig,
advanced_cache::{CacheConfig, QoS},
advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, Sequencing},
advanced_subscriber::{
AdvancedSubscriber, AdvancedSubscriberBuilder, HistoryConfig, RecoveryConfig,
Expand Down

0 comments on commit 7537985

Please sign in to comment.