From aaa11854fd95e23144ce677c25229ea240a9d996 Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Tue, 26 Nov 2024 00:41:21 +0100 Subject: [PATCH] feat: implement demo program for sendmsg Signed-off-by: Felix Hilgers --- rust/Cargo.lock | 130 +++++++++ rust/Cargo.toml | 6 +- rust/playground/sendmsg-demo/Cargo.toml | 17 ++ rust/playground/sendmsg-demo/src/main.rs | 334 +++++++++++++++++++++++ 4 files changed, 486 insertions(+), 1 deletion(-) create mode 100644 rust/playground/sendmsg-demo/Cargo.toml create mode 100644 rust/playground/sendmsg-demo/src/main.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 89c4111c..3cacfe03 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -558,6 +558,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.38" @@ -640,6 +646,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width 0.1.14", + "windows-sys 0.52.0", +] + [[package]] name = "core-error" version = "0.0.0" @@ -670,12 +689,31 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "dialoguer" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658bce805d770f407bc62102fca7c2c64ceef2fbcb2b8bd19d2765ce093980de" +dependencies = [ + "console", + "shell-words", + "tempfile", + "thiserror 1.0.68", + "zeroize", +] + [[package]] name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "equivalent" version = "1.0.1" @@ -1040,6 +1078,19 @@ dependencies = [ "hashbrown 0.15.0", ] +[[package]] +name = "indicatif" +version = "0.17.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf675b85ed934d3c67b5c5469701eec7db22689d0a2139d856e0925fa28b281" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width 0.2.0", + "web-time", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1115,6 +1166,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1164,6 +1224,19 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "7.1.3" @@ -1213,6 +1286,12 @@ dependencies = [ "syn", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.36.5" @@ -1303,6 +1382,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "portable-atomic" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1575,6 +1660,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sendmsg-demo" +version = "0.1.0" +dependencies = [ + "anyhow", + "console", + "dialoguer", + "indicatif", + "nix", +] + [[package]] name = "serde" version = "1.0.214" @@ -1628,6 +1724,12 @@ dependencies = [ "uniffi", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shlex" version = "1.3.0" @@ -2018,6 +2120,18 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "uniffi" version = "0.28.2" @@ -2223,6 +2337,16 @@ version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "weedle2" version = "5.0.0" @@ -2392,3 +2516,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 342f0ffd..063c49ae 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ [workspace] resolver = "2" -members = ["xtask", "backend/daemon", "backend/ebpf", "backend/common", "shared", "client"] +members = ["xtask", "backend/daemon", "backend/ebpf", "backend/common", "shared", "client", "playground/sendmsg-demo"] default-members = ["xtask", "backend/daemon", "backend/common", "shared", "client"] [workspace.package] @@ -49,6 +49,10 @@ tracing = { version = "0.1.40" } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } procfs = "0.17.0" async-broadcast = "0.7.1" +console = "0.15.8" +dialoguer = "0.11.0" +indicatif = "0.17.9" +nix = "0.29.0" [profile.dev] panic = "abort" diff --git a/rust/playground/sendmsg-demo/Cargo.toml b/rust/playground/sendmsg-demo/Cargo.toml new file mode 100644 index 00000000..b73d7919 --- /dev/null +++ b/rust/playground/sendmsg-demo/Cargo.toml @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: 2024 Felix Hilgers +# +# SPDX-License-Identifier: MIT + +[package] +name = "sendmsg-demo" +version = "0.1.0" +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +anyhow = { workspace = true } +console = { workspace = true } +dialoguer = { workspace = true } +indicatif = { workspace = true } +nix = { workspace = true, features = ["socket", "uio"] } diff --git a/rust/playground/sendmsg-demo/src/main.rs b/rust/playground/sendmsg-demo/src/main.rs new file mode 100644 index 00000000..437f7ae7 --- /dev/null +++ b/rust/playground/sendmsg-demo/src/main.rs @@ -0,0 +1,334 @@ +// SPDX-FileCopyrightText: 2024 Felix Hilgers +// +// SPDX-License-Identifier: MIT + +use std::{ + io::{IoSlice, IoSliceMut}, os::fd::{AsRawFd, OwnedFd}, process::id, sync::mpsc, thread::{sleep, spawn}, time::Duration +}; + +use console::style; +use dialoguer::{theme::ColorfulTheme, Input}; +use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle}; +use nix::sys::socket::{ + getsockopt, recvmsg, sendmsg, setsockopt, socketpair, + sockopt::SndBuf, + AddressFamily, MsgFlags, SockFlag, SockType, + }; + +pub struct Inputs<'a> { + packet_amount: Input<'a, u64>, + time_between_packets: Input<'a, u64>, + time_to_block: Input<'a, u64>, +} + +impl<'a> Inputs<'a> { + pub fn new(theme: &'a ColorfulTheme) -> Self { + let packet_amount = Input::::with_theme(theme) + .with_prompt("How many packets do you want to send?".to_owned()); + + let time_between_packets = Input::::with_theme(theme) + .with_prompt("How long to wait between packets in seconds?".to_owned()); + + let time_to_block = Input::::with_theme(theme) + .with_prompt("How long to block when sending packets in seconds?".to_owned()); + + Self { + packet_amount, + time_between_packets, + time_to_block, + } + } + + pub fn interact(self) -> Result { + let packet_amount = self.packet_amount.interact()?; + let time_between_packets = self.time_between_packets.interact()?; + let time_to_block = self.time_to_block.interact()?; + + Ok(Data { + packet_amount, + time_between_packets, + time_to_block, + }) + } +} + +pub struct Data { + packet_amount: u64, + time_between_packets: u64, + time_to_block: u64, +} + +#[derive(Clone)] +pub struct Bars { + _multi: MultiProgress, + packets_sent: ProgressBar, + packets_received: ProgressBar, + wait_bar: ProgressBar, + block_bar: ProgressBar, +} + +impl Bars { + pub fn new(packet_amount: u64, time_between_packets: u64, time_to_block: u64) -> Self { + let multi = MultiProgress::new(); + + let send_style = ProgressStyle::default_bar() + .template("[{elapsed_precise}] {bar:40.green} {pos:>7}/{len:7} {msg}") + .expect("template should be valid"); + + let receive_style = ProgressStyle::default_bar() + .template("[{elapsed_precise}] {bar:40.blue} {pos:>7}/{len:7} {msg}") + .expect("template should be valid"); + + let sleep_style = ProgressStyle::default_bar() + .template("[{elapsed_precise}] {bar:40.yellow} {pos:>7}/{len:7} {msg}") + .expect("template should be valid"); + + let blocking_style = ProgressStyle::default_bar() + .template("[{elapsed_precise}] {bar:40.red} {pos:>7}/{len:7} {msg}") + .expect("template should be valid"); + + + let packets_sent = ProgressBar::new(packet_amount) + .with_style(send_style) + .with_message("packets sent") + .with_finish(ProgressFinish::WithMessage("done".into())); + + let packets_received = ProgressBar::new(packet_amount) + .with_style(receive_style) + .with_message("packets received") + .with_finish(ProgressFinish::WithMessage("done".into())); + + let wait_bar = ProgressBar::new(time_between_packets * 50) + .with_style(sleep_style) + .with_message("sleeping") + .with_finish(ProgressFinish::WithMessage("done".into())); + + let block_bar = ProgressBar::new(time_to_block * 50) + .with_style(blocking_style) + .with_message("blocking") + .with_finish(ProgressFinish::WithMessage("done".into())); + + multi.add(wait_bar.clone()); + multi.add(block_bar.clone()); + multi.add(packets_sent.clone()); + multi.add(packets_received.clone()); + + Self { + _multi: multi, + packets_sent, + packets_received, + wait_bar, + block_bar, + } + } + + pub fn tick(&self) { + self.wait_bar.tick(); + self.block_bar.tick(); + self.packets_sent.tick(); + self.packets_received.tick(); + } +} + +struct Sender { + fd: OwnedFd, + blocking_buffer: Vec, +} + +impl Sender { + pub fn new(fd: OwnedFd, blocking_size: usize) -> Self { + Self { + fd, + blocking_buffer: vec![1u8; blocking_size], + } + } + + pub fn send(&self) -> Result<(), nix::Error> { + sendmsg::<()>( + self.fd.as_raw_fd(), + &[IoSlice::new(&self.blocking_buffer)], + &[], + MsgFlags::empty(), + None, + )?; + Ok(()) + } +} + +struct Receiver { + fd: OwnedFd, + blocking_buffer: Vec, +} + +impl Receiver { + pub fn new(fd: OwnedFd, blocking_size: usize) -> Self { + Self { + fd, + blocking_buffer: vec![0u8; blocking_size], + } + } + + pub fn recv(&mut self) -> Result<(), nix::Error> { + recvmsg::<()>( + self.fd.as_raw_fd(), + &mut [IoSliceMut::new(&mut self.blocking_buffer)], + None, + MsgFlags::empty(), + )?; + recvmsg::<()>( + self.fd.as_raw_fd(), + &mut [IoSliceMut::new(&mut self.blocking_buffer)], + None, + MsgFlags::empty(), + )?; + + Ok(()) + } +} + +fn create_blocking_pair() -> Result<(Sender, Receiver), nix::Error> { + let (tx, rx) = socketpair( + AddressFamily::Unix, + SockType::Stream, + None, + SockFlag::empty(), + )?; + + // set send buffer to minimal size + setsockopt(&tx, SndBuf, &0)?; + + let actual_sndbuf_size = getsockopt(&tx, SndBuf)?; + + Ok(( + Sender::new(tx, actual_sndbuf_size), + Receiver::new(rx, actual_sndbuf_size), + )) +} + +struct SenderTask { + sender: Sender, + packet_amount: u64, + time_between_packets: u64, + packets_sent: ProgressBar, + wait_bar: ProgressBar, + wait_finished_tx: mpsc::Sender<()>, + receiver_finished_rx: mpsc::Receiver<()>, +} + +impl SenderTask { + pub fn execute(self) -> Result<(), nix::Error> { + for _ in 0..self.packet_amount { + self.wait_bar.reset(); + self.wait_bar.set_message("sleeping"); + for _ in 0..(self.time_between_packets * 50) { + sleep(Duration::from_millis(20)); + self.wait_bar.inc(1); + } + self.wait_bar.finish_using_style(); + self.wait_finished_tx.send(()).unwrap(); + + self.packets_sent.inc(1); + + self.sender.send()?; + } + self.packets_sent.finish_using_style(); + self.receiver_finished_rx.recv().unwrap(); + + Ok(()) + } +} + +struct ReceiverTask { + receiver: Receiver, + packet_amount: u64, + time_to_block: u64, + packets_received: ProgressBar, + block_bar: ProgressBar, + wait_finished_rx: mpsc::Receiver<()>, + receiver_finished_tx: mpsc::Sender<()>, +} + +impl ReceiverTask { + pub fn execute(mut self) -> Result<(), nix::Error> { + for _ in 0..self.packet_amount { + self.wait_finished_rx.recv().unwrap(); + + self.block_bar.reset(); + self.block_bar.set_message("blocking"); + for _ in 0..(self.time_to_block * 50) { + sleep(Duration::from_millis(20)); + self.block_bar.inc(1); + } + self.block_bar.finish_using_style(); + + self.receiver.recv()?; + + self.packets_received.inc(1); + } + + self.packets_received.finish_using_style(); + self.receiver_finished_tx.send(()).unwrap(); + + Ok(()) + } +} + +fn create_task_pair( + packet_amount: u64, + time_between_packets: u64, + time_to_block: u64, + bars: Bars, +) -> Result<(SenderTask, ReceiverTask), nix::Error> { + let (sender, receiver) = create_blocking_pair()?; + let (wait_finished_tx, wait_finished_rx) = mpsc::channel::<()>(); + let (receiver_finished_tx, receiver_finished_rx) = mpsc::channel::<()>(); + + let sender = SenderTask { + sender, + packet_amount, + time_between_packets, + packets_sent: bars.packets_sent.clone(), + wait_bar: bars.wait_bar.clone(), + wait_finished_tx, + receiver_finished_rx, + }; + + let receiver = ReceiverTask { + receiver, + packet_amount, + time_to_block, + packets_received: bars.packets_received.clone(), + block_bar: bars.block_bar.clone(), + wait_finished_rx, + receiver_finished_tx, + }; + + Ok((sender, receiver)) +} + +fn main() -> anyhow::Result<()> { + println!("{} {}\n", style("Process PID:").bold(), style(id().to_string()).bold().green()); + + let Data { + packet_amount, + time_between_packets, + time_to_block, + .. + } = Inputs::new(&ColorfulTheme::default()).interact()?; + + println!(); + + let bars = Bars::new(packet_amount, time_between_packets, time_to_block); + + bars.tick(); + + let (sender, receiver) = + create_task_pair(packet_amount, time_between_packets, time_to_block, bars)?; + + let receiver_handle = spawn(move || receiver.execute()); + + sender.execute()?; + receiver_handle.join().unwrap()?; + + Ok(()) +}