Skip to content

Commit

Permalink
Add Advanced Pub/Sub feature (#1582)
Browse files Browse the repository at this point in the history
* Expose and use ke macro

* Fix SourceInfo publication

* Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber

* Fix doctests

* Fix doc warnings

* Remove debug trace

* Add history test

* Fix periodic queries

* Remove debug trace

* Lower test debug level

* Add retransmission tests

* Liveliness sub callback shoud increase pending queries counter

* Liveliness sub callback shoud spawn periodic queries when enbaled

* Add late_joiner test

* Only treat pending samples when there are no more pending queries

* Apply proper sequencing for history

* Improve AdvancedSubscriber

* Code reorg

* Code reorg

* Fix deduplication

* Subscribe to liveliness tokens with history

* Update builders

* Add examples

* Fix rustdoc

* Move stuff in State

* Code reorg

* Add smaple_miss_callback

* Add sample miss test

* Update z_advanced_sub example

* Explicit use in examples

* Update API

* Fix rustdoc

* Allow sample miss detection when recovery disabled

* Add miss_sample_callback to DataSubscriberBuilderExt

* Add sample_miss_detection to PublisherBuilderExt

* Add test_advanced_sample_miss test

* Deliver sample even when no miss callback

* Replace sample_miss_callback with sample_miss_listener

* Fix clippy warnings

* Fix tests

* Add HistoryConf max_samples option

* Add HistoryConf max_age option

* Use BTreeMap

* Add meta_keyexpr option

* Add late_joiner_detection and meta_keyexpr options on Subcriber side

* Renaming

* Fix compilation issues

* Remove AdvancedCache from public API

* Update Session admin to match AdvancedSub

* Gather constants

* Fix doc build

* Renaming

* Mark PublicationCache and QueryingSubscriber as deprecated and remove related examples

* Remove z_pub_cache and z_query_sub entries from zenoh-ext examples README

* Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml

* Add CacheConfig replies_qos option

* Call cache directly from publisher

* Update doc

* Add missing unstable tags

* Add missing unstable tags

* Add missing unstable tags

* Add unstable tag everywhere

* Add missing AdvancedSubscriber methods

* Fix WeakSession::Session internal function

* Expose missing SampleMissListener and related structs

* Add AdvancedPublisherBuilderExt::advanced function

* Add missing AdvancedPublisherBuilder functions

* Fix doctests

* Expose Miss struct

* impl QoSBuilderTrait for AdvancedPublisherBuilder

* Propagate PublisherBuilder values to AdvancedPublisherBuilder

* Rename AdvancedSubscriber::close()

* Add unstable tags

* Add AdvancedSubscriber::detect_publishers function

* Remove debug println

* Renaming

* Add unstable tags

* Use std Range

* Spawn Timer in a tokio runtime

* Fix panic when last_delivered is None

* Release lock before calling get

* Update key mapping

* Improve doc

* fix: fix callback API (#1647)

* Update doc

* Fix ke_liveliness

* Fix doc

* Fix doc

---------

Co-authored-by: Joseph Perez <[email protected]>
  • Loading branch information
OlivierHecart and wyfo authored Dec 11, 2024
1 parent fb2d2bc commit cb3fa54
Show file tree
Hide file tree
Showing 29 changed files with 3,695 additions and 167 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ quote = "1.0.37"
rand = { version = "0.8.5", default-features = false } # Default features are disabled due to usage in no_std crates
rand_chacha = "0.3.1"
rcgen = "0.13.1"
ref-cast = "1.0.23"
regex = "1.10.6"
ron = "0.8.1"
ringbuffer-spsc = "0.1.9"
Expand Down
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ pub struct EntityGlobalId(EntityGlobalIdProto);
pub type EntityId = u32;

impl EntityGlobalId {
/// Creates a new EntityGlobalId.
#[zenoh_macros::internal]
pub fn new(zid: ZenohId, eid: EntityId) -> Self {
EntityGlobalIdProto {
zid: zid.into(),
eid,
}
.into()
}

/// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to.
pub fn zid(&self) -> ZenohId {
self.0.zid.into()
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ pub fn ke(tokens: TokenStream) -> TokenStream {
let value: LitStr = syn::parse(tokens).unwrap();
let ke = value.value();
match zenoh_keyexpr::keyexpr::new(&ke) {
Ok(_) => quote!(unsafe {::zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Ok(_) => quote!(unsafe { zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Err(e) => panic!("{}", e),
}
}
Expand Down
4 changes: 3 additions & 1 deletion zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ tokio = { workspace = true, features = [
"macros",
"io-std",
] }
async-trait = { workspace = true }
bincode = { workspace = true }
zenoh-util = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["default"] }
leb128 = { workspace = true }
zenoh = { workspace = true, default-features = false }
uhlc = { workspace = true }
zenoh = { workspace = true, features = ["default"] }
zenoh-macros = { workspace = true }

[dev-dependencies]
Expand Down
8 changes: 4 additions & 4 deletions zenoh-ext/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ zenoh-ext = { workspace = true, features = ["unstable"] }
zenoh-config = { workspace = true }

[[example]]
name = "z_query_sub"
path = "examples/z_query_sub.rs"
name = "z_advanced_pub"
path = "examples/z_advanced_pub.rs"

[[example]]
name = "z_pub_cache"
path = "examples/z_pub_cache.rs"
name = "z_advanced_sub"
path = "examples/z_advanced_sub.rs"

[[example]]
name = "z_member"
Expand Down
22 changes: 12 additions & 10 deletions zenoh-ext/examples/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,33 @@

## Examples description

### z_pub_cache
### z_advanced_pub

Declares a publisher and an associated publication cache with a given key expression.
All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource). The cache can be queried by a QueryingSubscriber at startup (see next example).
Declares an AdvancedPublisher with a given key expression.
All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource, default 1). The cache can be queried by an AdvancedSubscriber for hsitory
or retransmission.

Typical usage:
```bash
z_pub_cache
z_advanced_pub
```
or
```bash
z_pub_cache --history 10
z_advanced_pub --history 10
```

### z_query_sub
### z_advanced_sub

Declares a querying subscriber with a selector.
At startup, the subscriber issuez a query (by default on the same selector than the subscription) and merge/sort/de-duplicate the query results with the publications received in parallel.
Declares an AdvancedSubscriber with a given key expression.
The AdvancedSubscriber can query for AdvancedPublisher history at startup
and on late joiner publisher detection. The AdvancedSubscriber can detect
sample loss and ask for retransmission.

Typical usage:
```bash
z_query_sub
z_advanced_sub
```


### z_member

Group Management example: join a group and display the received group events (Join, Leave, LeaseExpired), as well as an updated group view.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand All @@ -11,40 +13,37 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::{config::Config, key_expr::KeyExpr};
use zenoh_config::ModeDependentValue;
use zenoh_ext::*;
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr, value, history, prefix, complete) = parse_args();
let (config, key_expr, value, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
let _publication_cache = publication_cache_builder.await.unwrap();
println!("Declaring AdvancedPublisher on {}", &key_expr);
let publisher = session
.declare_publisher(&key_expr)
.cache(CacheConfig::default().max_samples(history))
.sample_miss_detection()
.publisher_detection()
.await
.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Put Data ('{}': '{}')", &key_expr, buf);
session.put(&key_expr, buf).await.unwrap();
publisher.put(buf).await.unwrap();
}
}

Expand All @@ -59,36 +58,16 @@ struct Args {
#[arg(short = 'i', long, default_value = "1")]
/// The number of publications to keep in cache.
history: usize,
#[arg(short = 'o', long)]
/// Set `complete` option to true. This means that this queryable is ultimate data source, no need to scan other queryables.
complete: bool,
#[arg(short = 'x', long)]
/// An optional queryable prefix.
prefix: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
KeyExpr<'static>,
String,
usize,
Option<String>,
bool,
) {
fn parse_args() -> (Config, KeyExpr<'static>, String, usize) {
let args = Args::parse();
let mut config: Config = args.common.into();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
(
config,
args.key,
args.value,
args.history,
args.prefix,
args.complete,
)
(config, args.key, args.value, args.history)
}
84 changes: 84 additions & 0 deletions zenoh-ext/examples/examples/z_advanced_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::config::Config;
use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring AdvancedSubscriber on {}", key_expr);
let subscriber = session
.declare_subscriber(key_expr)
.history(HistoryConfig::default().detect_late_publishers())
.recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))
.subscriber_detection()
.await
.unwrap();

let miss_listener = subscriber.sample_miss_listener().await.unwrap();

println!("Press CTRL-C to quit...");
loop {
tokio::select! {
sample = subscriber.recv_async() => {
if let Ok(sample) = sample {
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
}
},
miss = miss_listener.recv_async() => {
if let Ok(miss) = miss {
println!(
">> [Subscriber] Missed {} samples from {:?} !!!",
miss.nb(),
miss.source()
);
}
},
}
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The key expression to subscribe onto.
key: String,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, String) {
let args = Args::parse();
(args.common.into(), args.key)
}
Loading

0 comments on commit cb3fa54

Please sign in to comment.