Skip to content

Commit

Permalink
refactor: condense and simplify examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 6, 2024
1 parent 79d5ff8 commit 165d169
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 143 deletions.
70 changes: 22 additions & 48 deletions rumqttc/examples/ack_promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

Expand Down Expand Up @@ -39,57 +36,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Publish at all QoS levels and wait for broker acknowledgement
match client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}

match client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap()
.await
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
match client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

match client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap()
.await
// Publish with different QoS levels and spawn wait for notification
let mut set = JoinSet::new();
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
let token = client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap();
set.spawn(token);
}

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(async { future.await });

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(async { future.await });

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(async { future.await });

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Expand Down
75 changes: 28 additions & 47 deletions rumqttc/examples/ack_promise_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ use std::thread::{self, sleep};
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

Expand Down Expand Up @@ -36,59 +33,43 @@ fn main() -> Result<(), Box<dyn Error>> {
}

// Publish at all QoS levels and wait for broker acknowledgement
match client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap()
.blocking_wait()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}

match client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap()
.try_resolve()
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}

match client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.unwrap()
.blocking_wait()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
match client
.publish("hello/world", qos, false, vec![1; i])
.unwrap()
.blocking_wait()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

// Spawn threads for each publish, use channel to notify result
let (tx, rx) = bounded(1);

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_wait();
tx_clone.send(res).unwrap()
});

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_wait();
tx_clone.send(res).unwrap()
});
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
let token = client
.publish("hello/world", qos, false, vec![1; i])
.unwrap();
let tx = tx.clone();
thread::spawn(move || {
let res = token.blocking_wait();
tx.send(res).unwrap()
});
}

let mut future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
// Try resolving a promise, if it is waiting to resolve, try again after a sleep of 1s
let mut token = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 4])
.unwrap();
thread::spawn(move || loop {
match future.try_resolve() {
match token.try_resolve() {
Err(PromiseError::Waiting) => {
println!("Promise yet to resolve, retrying");
sleep(Duration::from_secs(1));
Expand Down
70 changes: 22 additions & 48 deletions rumqttc/examples/ack_promise_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

Expand Down Expand Up @@ -39,57 +36,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Publish at all QoS levels and wait for broker acknowledgement
match client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}

match client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap()
.await
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
match client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap()
.await
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
}
}

match client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap()
.await
// Publish with different QoS levels and spawn wait for notification
let mut set = JoinSet::new();
for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce]
.into_iter()
.enumerate()
{
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Err(e) => println!("Publish failed: {e:?}"),
let token = client
.publish("hello/world", qos, false, vec![1; i])
.await
.unwrap();
set.spawn(token);
}

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(async { future.await });

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(async { future.await });

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(async { future.await });

while let Some(Ok(res)) = set.join_next().await {
match res {
Ok(pkid) => println!("Acknowledged Pub({pkid})"),
Expand Down

0 comments on commit 165d169

Please sign in to comment.