From 5693284349b519eaa8a9782c75c2a37707abea9f Mon Sep 17 00:00:00 2001 From: John Baublitz Date: Fri, 8 Sep 2023 01:04:19 -0400 Subject: [PATCH] Bug fix for race condition and unpredictable presence of DONE packet. This commit addresses two separate issues: 1. There was previously a short window where a message was sent to the netlink socket, but the sequence number had not yet been registered for routing back to the context that it was sent from. This caused messages to periodically disappear unexpectedly. 2. There was some inconsistency in how Nlmsg::Done packets were handled. This commit standardizes on always returning them from the iteratora. While this may not be preferable from a user standpoint, it will significantly simplify the code, resulting in less room for error in the future. --- src/router/asynchronous.rs | 6 ++++-- src/router/synchronous.rs | 20 ++++++++++++++++---- src/rtnl.rs | 23 ++++++++++++++--------- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/router/asynchronous.rs b/src/router/asynchronous.rs index 550312c0..39823a7c 100644 --- a/src/router/asynchronous.rs +++ b/src/router/asynchronous.rs @@ -236,11 +236,13 @@ impl NlRouter { .nl_seq(self.next_seq().await) .nl_payload(nl_payload) .build()?; - let flags = *msg.nl_flags(); let seq = *msg.nl_seq(); - self.socket.send(&msg).await?; let (sender, receiver) = channel(1024); self.senders.lock().await.insert(seq, sender); + let flags = *msg.nl_flags(); + + self.socket.send(&msg).await?; + Ok(NlRouterReceiverHandle::new( receiver, Arc::clone(&self.senders), diff --git a/src/router/synchronous.rs b/src/router/synchronous.rs index 4db96480..5baabb26 100644 --- a/src/router/synchronous.rs +++ b/src/router/synchronous.rs @@ -92,6 +92,16 @@ fn spawn_processing_thread(socket: Arc, senders: Senders) -> Pro } } } + } else { + for (seq, sender) in lock.iter() { + if sender + .send(Err(RouterError::BadSeqOrPid(m.clone()))) + .is_err() + { + error!("{}", RouterError::::ClosedChannel); + seqs_to_remove.insert(*seq); + } + } } } Err(e) => { @@ -221,11 +231,14 @@ impl NlRouter { .nl_seq(self.next_seq()) .nl_payload(nl_payload) .build()?; - let flags = *msg.nl_flags(); - let seq = *msg.nl_seq(); - self.socket.send(&msg)?; + let (sender, receiver) = channel(); + let seq = *msg.nl_seq(); self.senders.lock().insert(seq, sender); + let flags = *msg.nl_flags(); + + self.socket.send(&msg)?; + Ok(NlRouterReceiverHandle::new( receiver, Arc::clone(&self.senders), @@ -474,7 +487,6 @@ where self.next_is_ack = true; } else { self.next_is_none = true; - return None; } } else if self.next_is_ack { self.next_is_none = true; diff --git a/src/rtnl.rs b/src/rtnl.rs index e7afbe2e..81b74eec 100644 --- a/src/rtnl.rs +++ b/src/rtnl.rs @@ -488,13 +488,15 @@ mod test { .unwrap(); for msg in recv { let msg = msg.unwrap(); - let handle = msg.get_payload().unwrap().rtattrs.get_attr_handle(); - handle - .get_attr_payload_as_with_len::(Ifla::Ifname) - .unwrap(); - // Assert length of ethernet address - if let Ok(attr) = handle.get_attr_payload_as_with_len::>(Ifla::Address) { - assert_eq!(attr.len(), 6); + if let Some(payload) = msg.get_payload() { + let handle = payload.rtattrs.get_attr_handle(); + handle + .get_attr_payload_as_with_len::(Ifla::Ifname) + .unwrap(); + // Assert length of ethernet address + if let Ok(attr) = handle.get_attr_payload_as_with_len::>(Ifla::Address) { + assert_eq!(attr.len(), 6); + } } } } @@ -523,8 +525,11 @@ mod test { .unwrap(); for msg in recv { let msg = msg.unwrap(); - assert!(matches!(msg.get_payload().unwrap(), Tcmsg { .. })); - assert_eq!(msg.nl_type(), &Rtm::Newqdisc); + assert!(matches!(msg.get_payload(), Some(Tcmsg { .. }) | None)); + assert!(matches!( + msg.nl_type(), + Rtm::Newqdisc | Rtm::UnrecognizedConst(3) + )); } } }