From be775f196a66256cfcee95bda002d8deed7d5d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Fri, 26 Nov 2021 00:00:40 +0100 Subject: [PATCH] Further preparation of handling batch messages --- netlink-proto/src/connection.rs | 18 ++++++++----- netlink-proto/src/lib.rs | 2 +- netlink-proto/src/protocol/mod.rs | 2 +- netlink-proto/src/protocol/protocol.rs | 37 +++++++++++++++++++------- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/netlink-proto/src/connection.rs b/netlink-proto/src/connection.rs index f280f080..3478e271 100644 --- a/netlink-proto/src/connection.rs +++ b/netlink-proto/src/connection.rs @@ -25,6 +25,7 @@ use crate::{ codecs::{NetlinkCodec, NetlinkMessageCodec}, framed::NetlinkFramed, sys::{AsyncSocket, SocketAddr}, + OutgoingMessage, Protocol, Request, Response, @@ -102,14 +103,17 @@ where return; } - let (mut message, addr) = protocol.outgoing_messages.pop_front().unwrap(); - message.finalize(); + match protocol.outgoing_messages.pop_front().unwrap() { + OutgoingMessage::Single(mut message, addr) => { + message.finalize(); - trace!("sending outgoing message"); - if let Err(e) = Pin::as_mut(&mut socket).start_send((message, addr)) { - error!("failed to send message: {:?}", e); - self.socket_closed = true; - return; + trace!("sending outgoing message"); + if let Err(e) = Pin::as_mut(&mut socket).start_send((message, addr)) { + error!("failed to send message: {:?}", e); + self.socket_closed = true; + return; + } + } } } diff --git a/netlink-proto/src/lib.rs b/netlink-proto/src/lib.rs index ea3e8fe3..2e9f3747 100644 --- a/netlink-proto/src/lib.rs +++ b/netlink-proto/src/lib.rs @@ -175,7 +175,7 @@ mod framed; pub use crate::framed::*; mod protocol; -pub(crate) use self::protocol::{Protocol, Response}; +pub(crate) use self::protocol::{OutgoingMessage, Protocol, Response}; pub(crate) type Request = self::protocol::Request>>; diff --git a/netlink-proto/src/protocol/mod.rs b/netlink-proto/src/protocol/mod.rs index 7fc136fa..739926f5 100644 --- a/netlink-proto/src/protocol/mod.rs +++ b/netlink-proto/src/protocol/mod.rs @@ -4,5 +4,5 @@ mod protocol; mod request; -pub(crate) use protocol::{Protocol, Response}; +pub(crate) use protocol::{OutgoingMessage, Protocol, Response}; pub(crate) use request::Request; diff --git a/netlink-proto/src/protocol/protocol.rs b/netlink-proto/src/protocol/protocol.rs index ae077991..c7d0203d 100644 --- a/netlink-proto/src/protocol/protocol.rs +++ b/netlink-proto/src/protocol/protocol.rs @@ -44,6 +44,11 @@ struct PendingRequest { metadata: M, } +#[derive(Debug)] +pub(crate) enum OutgoingMessage { + Single(NetlinkMessage, SocketAddr), +} + #[derive(Debug, Default)] pub(crate) struct Protocol { /// Counter that is incremented for each message sent @@ -60,7 +65,7 @@ pub(crate) struct Protocol { pub incoming_requests: VecDeque<(NetlinkMessage, SocketAddr)>, /// The messages to be sent out - pub outgoing_messages: VecDeque<(NetlinkMessage, SocketAddr)>, + pub outgoing_messages: VecDeque>, } impl Protocol @@ -130,17 +135,15 @@ where debug!("done handling response to request {:?}", request_id); } - pub fn request(&mut self, request: Request) { - let Request::Single { - mut message, - metadata, - destination, - } = request; - - self.set_sequence_id(&mut message); + fn request_single( + &mut self, + message: &mut NetlinkMessage, + metadata: M, + destination: &SocketAddr, + ) { + self.set_sequence_id(message); let request_id = RequestId::new(self.sequence_id, destination.port_number()); let flags = message.header.flags; - self.outgoing_messages.push_back((message, destination)); // If we expect a response, we store the request id so that we // can map the response to this specific request. @@ -164,6 +167,20 @@ where } } + pub fn request(&mut self, request: Request) { + match request { + Request::Single { + mut message, + metadata, + destination, + } => { + self.request_single(&mut message, metadata, &destination); + self.outgoing_messages + .push_back(OutgoingMessage::Single(message, destination)); + } + } + } + fn set_sequence_id(&mut self, message: &mut NetlinkMessage) { self.sequence_id += 1; message.header.sequence_number = self.sequence_id;