Skip to content

Commit

Permalink
Bug fix for race condition and unpredictable presence of DONE packet.
Browse files Browse the repository at this point in the history
This commit addresses two separate issues:

1. There was previously a short window where a message was sent to the
netlink socket, but the sequence number had not yet been registered
for routing back to the context that it was sent from. This caused
messages to periodically disappear unexpectedly.
2. There was some inconsistency in how Nlmsg::Done packets were handled.
This commit standardizes on always returning them from the iteratora.
While this may not be preferable from a user standpoint, it will
significantly simplify the code, resulting in less room for error in the
future.
  • Loading branch information
jbaublitz committed Nov 3, 2023
1 parent 88bf372 commit fc27987
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
6 changes: 4 additions & 2 deletions src/router/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ impl NlRouter {
.nl_seq(self.next_seq().await)
.nl_payload(nl_payload)
.build()?;
let flags = *msg.nl_flags();
let seq = *msg.nl_seq();
self.socket.send(&msg).await?;
let (sender, receiver) = channel(1024);
self.senders.lock().await.insert(seq, sender);
let flags = *msg.nl_flags();

self.socket.send(&msg).await?;

Ok(NlRouterReceiverHandle::new(
receiver,
Arc::clone(&self.senders),
Expand Down
20 changes: 16 additions & 4 deletions src/router/synchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ fn spawn_processing_thread(socket: Arc<NlSocketHandle>, senders: Senders) -> Pro
}
}
}
} else {
for (seq, sender) in lock.iter() {
if sender
.send(Err(RouterError::BadSeqOrPid(m.clone())))
.is_err()
{
error!("{}", RouterError::<u16, Buffer>::ClosedChannel);
seqs_to_remove.insert(*seq);
}
}
}
}
Err(e) => {
Expand Down Expand Up @@ -221,11 +231,14 @@ impl NlRouter {
.nl_seq(self.next_seq())
.nl_payload(nl_payload)
.build()?;
let flags = *msg.nl_flags();
let seq = *msg.nl_seq();
self.socket.send(&msg)?;

let (sender, receiver) = channel();
let seq = *msg.nl_seq();
self.senders.lock().insert(seq, sender);
let flags = *msg.nl_flags();

self.socket.send(&msg)?;

Ok(NlRouterReceiverHandle::new(
receiver,
Arc::clone(&self.senders),
Expand Down Expand Up @@ -474,7 +487,6 @@ where
self.next_is_ack = true;
} else {
self.next_is_none = true;
return None;
}
} else if self.next_is_ack {
self.next_is_none = true;
Expand Down
23 changes: 14 additions & 9 deletions src/rtnl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,15 @@ mod test {
.unwrap();
for msg in recv {
let msg = msg.unwrap();
let handle = msg.get_payload().unwrap().rtattrs.get_attr_handle();
handle
.get_attr_payload_as_with_len::<String>(Ifla::Ifname)
.unwrap();
// Assert length of ethernet address
if let Ok(attr) = handle.get_attr_payload_as_with_len::<Vec<u8>>(Ifla::Address) {
assert_eq!(attr.len(), 6);
if let Some(payload) = msg.get_payload() {
let handle = payload.rtattrs.get_attr_handle();
handle
.get_attr_payload_as_with_len::<String>(Ifla::Ifname)
.unwrap();
// Assert length of ethernet address
if let Ok(attr) = handle.get_attr_payload_as_with_len::<Vec<u8>>(Ifla::Address) {
assert_eq!(attr.len(), 6);
}
}
}
}
Expand Down Expand Up @@ -523,8 +525,11 @@ mod test {
.unwrap();
for msg in recv {
let msg = msg.unwrap();
assert!(matches!(msg.get_payload().unwrap(), Tcmsg { .. }));
assert_eq!(msg.nl_type(), &Rtm::Newqdisc);
assert!(matches!(msg.get_payload(), Some(Tcmsg { .. }) | None));
assert!(matches!(
msg.nl_type(),
Rtm::Newqdisc | Rtm::UnrecognizedConst(3)
));
}
}
}

0 comments on commit fc27987

Please sign in to comment.