Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Allow sending batch of messages through connection handle
Browse files Browse the repository at this point in the history
This is motivated by netfilter; changes to netfilter must be done
through a series of messages (started by NFNL_MSG_BATCH_BEGIN, ended by
NFNL_MSG_BATCH_END).  The full batch needs to be submitted to the kernel
in one write/sendto/..., otherwise the kernel will abort the batch.

(And sending a batch without an END message is interpreted as a query to
verify the batch without actually committing it.)
  • Loading branch information
stbuehler committed Apr 10, 2022
1 parent 3363b7a commit a220a69
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 0 deletions.
12 changes: 12 additions & 0 deletions netlink-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ where
return;
}
}
OutgoingMessage::Batch(mut messages, addr) => {
for message in &mut messages {
message.finalize();
}

trace!("sending outgoing message");
if let Err(e) = Pin::as_mut(&mut socket).start_send((messages, addr)) {
error!("failed to send message: {:?}", e);
self.socket_closed = true;
return;
}
}
}
}

Expand Down
74 changes: 74 additions & 0 deletions netlink-proto/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ where
Ok(rx)
}

/// Start a batch of messages
///
/// Collects multiple messages to be sent in one "request".
pub fn batch(&self, destination: SocketAddr) -> BatchHandle<T> {
BatchHandle {
handle: self.clone(),
destination,
messages: Vec::new(),
channels: Vec::new(),
}
}

pub fn notify(
&mut self,
message: NetlinkMessage<T>,
Expand All @@ -83,3 +95,65 @@ impl<T: Debug> Clone for ConnectionHandle<T> {
}
}
}

/// A handle to create a batch request (multiple requests serialized in one buffer)
///
/// The request needs to be [`sent`](`BatchHandle::send`) to actually do something.
#[derive(Debug)]
#[must_use = "A batch of messages must be sent to actually do something"]
pub struct BatchHandle<T>
where
T: Debug,
{
handle: ConnectionHandle<T>,
destination: SocketAddr,
messages: Vec<NetlinkMessage<T>>,
channels: Vec<UnboundedSender<NetlinkMessage<T>>>,
}

impl<T> BatchHandle<T>
where
T: Debug,
{
/// Add a new request to the batch and get the response as a stream of messages.
///
/// Similar to [`ConnectionHandle::request`].
///
/// Response stream will block until batch request is sent, and will be empty
/// if the batch request is dropped.
pub fn request(&mut self, message: NetlinkMessage<T>) -> impl Stream<Item = NetlinkMessage<T>> {
let (tx, rx) = unbounded::<NetlinkMessage<T>>();
self.messages.push(message);
self.channels.push(tx);
rx
}

/// Add a new request to the batch, but ignore response messages
///
/// Similar to [`ConnectionHandle::notify`].
pub fn notify(&mut self, message: NetlinkMessage<T>) {
let _ = self.request(message);
}

/// Send batch request
pub fn send(self) -> Result<(), Error<T>> {
debug!("handle: forwarding new request to connection");
let request = Request::Batch {
metadata: self.channels,
messages: self.messages,
destination: self.destination,
};
UnboundedSender::unbounded_send(&self.handle.requests_tx, request).map_err(|e| {
// the channel is unbounded, so it can't be full. If this
// failed, it means the Connection shut down.
if e.is_full() {
panic!("internal error: unbounded channel full?!");
} else if e.is_disconnected() {
Error::ConnectionClosed
} else {
panic!("unknown error: {:?}", e);
}
})?;
Ok(())
}
}
13 changes: 13 additions & 0 deletions netlink-proto/src/protocol/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct PendingRequest<M> {
#[derive(Debug)]
pub(crate) enum OutgoingMessage<T> {
Single(NetlinkMessage<T>, SocketAddr),
Batch(Vec<NetlinkMessage<T>>, SocketAddr),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -178,6 +179,18 @@ where
self.outgoing_messages
.push_back(OutgoingMessage::Single(message, destination));
}
Request::Batch {
mut messages,
metadata,
destination,
} => {
assert_eq!(messages.len(), metadata.len());
for (msg, md) in messages.iter_mut().zip(metadata.into_iter()) {
self.request_single(msg, md, &destination);
}
self.outgoing_messages
.push_back(OutgoingMessage::Batch(messages, destination));
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions netlink-proto/src/protocol/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ pub(crate) enum Request<T, M> {
message: NetlinkMessage<T>,
destination: SocketAddr,
},
Batch {
metadata: Vec<M>,
messages: Vec<NetlinkMessage<T>>,
destination: SocketAddr,
},
}

0 comments on commit a220a69

Please sign in to comment.