From 15273a7e42bfa7d67875a0a90c7c7906e5599d13 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 3 Dec 2024 11:08:19 +0100 Subject: [PATCH] Apply QoS overwrites at resolution of PublisherBuilder and PublicationBuilder --- zenoh/src/api/builders/publisher.rs | 94 ++++++++--------------------- zenoh/src/api/session.rs | 12 +++- 2 files changed, 37 insertions(+), 69 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index edc4c476a..163c734a2 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -207,7 +207,8 @@ impl Resolvable for PublicationBuilder { impl Wait for PublicationBuilder, PublicationBuilderPut> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, self.kind.payload, @@ -229,7 +230,8 @@ impl Wait for PublicationBuilder, PublicationBuilderPut impl Wait for PublicationBuilder, PublicationBuilderDelete> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, ZBytes::new(), @@ -267,19 +269,6 @@ impl IntoFuture for PublicationBuilder, PublicationBuil } } -/// Marks which [`PublisherBuilder`] QoS configurations were overwritten by Zenoh config. -/// Associated [`PublisherBuilder`] methods calls will not modify overwritten configurations -#[derive(Clone, Copy, Debug)] -pub(crate) struct PublicationOverwrittenQoS { - pub(crate) congestion_control: bool, - pub(crate) priority: bool, - pub(crate) express: bool, - #[cfg(feature = "unstable")] - pub(crate) reliability: bool, - #[cfg(feature = "unstable")] - pub(crate) destination: bool, -} - /// A builder for initializing a [`Publisher`]. /// /// # Examples @@ -308,7 +297,6 @@ pub struct PublisherBuilder<'a, 'b> { #[cfg(feature = "unstable")] pub(crate) reliability: Reliability, pub(crate) destination: Locality, - pub(crate) qos_overwrites: PublicationOverwrittenQoS, } impl Clone for PublisherBuilder<'_, '_> { @@ -326,7 +314,6 @@ impl Clone for PublisherBuilder<'_, '_> { #[cfg(feature = "unstable")] reliability: self.reliability, destination: self.destination, - qos_overwrites: self.qos_overwrites, } } } @@ -336,22 +323,16 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data. #[inline] fn congestion_control(self, congestion_control: CongestionControl) -> Self { - match self.qos_overwrites.congestion_control { - true => self, - false => Self { - congestion_control, - ..self - }, + Self { + congestion_control, + ..self } } /// Changes the [`crate::qos::Priority`] of the written data. #[inline] fn priority(self, priority: Priority) -> Self { - match self.qos_overwrites.priority { - true => self, - false => Self { priority, ..self }, - } + Self { priority, ..self } } /// Changes the Express policy to apply when routing the data. @@ -360,24 +341,18 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// This usually has a positive impact on latency but negative impact on throughput. #[inline] fn express(self, is_express: bool) -> Self { - match self.qos_overwrites.express { - true => self, - false => Self { is_express, ..self }, - } + Self { is_express, ..self } } } impl<'a, 'b> PublisherBuilder<'a, 'b> { - pub fn new(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - let maybe_key_expr = key_expr.try_into().map_err(Into::into); + /// Looks up if any configured QoS overwrites apply on the builder's key expression. + /// Returns a new builder with the overwritten QoS parameters. + pub(crate) fn apply_qos_overwrites(self) -> Self { let mut qos_overwrites = PublisherQoSConfig::default(); - if let Ok(key_expr) = &maybe_key_expr { + if let Ok(key_expr) = &self.key_expr { // get overwritten builder - let state = zread!(session.0.state); + let state = zread!(self.session.0.state); let nodes_including = state .publisher_qos_tree .nodes_including(key_expr) @@ -400,39 +375,26 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } Self { - session, - key_expr: maybe_key_expr, - encoding: Encoding::default(), congestion_control: qos_overwrites .congestion_control .map(|cc| cc.into()) - .unwrap_or(CongestionControl::DEFAULT), + .unwrap_or(self.congestion_control), priority: qos_overwrites .priority .map(|p| p.into()) - .unwrap_or(Priority::DEFAULT), - is_express: qos_overwrites.express.unwrap_or(false), + .unwrap_or(self.priority), + is_express: qos_overwrites.express.unwrap_or(self.is_express), #[cfg(feature = "unstable")] reliability: qos_overwrites .reliability .map(|r| r.into()) - .unwrap_or(Reliability::DEFAULT), + .unwrap_or(self.reliability), #[cfg(feature = "unstable")] destination: qos_overwrites .allowed_destination .map(|d| d.into()) - .unwrap_or(Locality::default()), - #[cfg(not(feature = "unstable"))] - destination: Locality::default(), - qos_overwrites: PublicationOverwrittenQoS { - congestion_control: qos_overwrites.congestion_control.is_some(), - priority: qos_overwrites.priority.is_some(), - express: qos_overwrites.express.is_some(), - #[cfg(feature = "unstable")] - reliability: qos_overwrites.reliability.is_some(), - #[cfg(feature = "unstable")] - destination: qos_overwrites.allowed_destination.is_some(), - }, + .unwrap_or(self.destination), + ..self } } @@ -443,9 +405,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn allowed_destination(mut self, destination: Locality) -> Self { - if !self.qos_overwrites.destination { - self.destination = destination; - } + self.destination = destination; self } @@ -457,12 +417,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { - match self.qos_overwrites.reliability { - true => self, - false => Self { - reliability, - ..self - }, + Self { + reliability, + ..self } } } @@ -472,7 +429,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> { } impl Wait for PublisherBuilder<'_, '_> { - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self = self.apply_qos_overwrites(); let mut key_expr = self.key_expr?; if !key_expr.is_fully_optimized(&self.session.0) { let session_id = self.session.0.id; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 46b56da81..09dc267c2 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -830,7 +830,17 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder::new(self, key_expr) + PublisherBuilder { + session: self, + key_expr: key_expr.try_into().map_err(Into::into), + encoding: Encoding::default(), + congestion_control: CongestionControl::DEFAULT, + priority: Priority::DEFAULT, + is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + destination: Locality::default(), + } } /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`].