Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Further preparation of handling batch messages
Browse files Browse the repository at this point in the history
  • Loading branch information
stbuehler committed Nov 25, 2021
1 parent bf0f6ad commit be775f1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 19 deletions.
18 changes: 11 additions & 7 deletions netlink-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
codecs::{NetlinkCodec, NetlinkMessageCodec},
framed::NetlinkFramed,
sys::{AsyncSocket, SocketAddr},
OutgoingMessage,
Protocol,
Request,
Response,
Expand Down Expand Up @@ -102,14 +103,17 @@ where
return;
}

let (mut message, addr) = protocol.outgoing_messages.pop_front().unwrap();
message.finalize();
match protocol.outgoing_messages.pop_front().unwrap() {
OutgoingMessage::Single(mut message, addr) => {
message.finalize();

trace!("sending outgoing message");
if let Err(e) = Pin::as_mut(&mut socket).start_send((message, addr)) {
error!("failed to send message: {:?}", e);
self.socket_closed = true;
return;
trace!("sending outgoing message");
if let Err(e) = Pin::as_mut(&mut socket).start_send((message, addr)) {
error!("failed to send message: {:?}", e);
self.socket_closed = true;
return;
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion netlink-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ mod framed;
pub use crate::framed::*;

mod protocol;
pub(crate) use self::protocol::{Protocol, Response};
pub(crate) use self::protocol::{OutgoingMessage, Protocol, Response};
pub(crate) type Request<T> =
self::protocol::Request<T, UnboundedSender<crate::packet::NetlinkMessage<T>>>;

Expand Down
2 changes: 1 addition & 1 deletion netlink-proto/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
mod protocol;
mod request;

pub(crate) use protocol::{Protocol, Response};
pub(crate) use protocol::{OutgoingMessage, Protocol, Response};
pub(crate) use request::Request;
37 changes: 27 additions & 10 deletions netlink-proto/src/protocol/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ struct PendingRequest<M> {
metadata: M,
}

#[derive(Debug)]
pub(crate) enum OutgoingMessage<T> {
Single(NetlinkMessage<T>, SocketAddr),
}

#[derive(Debug, Default)]
pub(crate) struct Protocol<T, M> {
/// Counter that is incremented for each message sent
Expand All @@ -60,7 +65,7 @@ pub(crate) struct Protocol<T, M> {
pub incoming_requests: VecDeque<(NetlinkMessage<T>, SocketAddr)>,

/// The messages to be sent out
pub outgoing_messages: VecDeque<(NetlinkMessage<T>, SocketAddr)>,
pub outgoing_messages: VecDeque<OutgoingMessage<T>>,
}

impl<T, M> Protocol<T, M>
Expand Down Expand Up @@ -130,17 +135,15 @@ where
debug!("done handling response to request {:?}", request_id);
}

pub fn request(&mut self, request: Request<T, M>) {
let Request::Single {
mut message,
metadata,
destination,
} = request;

self.set_sequence_id(&mut message);
fn request_single(
&mut self,
message: &mut NetlinkMessage<T>,
metadata: M,
destination: &SocketAddr,
) {
self.set_sequence_id(message);
let request_id = RequestId::new(self.sequence_id, destination.port_number());
let flags = message.header.flags;
self.outgoing_messages.push_back((message, destination));

// If we expect a response, we store the request id so that we
// can map the response to this specific request.
Expand All @@ -164,6 +167,20 @@ where
}
}

pub fn request(&mut self, request: Request<T, M>) {
match request {
Request::Single {
mut message,
metadata,
destination,
} => {
self.request_single(&mut message, metadata, &destination);
self.outgoing_messages
.push_back(OutgoingMessage::Single(message, destination));
}
}
}

fn set_sequence_id(&mut self, message: &mut NetlinkMessage<T>) {
self.sequence_id += 1;
message.header.sequence_number = self.sequence_id;
Expand Down

0 comments on commit be775f1

Please sign in to comment.