From 058b2261427e3881d2413945a0b9839feecb3422 Mon Sep 17 00:00:00 2001 From: Davide Baldo Date: Wed, 11 Dec 2024 16:39:30 +0100 Subject: [PATCH] feat(rust): add `tcp-test` tool to measure portal properties --- Cargo.lock | 58 +++++- Cargo.toml | 1 + NOTICE.md | 3 + tools/stress-test/deploy | 2 +- tools/tcp-test/Cargo.toml | 14 ++ tools/tcp-test/README.md | 89 +++++++++ tools/tcp-test/deploy | 35 ++++ tools/tcp-test/src/main.rs | 344 +++++++++++++++++++++++++++++++++++ tools/tcp-test/src/server.rs | 158 ++++++++++++++++ tools/tcp-test/src/tls.rs | 61 +++++++ 10 files changed, 760 insertions(+), 5 deletions(-) create mode 100644 tools/tcp-test/Cargo.toml create mode 100644 tools/tcp-test/README.md create mode 100755 tools/tcp-test/deploy create mode 100644 tools/tcp-test/src/main.rs create mode 100644 tools/tcp-test/src/server.rs create mode 100644 tools/tcp-test/src/tls.rs diff --git a/Cargo.lock b/Cargo.lock index 78fda9f186a..1480ddd9a09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -592,6 +592,7 @@ dependencies = [ "aws-lc-fips-sys", "aws-lc-sys", "paste", + "untrusted 0.7.1", "zeroize", ] @@ -5420,6 +5421,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d61c5ce1153ab5b689d0c074c4e7fc613e942dfb7dd9eea5ab202d2ad91fe361" +[[package]] +name = "pem" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -6054,6 +6065,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rcgen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779" +dependencies = [ + "aws-lc-rs", + "pem", + "rustls-pki-types", + "time", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -6208,7 +6232,7 @@ dependencies = [ "getrandom", "libc", "spin", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -6410,7 +6434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ "ring", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6422,7 +6446,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6528,7 +6552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ "ring", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -7463,6 +7487,17 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42a4d50cdb458045afc8131fd91b64904da29548bcb63c7236e0844936c13078" +[[package]] +name = "tcp-test" +version = "0.1.0" +dependencies = [ + "clap", + "rcgen", + "rustls 0.23.18", + "tokio", + "tokio-rustls 0.26.0", +] + [[package]] name = "tcp_inlet_and_outlet" version = "0.1.0" @@ -8156,6 +8191,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -8964,6 +9005,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "yoke" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index fb034b5285c..00a2e57e076 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "tools/docs/example_blocks", "tools/docs/example_test_helper", "tools/stress-test", + "tools/tcp-test", ] # Coverage profile for generating code coverage with grcov. diff --git a/NOTICE.md b/NOTICE.md index 072ebb3bde9..96a239bbe2a 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -377,6 +377,7 @@ This file contains attributions for any 3rd-party open source code used in this | parking_lot_core | MIT, Apache-2.0 | https://crates.io/crates/parking_lot_core | | paste | MIT, Apache-2.0 | https://crates.io/crates/paste | | pathdiff | MIT, Apache-2.0 | https://crates.io/crates/pathdiff | +| pem | MIT | https://crates.io/crates/pem | | pem-rfc7468 | Apache-2.0, MIT | https://crates.io/crates/pem-rfc7468 | | percent-encoding | MIT, Apache-2.0 | https://crates.io/crates/percent-encoding | | petname | Apache-2.0 | https://crates.io/crates/petname | @@ -420,6 +421,7 @@ This file contains attributions for any 3rd-party open source code used in this | rand_pcg | MIT, Apache-2.0 | https://crates.io/crates/rand_pcg | | rayon | MIT, Apache-2.0 | https://crates.io/crates/rayon | | rayon-core | MIT, Apache-2.0 | https://crates.io/crates/rayon-core | +| rcgen | MIT, Apache-2.0 | https://crates.io/crates/rcgen | | redox_syscall | MIT | https://crates.io/crates/redox_syscall | | reedline | MIT | https://crates.io/crates/reedline | | regex | MIT, Apache-2.0 | https://crates.io/crates/regex | @@ -649,6 +651,7 @@ This file contains attributions for any 3rd-party open source code used in this | xml-rs | MIT | https://crates.io/crates/xml-rs | | xmlparser | MIT, Apache-2.0 | https://crates.io/crates/xmlparser | | yaml-rust | MIT, Apache-2.0 | https://crates.io/crates/yaml-rust | +| yasna | MIT, Apache-2.0 | https://crates.io/crates/yasna | | yoke | Unicode-3.0 | https://crates.io/crates/yoke | | yoke-derive | Unicode-3.0 | https://crates.io/crates/yoke-derive | | zerocopy | BSD-2-Clause, Apache-2.0, MIT | https://crates.io/crates/zerocopy | diff --git a/tools/stress-test/deploy b/tools/stress-test/deploy index 454ac793df9..64d20297fa6 100755 --- a/tools/stress-test/deploy +++ b/tools/stress-test/deploy @@ -24,7 +24,7 @@ function scp { TARGET_ARCHITECTURE=$(ssh uname -m) # always use cross-compilation to avoid reliance on new and unsupported glibc versions -cross build --bin=stress-test --bin=ockam -F aws-lc --release --target "${TARGET_ARCHITECTURE}-unknown-linux-gnu" +DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --bin=stress-test --bin=ockam -F aws-lc --release --target "${TARGET_ARCHITECTURE}-unknown-linux-gnu" echo "uploading..." scp \ diff --git a/tools/tcp-test/Cargo.toml b/tools/tcp-test/Cargo.toml new file mode 100644 index 00000000000..19099a28251 --- /dev/null +++ b/tools/tcp-test/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tcp-test" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +publish = false +readme = "README.md" + +[dependencies] +clap = { version = "4", features = ["derive", "cargo", "env"] } +rcgen = { version = "0.13", features = ["pem", "aws_lc_rs"], default-features = false } +rustls = { version = "0.23" } +tokio = { version = "1", features = ["full"] } +tokio-rustls = { version = "0.26", features = ["aws-lc-rs"] } diff --git a/tools/tcp-test/README.md b/tools/tcp-test/README.md new file mode 100644 index 00000000000..6bf85469366 --- /dev/null +++ b/tools/tcp-test/README.md @@ -0,0 +1,89 @@ +This tool is useful to test the portal capabilities and operates as a **generic** TCP client and server. +It also implements (unsafe) TLS, which is useful for performance comparison. + +# Deployment + +Can be easily deployed via ssh using the `deploy` script: +```bash +$ ./deploy user@host +``` + +# Actions + +# latency +Measures the latency of a TCP packets to a given host and port. +It must be used with an echo server. + +```bash +$ tcp-test echo 1234 +Listening on 0.0.0.0:1234 + +$ tcp-test latency 127.0.0.1:1234 +Connection + First RTT: 0ms 440us, Second RTT: 0ms 143us +Connection + First RTT: 0ms 192us, Second RTT: 0ms 160us +Connection + First RTT: 0ms 276us, Second RTT: 0ms 162us +Connection + First RTT: 0ms 326us, Second RTT: 0ms 155us +Connection + First RTT: 0ms 227us, Second RTT: 0ms 172us +Connection + First RTT: 0ms 250us, Second RTT: 0ms 111us +Connection + First RTT: 0ms 461us, Second RTT: 0ms 122us +Connection + First RTT: 0ms 326us, Second RTT: 0ms 116us +Connection + First RTT: 0ms 320us, Second RTT: 0ms 104us +Connection + First RTT: 0ms 270us, Second RTT: 0ms 101us +Average - Connection + First RTT: 0ms 309us, Second RTT: 0ms 135us +``` + +# flood +Floods a given host and port with connections until it fails or reaches the provided maximum. +It's meant to measure the maximum number of connections a portal can handle. +It must be used with an echo server. + +Example: +```bash +$ tcp-test echo 1234 +Listening on 0.0.0.0:1234 + +$ tcp-test flood 127.0.0.1:1234 +Failed to connect: Os { code: 49, kind: AddrNotAvailable, message: "Can't assign requested address" } +Flooded 16344 connections +Pinging each... +All connections succeeded +``` + +# throughput +Measures the throughput of a TCP connection to a given host and port. +It can be used to measure the throughput of a portal, especially to compare a portal against TLS. +Can be used in conjunction with a null server for results similar to `iperf3`, but echo can also be +used for a full bandwidth test. + +```bash +$ tcp-test null 1234 +Listening on 0.0.0.0:1234 + +$ tcp-test throughput 127.0.0.1:1234 +Outgoing Throughput: 70.44 Gbps +Outgoing Throughput: 75.12 Gbps +Outgoing Throughput: 75.44 Gbps +Outgoing Throughput: 78.43 Gbps +Outgoing Throughput: 71.89 Gbps +Outgoing Throughput: 69.80 Gbps +Outgoing Throughput: 75.13 Gbps +^C +``` + +## echo +It starts a TCP server that listens to on a given port and **echoes back** any data it receives. +It's meant to be used in conjunction with the other actions. + +```bash +$ tcp-test echo 1234 +Listening on 0.0.0.0:1234 +``` + +# null +It starts a TCP server that listens to on a given port and **discards** any data it receives. +It's meant to be used in conjunction with the other actions. + +```bash +$ tcp-test null 1234 +Listening on 0.0.0.0:1234 +``` diff --git a/tools/tcp-test/deploy b/tools/tcp-test/deploy new file mode 100755 index 00000000000..453104a0f8f --- /dev/null +++ b/tools/tcp-test/deploy @@ -0,0 +1,35 @@ +#!/bin/bash + +# This scripts deploys a tcp test client and an ockam client to a remote machine accessible via ssh. + +set -e + +cd "$(dirname "$0")"/../../ + +if [ "$#" -lt 1 ]; then + echo "usage: $0 <[username@]host>" + exit 1 +fi + +export SSH_ENDPOINT="$1" + +function ssh { + /usr/bin/env ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=QUIET -o ConnectTimeout=5 "${SSH_ENDPOINT}" "$@" +} + +function scp { + /usr/bin/env scp -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=QUIET -o ConnectTimeout=5 "$@" +} + +TARGET_ARCHITECTURE=$(ssh uname -m) + +# always use cross-compilation to avoid reliance on new and unsupported glibc versions +DOCKER_DEFAULT_PLATFORM=linux/amd64 cross build --bin=tcp-test --bin=ockam -F aws-lc --release --target "${TARGET_ARCHITECTURE}-unknown-linux-gnu" +echo "uploading..." + +scp \ + "target/${TARGET_ARCHITECTURE}-unknown-linux-gnu/release/ockam" \ + "target/${TARGET_ARCHITECTURE}-unknown-linux-gnu/release/tcp-test" \ + "${SSH_ENDPOINT}:" + +ssh sudo mv ockam tcp-test /usr/local/bin/ diff --git a/tools/tcp-test/src/main.rs b/tools/tcp-test/src/main.rs new file mode 100644 index 00000000000..5222e874795 --- /dev/null +++ b/tools/tcp-test/src/main.rs @@ -0,0 +1,344 @@ +mod server; +mod tls; + +use clap::{Args, Parser, Subcommand}; +use rustls::pki_types::ServerName; +use server::ServerMode; +use std::fmt::Debug; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Instant; +use tls::NoTlsValidation; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio_rustls::rustls; + +const DEFAULT_BUFFER_SIZE: &str = "65536"; +const DEFAULT_BUFFER_POOL: &str = "4"; + +#[derive(Debug, Args, Clone)] +struct EchoCommand { + bind_port: u16, + #[arg(long, default_value = "false")] + tls: bool, + #[arg(long, default_value = DEFAULT_BUFFER_SIZE, env = "BUFFER_SIZE")] + buffer_size: usize, + #[arg(long, default_value = DEFAULT_BUFFER_POOL, env = "BUFFER_POOL")] + buffer_pool: u8, +} + +#[derive(Debug, Args, Clone)] +struct NullCommand { + bind_port: u16, + #[arg(long, default_value = "false")] + tls: bool, + #[arg(long, default_value = DEFAULT_BUFFER_SIZE, env = "BUFFER_SIZE")] + buffer_size: usize, +} + +#[derive(Debug, Args, Clone)] +struct LatencyCommand { + address: SocketAddr, + #[arg(long, default_value = "10")] + count: u32, + #[arg(long, default_value = "false")] + tls: bool, +} + +#[derive(Debug, Args, Clone)] +struct FloodCommand { + address: SocketAddr, + #[arg(long, default_value = "0")] + max: u32, +} + +#[derive(Debug, Args, Clone)] +struct ThroughputCommand { + address: SocketAddr, + #[arg(long, default_value = "false")] + tls: bool, + #[arg(long, default_value = DEFAULT_BUFFER_SIZE, env = "BUFFER_SIZE")] + buffer_size: usize, +} + +#[derive(Subcommand, Debug, Clone)] +enum Action { + /// Measure the latency of a TCP connection + Latency(LatencyCommand), + /// Flood a TCP port with connections until it fails + Flood(FloodCommand), + /// Measure the throughput of TCP connection. + /// Use it with `Null` or `Echo` servers. + Throughput(ThroughputCommand), + /// Run a TCP echo server + Echo(EchoCommand), + /// Run a TCP server, discards all incoming data + Null(NullCommand), +} + +#[derive(Debug, Parser, Clone)] +#[command(name = "tcp-tester")] +struct Main { + /// Action to perform + #[command(subcommand)] + action: Action, +} + +#[tokio::main] +async fn main() { + let main: Main = Main::parse(); + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + match main.action { + Action::Echo(cmd) => { + server::start_server( + cmd.bind_port, + cmd.tls, + ServerMode::Echo(cmd.buffer_size, cmd.buffer_pool), + ) + .await + } + Action::Null(cmd) => { + server::start_server(cmd.bind_port, cmd.tls, ServerMode::Null(cmd.buffer_size)).await + } + Action::Latency(cmd) => latency(cmd).await, + Action::Flood(cmd) => flood(cmd).await, + Action::Throughput(cmd) => throughput(cmd).await, + } +} + +async fn throughput(command: ThroughputCommand) { + let connector = if command.tls { + let config = Arc::new( + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoTlsValidation)) + .with_no_client_auth(), + ); + Some(tokio_rustls::TlsConnector::from(config)) + } else { + None + }; + + let stream = tokio::net::TcpStream::connect(command.address) + .await + .unwrap(); + stream.set_nodelay(true).unwrap(); + let mut stream_write: Box; + let mut stream_read: Box; + + if let Some(connector) = &connector { + let result = connector + .connect(ServerName::IpAddress(command.address.ip().into()), stream) + .await; + match result { + Ok(stream) => { + let (r, w) = tokio::io::split(stream); + stream_read = Box::new(r); + stream_write = Box::new(w); + } + Err(error) => { + println!("Failed to connect: {:?}", error); + return; + } + } + } else { + let (r, w) = tokio::io::split(stream); + stream_read = Box::new(r); + stream_write = Box::new(w); + } + + // read as much as possible and discard the result + tokio::spawn(async move { + let mut incoming_buffer = vec![0u8; command.buffer_size]; + loop { + let result = stream_read.read(&mut incoming_buffer).await; + if let Err(err) = result { + println!("Failed to read: {:?}", err); + return; + } + } + }); + + // read and report the speed + let outgoing_buffer = vec![0u8; command.buffer_size]; + let mut total_bytes = 0; + let mut iterations = 0; + + let mut start = Instant::now(); + loop { + let result = stream_write.write_all(&outgoing_buffer).await; + iterations += 1; + match result { + Ok(()) => { + total_bytes += outgoing_buffer.len(); + } + Err(err) => { + println!("Failed to write: {:?}", err); + return; + } + } + + if iterations > 10_000 { + iterations = 0; + let elapsed = start.elapsed(); + let seconds = elapsed.as_secs(); + if seconds >= 1 { + println!( + "Outgoing Throughput: {:.2} Gbps", + (total_bytes as f32 / elapsed.as_secs_f32()) * 8.0 / 1_000_000_000.0 + ); + start = Instant::now(); + total_bytes = 0; + } + } + } +} + +async fn latency(command: LatencyCommand) { + let mut first_rtt_measurements = Vec::new(); + let mut second_rtt_measurements = Vec::new(); + let mut incoming_buffer = [0u8; 5]; + let tls_connector = if command.tls { + let config = Arc::new( + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoTlsValidation)) + .with_no_client_auth(), + ); + Some(tokio_rustls::TlsConnector::from(config)) + } else { + None + }; + + for _ in 0..command.count { + let result = tokio::net::TcpStream::connect(command.address).await; + match result { + Ok(stream) => { + stream.set_nodelay(true).unwrap(); + + let mut stream_write: Box; + let mut stream_read: Box; + + let start = Instant::now(); + + if let Some(tls_connector) = &tls_connector { + let result = tls_connector + .connect(ServerName::IpAddress(command.address.ip().into()), stream) + .await; + match result { + Ok(stream) => { + let (r, w) = tokio::io::split(stream); + stream_read = Box::new(r); + stream_write = Box::new(w); + } + Err(error) => { + println!("Failed to connect: {:?}", error); + continue; + } + } + } else { + let (r, w) = tokio::io::split(stream); + stream_read = Box::new(r); + stream_write = Box::new(w); + } + + let result = stream_write.write_all("hello".as_bytes()).await; + if let Err(err) = result { + println!("Failed to write: {:?}", err); + continue; + } + let result = stream_read.read_exact(&mut incoming_buffer).await; + if let Err(err) = result { + println!("Failed to read: {:?}", err); + continue; + } + let first_rtt = start.elapsed(); + first_rtt_measurements.push(first_rtt); + + let result = stream_write.write_all("hello".as_bytes()).await; + if let Err(err) = result { + println!("Failed to write: {:?}", err); + continue; + } + let result = stream_read.read_exact(&mut incoming_buffer).await; + if let Err(err) = result { + println!("Failed to read: {:?}", err); + continue; + } + let second_rtt = start.elapsed() - first_rtt; + second_rtt_measurements.push(second_rtt); + println!( + "Connection + First RTT: {}ms {}us, Second RTT: {}ms {}us", + first_rtt.as_millis(), + first_rtt.as_micros(), + second_rtt.as_millis(), + second_rtt.as_micros() + ); + } + Err(err) => { + println!("Failed to connect: {:?}", err); + } + } + } + + let total: std::time::Duration = first_rtt_measurements.iter().sum(); + let first_average = total / command.count; + let total: std::time::Duration = second_rtt_measurements.iter().sum(); + let second_average = total / command.count; + println!( + "Average - Connection + First RTT: {}ms {}us, Second RTT: {}ms {}us", + first_average.as_millis(), + first_average.as_micros(), + second_average.as_millis(), + second_average.as_micros(), + ); +} + +async fn flood(command: FloodCommand) { + let mut connections = Vec::new(); + let mut counter = 0; + loop { + let result = tokio::net::TcpStream::connect(command.address).await; + match result { + Ok(stream) => { + stream.set_nodelay(true).unwrap(); + connections.push(stream); + counter += 1; + if command.max > 0 && counter >= command.max { + break; + } + } + Err(err) => { + println!("Failed to connect: {:?}", err); + break; + } + } + } + + println!("Flooded {} connections", connections.len()); + println!("Pinging each..."); + + let mut failed = 0; + let mut incoming_buffer = [0u8; 5]; + for mut stream in connections { + let result = stream.write_all("hello".as_bytes()).await; + if let Err(err) = result { + println!("Failed to write: {:?}", err); + failed += 1; + continue; + } + let result = stream.read_exact(&mut incoming_buffer).await; + if let Err(err) = result { + println!("Failed to read: {:?}", err); + failed += 1; + } + } + + if failed == 0 { + println!("All connections succeeded"); + } else { + println!("{} connections failed", failed); + } +} diff --git a/tools/tcp-test/src/server.rs b/tools/tcp-test/src/server.rs new file mode 100644 index 00000000000..8151c9b99e2 --- /dev/null +++ b/tools/tcp-test/src/server.rs @@ -0,0 +1,158 @@ +use rustls::pki_types::pem::PemObject; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use std::collections::VecDeque; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{Arc, Mutex}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::sync::Notify; + +#[derive(Copy, Clone)] +pub enum ServerMode { + Echo(usize, u8), + Null(usize), +} + +pub async fn start_server(bind_port: u16, tls: bool, mode: ServerMode) { + let bind_address = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), bind_port); + let listener = tokio::net::TcpListener::bind(bind_address).await.unwrap(); + + let acceptor = if tls { + println!("Listening TLS on {}", listener.local_addr().unwrap()); + let self_signed = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); + let config = Arc::new( + rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert( + vec![ + CertificateDer::from_pem_slice(self_signed.cert.pem().as_bytes()).unwrap(), + ], + PrivateKeyDer::try_from(self_signed.key_pair.serialize_der()).unwrap(), + ) + .unwrap(), + ); + Some(tokio_rustls::TlsAcceptor::from(config)) + } else { + println!("Listening on {}", listener.local_addr().unwrap()); + None + }; + + loop { + if let Ok((stream, _)) = listener.accept().await { + stream.set_nodelay(true).unwrap(); + if let Some(acceptor) = &acceptor { + let result = acceptor.accept(stream).await; + match result { + Ok(stream) => { + let (r, w) = tokio::io::split(stream); + let mode = mode; + tokio::spawn(async move { server(r, w, mode).await }); + } + Err(err) => { + println!("Failed to accept TLS connection: {:?}", err); + } + } + } else { + let (r, w) = tokio::io::split(stream); + let mode = mode; + tokio::spawn(async move { server(r, w, mode).await }); + } + } else { + println!("Failed to accept connection"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } +} + +async fn server( + stream_reader: impl AsyncRead + Unpin + Send + 'static, + stream_writer: impl AsyncWrite + Unpin + Send + 'static, + mode: ServerMode, +) { + match mode { + ServerMode::Echo(buffer_size, pool_size) => { + fast_echo(stream_reader, stream_writer, buffer_size, pool_size).await; + } + ServerMode::Null(buffer_size) => { + let mut stream_reader = stream_reader; + let mut buffer = vec![0u8; buffer_size]; + loop { + let result = stream_reader.read(&mut buffer).await; + if let Err(_err) = result { + return; + } + } + } + } +} + +// To make echo as fast as possible, we want both reading and writing to be done in parallel and +// avoid any allocations. +// The echo server has two queues of buffers: incoming and outgoing. +// Once the incoming buffer is ready, it's moved to the outgoing buffer queue. +// Once the outgoing buffer is ready, it's moved back to the incoming buffer queue. +async fn fast_echo( + mut stream_reader: impl AsyncRead + Unpin + Send + 'static, + mut stream_writer: impl AsyncWrite + Unpin + Send + 'static, + buffer_size: usize, + pool_size: u8, +) { + let new_outgoing_buffer_event = Arc::new(Notify::new()); + let new_incoming_buffer_event = Arc::new(Notify::new()); + + let outgoing_buffers = Arc::new(Mutex::new(VecDeque::with_capacity(pool_size as usize))); + let incoming_buffers = { + let mut queue = VecDeque::with_capacity(pool_size as usize); + for _ in 0..queue.capacity() { + queue.push_back(vec![0u8; buffer_size]); + } + Arc::new(Mutex::new(queue)) + }; + + { + let incoming_buffers = incoming_buffers.clone(); + let new_incoming_buffer = new_incoming_buffer_event.clone(); + let outgoing_buffers = outgoing_buffers.clone(); + let new_outgoing_buffer = new_outgoing_buffer_event.clone(); + + tokio::spawn(async move { + loop { + let incoming_buffer = incoming_buffers.lock().unwrap().pop_front(); + if let Some(mut incoming_buffer) = incoming_buffer { + let result = stream_reader.read(&mut incoming_buffer).await; + match result { + Ok(bytes) => { + outgoing_buffers + .lock() + .unwrap() + .push_back((bytes, incoming_buffer)); + new_outgoing_buffer.notify_one(); + } + Err(_err) => { + return; + } + } + } else { + new_incoming_buffer.notified().await; + } + } + }); + } + + loop { + let outgoing_buffer = outgoing_buffers.lock().unwrap().pop_front(); + if let Some((bytes, outgoing_buffer)) = outgoing_buffer { + let result = stream_writer.write_all(&outgoing_buffer[..bytes]).await; + match result { + Ok(()) => { + incoming_buffers.lock().unwrap().push_back(outgoing_buffer); + new_incoming_buffer_event.notify_one(); + } + Err(_err) => { + return; + } + } + } else { + new_outgoing_buffer_event.notified().await; + } + } +} diff --git a/tools/tcp-test/src/tls.rs b/tools/tcp-test/src/tls.rs new file mode 100644 index 00000000000..90e77ccf18e --- /dev/null +++ b/tools/tcp-test/src/tls.rs @@ -0,0 +1,61 @@ +use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; +use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; +use rustls::{DigitallySignedStruct, Error, SignatureScheme}; +use std::fmt::{Debug, Formatter}; + +pub struct NoTlsValidation; + +impl Debug for NoTlsValidation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NoTlsValidation").finish() + } +} + +impl ServerCertVerifier for NoTlsValidation { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + SignatureScheme::RSA_PKCS1_SHA1, + SignatureScheme::ECDSA_SHA1_Legacy, + SignatureScheme::RSA_PKCS1_SHA256, + SignatureScheme::ECDSA_NISTP256_SHA256, + SignatureScheme::RSA_PKCS1_SHA384, + SignatureScheme::ECDSA_NISTP384_SHA384, + SignatureScheme::RSA_PKCS1_SHA512, + SignatureScheme::ECDSA_NISTP521_SHA512, + SignatureScheme::RSA_PSS_SHA256, + SignatureScheme::RSA_PSS_SHA384, + SignatureScheme::RSA_PSS_SHA512, + SignatureScheme::ED25519, + SignatureScheme::ED448, + ] + } +}