diff --git a/rumqttc/tests/broker.rs b/rumqttc/tests/broker.rs index c450b68b4..609b381ed 100644 --- a/rumqttc/tests/broker.rs +++ b/rumqttc/tests/broker.rs @@ -9,9 +9,39 @@ use tokio::{task, time}; use bytes::BytesMut; use flume::{bounded, Receiver, Sender}; -use rumqttc::{Event, Incoming, Outgoing, Packet}; +use rumqttc::{Incoming, Packet}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +#[derive(Debug, PartialEq)] +pub enum Event { + Incoming(Packet), + Outgoing(Outgoing), +} + +#[derive(Debug, PartialEq)] +pub enum Outgoing { + /// Publish packet with packet identifier. 0 implies QoS 0 + Publish(u16), + /// SubAck packet with packet identifier + SubAck(u16), + /// UnsubAck packet with packet identifier + UnsubAck(u16), + /// PubAck packet + PubAck(u16), + /// PubRec packet + PubRec(u16), + /// PubRel packet + PubRel(u16), + /// PubComp packet + PubComp(u16), + /// Ping request packet + PingReq, + /// Ping response packet + PingResp, + /// Disconnect packet + Disconnect, +} + pub struct Broker { pub(crate) framed: Network, pub(crate) incoming: VecDeque, @@ -116,8 +146,8 @@ impl Broker { } } - /// Sends an acknowledgement - pub async fn ack(&mut self, pkid: u16) { + /// Sends a publish acknowledgement + pub async fn puback(&mut self, pkid: u16) { let packet = Packet::PubAck(PubAck::new(pkid)); self.framed.write(packet).await.unwrap(); } @@ -134,6 +164,18 @@ impl Broker { self.framed.write(packet).await.unwrap(); } + /// Sends a subscribe acknowledgement + pub async fn suback(&mut self, pkid: u16, qos: QoS) { + let packet = Packet::SubAck(SubAck::new(pkid, vec![SubscribeReasonCode::Success(qos)])); + self.framed.write(packet).await.unwrap(); + } + + /// Sends an unsubscribe acknowledgement + pub async fn unsuback(&mut self, pkid: u16) { + let packet = Packet::UnsubAck(UnsubAck::new(pkid)); + self.framed.write(packet).await.unwrap(); + } + /// Sends an acknowledgement pub async fn pingresp(&mut self) { let packet = Packet::PingResp; @@ -308,8 +350,8 @@ fn outgoing(packet: &Packet) -> Outgoing { Packet::PubRec(pubrec) => Outgoing::PubRec(pubrec.pkid), Packet::PubRel(pubrel) => Outgoing::PubRel(pubrel.pkid), Packet::PubComp(pubcomp) => Outgoing::PubComp(pubcomp.pkid), - Packet::Subscribe(subscribe) => Outgoing::Subscribe(subscribe.pkid), - Packet::Unsubscribe(unsubscribe) => Outgoing::Unsubscribe(unsubscribe.pkid), + Packet::SubAck(suback) => Outgoing::SubAck(suback.pkid), + Packet::UnsubAck(unsuback) => Outgoing::UnsubAck(unsuback.pkid), Packet::PingReq => Outgoing::PingReq, Packet::PingResp => Outgoing::PingResp, Packet::Disconnect => Outgoing::Disconnect, diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 2f559802b..2915d3e8e 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -179,7 +179,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() { loop { let event = broker.tick().await; - if event == Event::Incoming(Incoming::PingReq) { + if event == broker::Event::Incoming(Incoming::PingReq) { // wait for 3 pings count += 1; if count == 3 { @@ -218,7 +218,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() { loop { let event = broker.tick().await; - if event == Event::Incoming(Incoming::PingReq) { + if event == broker::Event::Incoming(Incoming::PingReq) { // wait for 3 pings count += 1; if count == 3 { @@ -320,12 +320,12 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() { assert!(broker.read_publish().await.is_none()); // ack packet 1 and client would produce packet 4 - broker.ack(1).await; + broker.puback(1).await; assert!(broker.read_publish().await.is_some()); assert!(broker.read_publish().await.is_none()); // ack packet 2 and client would produce packet 5 - broker.ack(2).await; + broker.puback(2).await; assert!(broker.read_publish().await.is_some()); assert!(broker.read_publish().await.is_none()); } @@ -353,18 +353,18 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { } // out of order ack - broker.ack(3).await; - broker.ack(4).await; + broker.puback(3).await; + broker.puback(4).await; time::sleep(Duration::from_secs(5)).await; - broker.ack(1).await; - broker.ack(2).await; + broker.puback(1).await; + broker.puback(2).await; // read and ack remaining packets in order for i in 5..=15 { let packet = broker.read_publish().await; let packet = packet.unwrap(); assert_eq!(packet.payload[0], i); - broker.ack(packet.pkid).await; + broker.puback(packet.pkid).await; } time::sleep(Duration::from_secs(10)).await; @@ -376,7 +376,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { // Poll until there is collision. loop { match eventloop.poll().await.unwrap() { - Event::Outgoing(Outgoing::AwaitAck(1)) => break, + rumqttc::Event::Outgoing(rumqttc::Outgoing::AwaitAck(1)) => break, v => { println!("Poll = {v:?}"); continue; @@ -390,7 +390,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { println!("Poll = {event:?}"); match event { - Event::Outgoing(Outgoing::Publish(ack)) => { + rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(ack)) => { if ack == 1 { let elapsed = start.elapsed().as_millis() as i64; let deviation_millis: i64 = (5000 - elapsed).abs(); @@ -466,7 +466,7 @@ async fn next_poll_after_connect_failure_reconnects() { } match eventloop.poll().await { - Ok(Event::Incoming(Packet::ConnAck(ConnAck { + Ok(rumqttc::Event::Incoming(Packet::ConnAck(ConnAck { code: ConnectReturnCode::Success, session_present: false, }))) => (), @@ -498,7 +498,7 @@ async fn reconnection_resumes_from_the_previous_state() { for i in 1..=2 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); - broker.ack(packet.pkid).await; + broker.puback(packet.pkid).await; } // NOTE: An interesting thing to notice here is that reassigning a new broker @@ -512,7 +512,7 @@ async fn reconnection_resumes_from_the_previous_state() { for i in 3..=4 { let packet = broker.read_publish().await.unwrap(); assert_eq!(i, packet.payload[0]); - broker.ack(packet.pkid).await; + broker.puback(packet.pkid).await; } } @@ -696,7 +696,7 @@ async fn resolve_on_qos1_ack_from_broker() { .unwrap_err(); // Finally ack the packet - broker.ack(1).await; + broker.puback(1).await; // Token shouldn't resolve until packet is acked assert_eq!( @@ -780,3 +780,120 @@ async fn resolve_on_qos2_ack_from_broker() { 1 ); } + +#[tokio::test] +async fn resolve_on_sub_ack_from_broker() { + let options = MqttOptions::new("dummy", "127.0.0.1", 3006); + let (client, mut eventloop) = AsyncClient::new(options, 5); + + task::spawn(async move { + let res = run(&mut eventloop, false).await; + if let Err(e) = res { + match e { + ConnectionError::FlushTimeout => { + assert!(eventloop.network.is_none()); + println!("State is being clean properly"); + } + _ => { + println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer."); + } + } + } + }); + + let mut broker = Broker::new(3006, 0, false).await; + + let mut token = client + .subscribe("hello/world", QoS::AtLeastOnce) + .await + .unwrap(); + + // Token shouldn't resolve before reaching broker + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap_err(); + + let Packet::Subscribe(Subscribe { pkid, filters, .. }) = broker.read_packet().await.unwrap() + else { + unreachable!() + }; + assert_eq!( + filters, + [SubscribeFilter { + path: "hello/world".to_owned(), + qos: QoS::AtLeastOnce + }] + ); + assert_eq!(pkid, 1); + + // Token shouldn't resolve until packet is acked + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap_err(); + + // Finally ack the packet + broker.suback(1, QoS::AtLeastOnce).await; + + // Token shouldn't resolve until packet is acked + assert_eq!( + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap() + .unwrap(), + 1 + ); +} + +#[tokio::test] +async fn resolve_on_unsub_ack_from_broker() { + let options = MqttOptions::new("dummy", "127.0.0.1", 3006); + let (client, mut eventloop) = AsyncClient::new(options, 5); + + task::spawn(async move { + let res = run(&mut eventloop, false).await; + if let Err(e) = res { + match e { + ConnectionError::FlushTimeout => { + assert!(eventloop.network.is_none()); + println!("State is being clean properly"); + } + _ => { + println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer."); + } + } + } + }); + + let mut broker = Broker::new(3006, 0, false).await; + + let mut token = client.unsubscribe("hello/world").await.unwrap(); + + // Token shouldn't resolve before reaching broker + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap_err(); + + let Packet::Unsubscribe(Unsubscribe { topics, pkid, .. }) = broker.read_packet().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topics, vec!["hello/world"]); + assert_eq!(pkid, 1); + + // Token shouldn't resolve until packet is acked + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap_err(); + + // Finally ack the packet + broker.unsuback(1).await; + + // Token shouldn't resolve until packet is acked + assert_eq!( + timeout(Duration::from_secs(1), &mut token) + .await + .unwrap() + .unwrap(), + 1 + ); +}