Skip to content

Commit

Permalink
Apply QoS overwrites at resolution of PublisherBuilder and Publicatio…
Browse files Browse the repository at this point in the history
…nBuilder
  • Loading branch information
oteffahi committed Dec 3, 2024
1 parent 02fcd2e commit 15273a7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 69 deletions.
94 changes: 26 additions & 68 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ impl<P, T> Resolvable for PublicationBuilder<P, T> {

impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
self.kind.payload,
Expand All @@ -229,7 +230,8 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut

impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
ZBytes::new(),
Expand Down Expand Up @@ -267,19 +269,6 @@ impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuil
}
}

/// Marks which [`PublisherBuilder`] QoS configurations were overwritten by Zenoh config.
/// Associated [`PublisherBuilder`] methods calls will not modify overwritten configurations
#[derive(Clone, Copy, Debug)]
pub(crate) struct PublicationOverwrittenQoS {
pub(crate) congestion_control: bool,
pub(crate) priority: bool,
pub(crate) express: bool,
#[cfg(feature = "unstable")]
pub(crate) reliability: bool,
#[cfg(feature = "unstable")]
pub(crate) destination: bool,
}

/// A builder for initializing a [`Publisher`].
///
/// # Examples
Expand Down Expand Up @@ -308,7 +297,6 @@ pub struct PublisherBuilder<'a, 'b> {
#[cfg(feature = "unstable")]
pub(crate) reliability: Reliability,
pub(crate) destination: Locality,
pub(crate) qos_overwrites: PublicationOverwrittenQoS,
}

impl Clone for PublisherBuilder<'_, '_> {
Expand All @@ -326,7 +314,6 @@ impl Clone for PublisherBuilder<'_, '_> {
#[cfg(feature = "unstable")]
reliability: self.reliability,
destination: self.destination,
qos_overwrites: self.qos_overwrites,
}
}
}
Expand All @@ -336,22 +323,16 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
/// Changes the [`crate::qos::CongestionControl`] to apply when routing the data.
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
match self.qos_overwrites.congestion_control {
true => self,
false => Self {
congestion_control,
..self
},
Self {
congestion_control,
..self
}
}

/// Changes the [`crate::qos::Priority`] of the written data.
#[inline]
fn priority(self, priority: Priority) -> Self {
match self.qos_overwrites.priority {
true => self,
false => Self { priority, ..self },
}
Self { priority, ..self }
}

/// Changes the Express policy to apply when routing the data.
Expand All @@ -360,24 +341,18 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
/// This usually has a positive impact on latency but negative impact on throughput.
#[inline]
fn express(self, is_express: bool) -> Self {
match self.qos_overwrites.express {
true => self,
false => Self { is_express, ..self },
}
Self { is_express, ..self }
}
}

impl<'a, 'b> PublisherBuilder<'a, 'b> {
pub fn new<TryIntoKeyExpr>(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
let maybe_key_expr = key_expr.try_into().map_err(Into::into);
/// Looks up if any configured QoS overwrites apply on the builder's key expression.
/// Returns a new builder with the overwritten QoS parameters.
pub(crate) fn apply_qos_overwrites(self) -> Self {
let mut qos_overwrites = PublisherQoSConfig::default();
if let Ok(key_expr) = &maybe_key_expr {
if let Ok(key_expr) = &self.key_expr {
// get overwritten builder
let state = zread!(session.0.state);
let state = zread!(self.session.0.state);
let nodes_including = state
.publisher_qos_tree
.nodes_including(key_expr)
Expand All @@ -400,39 +375,26 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
}

Self {
session,
key_expr: maybe_key_expr,
encoding: Encoding::default(),
congestion_control: qos_overwrites
.congestion_control
.map(|cc| cc.into())
.unwrap_or(CongestionControl::DEFAULT),
.unwrap_or(self.congestion_control),
priority: qos_overwrites
.priority
.map(|p| p.into())
.unwrap_or(Priority::DEFAULT),
is_express: qos_overwrites.express.unwrap_or(false),
.unwrap_or(self.priority),
is_express: qos_overwrites.express.unwrap_or(self.is_express),
#[cfg(feature = "unstable")]
reliability: qos_overwrites
.reliability
.map(|r| r.into())
.unwrap_or(Reliability::DEFAULT),
.unwrap_or(self.reliability),
#[cfg(feature = "unstable")]
destination: qos_overwrites
.allowed_destination
.map(|d| d.into())
.unwrap_or(Locality::default()),
#[cfg(not(feature = "unstable"))]
destination: Locality::default(),
qos_overwrites: PublicationOverwrittenQoS {
congestion_control: qos_overwrites.congestion_control.is_some(),
priority: qos_overwrites.priority.is_some(),
express: qos_overwrites.express.is_some(),
#[cfg(feature = "unstable")]
reliability: qos_overwrites.reliability.is_some(),
#[cfg(feature = "unstable")]
destination: qos_overwrites.allowed_destination.is_some(),
},
.unwrap_or(self.destination),
..self
}
}

Expand All @@ -443,9 +405,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
#[zenoh_macros::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
if !self.qos_overwrites.destination {
self.destination = destination;
}
self.destination = destination;
self
}

Expand All @@ -457,12 +417,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
match self.qos_overwrites.reliability {
true => self,
false => Self {
reliability,
..self
},
Self {
reliability,
..self
}
}
}
Expand All @@ -472,7 +429,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
}

impl Wait for PublisherBuilder<'_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self = self.apply_qos_overwrites();
let mut key_expr = self.key_expr?;
if !key_expr.is_fully_optimized(&self.session.0) {
let session_id = self.session.0.id;
Expand Down
12 changes: 11 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,17 @@ impl Session {
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublisherBuilder::new(self, key_expr)
PublisherBuilder {
session: self,
key_expr: key_expr.try_into().map_err(Into::into),
encoding: Encoding::default(),
congestion_control: CongestionControl::DEFAULT,
priority: Priority::DEFAULT,
is_express: false,
#[cfg(feature = "unstable")]
reliability: Reliability::DEFAULT,
destination: Locality::default(),
}
}

/// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`].
Expand Down

0 comments on commit 15273a7

Please sign in to comment.