Skip to content

Commit

Permalink
refactor: extract the caching logic into dedicated types
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent 867a552 commit 26eac3e
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 118 deletions.
95 changes: 44 additions & 51 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
future::{IntoFuture, Ready},
sync::atomic::AtomicU64,
};
use std::future::{IntoFuture, Ready};

use itertools::Itertools;
use zenoh_config::qos::PublisherQoSConfig;
Expand All @@ -34,8 +31,8 @@ use crate::{
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
key_expr::KeyExpr,
publisher::{Priority, Publisher},
sample::{Locality, SampleKind},
publisher::{Priority, Publisher, PublisherCache, PublisherCacheValue},
sample::Locality,
},
Session,
};
Expand Down Expand Up @@ -212,12 +209,10 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut
#[inline]
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
None,
self.publisher.session.0.resolve_push(
&mut PublisherCacheValue::default(),
&self.publisher.key_expr?,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
Some(self.kind),
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
Expand All @@ -236,12 +231,10 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDel
#[inline]
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
None,
self.publisher.session.0.resolve_push(
&mut PublisherCacheValue::default(),
&self.publisher.key_expr?,
ZBytes::new(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
None,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
Expand Down Expand Up @@ -473,7 +466,7 @@ impl Wait for PublisherBuilder<'_, '_> {
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.downgrade(),
cache: AtomicU64::new(0),
cache: PublisherCache::default(),
id,
key_expr,
encoding: self.encoding,
Expand Down Expand Up @@ -501,45 +494,45 @@ impl IntoFuture for PublisherBuilder<'_, '_> {

impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
Some(&self.publisher.cache),
&self.publisher.key_expr,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
self.publisher.cache.with_cache(|cached| {
self.publisher.session.resolve_push(
cached,
&self.publisher.key_expr,
Some(self.kind),
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
})
}
}

impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
Some(&self.publisher.cache),
&self.publisher.key_expr,
ZBytes::new(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
self.publisher.cache.with_cache(|cached| {
self.publisher.session.resolve_push(
cached,
&self.publisher.key_expr,
None,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
})
}
}

Expand Down
120 changes: 91 additions & 29 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::atomic::AtomicU64,
sync::atomic::{AtomicU64, Ordering},
task::{Context, Poll},
};

Expand All @@ -41,17 +41,20 @@ use {
zenoh_protocol::core::Reliability,
};

use crate::api::{
builders::publisher::{
PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut,
PublisherDeleteBuilder, PublisherPutBuilder,
use crate::{
api::{
builders::publisher::{
PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut,
PublisherDeleteBuilder, PublisherPutBuilder,
},
bytes::ZBytes,
encoding::Encoding,
key_expr::KeyExpr,
sample::{Locality, Sample, SampleFields},
session::{UndeclarableSealed, WeakSession},
Id,
},
bytes::ZBytes,
encoding::Encoding,
key_expr::KeyExpr,
sample::{Locality, Sample, SampleFields},
session::{UndeclarableSealed, WeakSession},
Id,
sample::SampleKind,
};

pub(crate) struct PublisherState {
Expand All @@ -70,6 +73,74 @@ impl fmt::Debug for PublisherState {
}
}

#[derive(Default)]
pub(crate) struct PublisherCache(AtomicU64);

impl PublisherCache {
pub(crate) fn with_cache<R>(&self, f: impl FnOnce(&mut PublisherCacheValue) -> R) -> R {
let cached = self.0.load(Ordering::Relaxed);
let mut to_cache = PublisherCacheValue(cached);
let res = f(&mut to_cache);
if to_cache.0 != cached {
let _ = self.0.compare_exchange_weak(
cached,
to_cache.0,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
res
}
}

impl fmt::Debug for PublisherCache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("PublisherCache")
.field(&PublisherCacheValue(self.0.load(Ordering::Relaxed)))
.finish()
}
}
#[derive(Default, PartialEq, Eq)]
pub(crate) struct PublisherCacheValue(u64);

impl PublisherCacheValue {
const VERSION_SHIFT: usize = 2;
const NO_REMOTE: u64 = 0b01;
const NO_LOCAL: u64 = 0b10;

pub(crate) fn match_subscription_version(&mut self, version: u64) {
if self.0 >> Self::VERSION_SHIFT != version {
self.0 = version << Self::VERSION_SHIFT;
}
}

pub(crate) fn has_remote_sub(&self) -> bool {
self.0 & Self::NO_REMOTE == 0
}

pub(crate) fn set_no_remote_sub(&mut self) {
self.0 |= Self::NO_REMOTE;
}

pub(crate) fn has_local_sub(&self) -> bool {
self.0 & Self::NO_LOCAL == 0
}

pub(crate) fn set_no_local_sub(&mut self) {
self.0 |= Self::NO_LOCAL;
}
}

impl fmt::Debug for PublisherCacheValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PublisherCacheValue")
.field("subscription_version", &(self.0 >> Self::VERSION_SHIFT))
.field("has_remote_sub", &self.has_remote_sub())
.field("has_local_sub", &self.has_local_sub())
.finish()
}
}

/// A publisher that allows to send data through a stream.
///
/// Publishers are automatically undeclared when dropped.
Expand Down Expand Up @@ -102,7 +173,7 @@ impl fmt::Debug for PublisherState {
#[derive(Debug)]
pub struct Publisher<'a> {
pub(crate) session: WeakSession,
pub(crate) cache: AtomicU64,
pub(crate) cache: PublisherCache,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) encoding: Encoding,
Expand Down Expand Up @@ -392,23 +463,14 @@ impl Sink<Sample> for Publisher<'_> {
attachment,
..
} = item.into();
self.session.resolve_put(
Some(&self.cache),
&self.key_expr,
payload,
kind,
encoding,
self.congestion_control,
self.priority,
self.is_express,
self.destination,
#[cfg(feature = "unstable")]
self.reliability,
None,
#[cfg(feature = "unstable")]
SourceInfo::empty(),
attachment,
)
match kind {
SampleKind::Put => self
.put(payload)
.encoding(encoding)
.attachment(attachment)
.wait(),
SampleKind::Delete => self.delete().attachment(attachment).wait(),
}
}

#[inline]
Expand Down
Loading

0 comments on commit 26eac3e

Please sign in to comment.