From 7f702fddacb9852bed4acf854bf82e9dbc2f4c10 Mon Sep 17 00:00:00 2001 From: Stuart Stock Date: Wed, 12 Jun 2024 16:45:30 -0500 Subject: [PATCH 1/4] experiment with per-thread binding --- Cargo.toml | 1 + src/bin/roughenough-server.rs | 44 ++++++++++++++++++++++------------- src/server.rs | 12 ++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f644ea..251edfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ humansize = "1" log = "0.4" mio = "0.6" mio-extras = "2.0" +nix = { version = "0.29", features = ["socket"] } once_cell = "1.19" rand = "0.6" ring = "0.16" diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 157cf97..31670ba 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -23,14 +23,18 @@ #[macro_use] extern crate log; +use std::{env, thread}; +use std::net::UdpSocket as StdUdpSocket; +use std::os::fd::{AsRawFd, FromRawFd}; use std::process; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use std::{env, thread}; +use std::sync::atomic::{AtomicBool, Ordering}; use log::LevelFilter; -use mio::net::UdpSocket; use mio::Events; +use mio::net::UdpSocket; +use nix::sys::socket::{setsockopt, sockopt::ReusePort}; +use nix::sys::socket::sockopt::ReuseAddr; use once_cell::sync::Lazy; use simple_logger::SimpleLogger; @@ -43,7 +47,8 @@ use roughenough::server::Server; // the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()` static KEEP_RUNNING: Lazy = Lazy::new(|| AtomicBool::new(true)); -fn polling_loop(cfg: Arc>>, socket: Arc) { +fn polling_loop(cfg: Arc>>) { + let socket = bind_socket(cfg.clone()); let mut server = { let config = cfg.lock().unwrap(); let server = Server::new(config.as_ref(), socket); @@ -108,6 +113,24 @@ fn display_config(server: &Server, cfg: &dyn ServerConfig) { } } +fn bind_socket(config: Arc>>) -> UdpSocket { + let sock_addr = config + .lock() + .unwrap() + .udp_socket_addr() + .expect("udp sock addr"); + + let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + + unsafe { + let std_sock = StdUdpSocket::from_raw_fd(socket.as_raw_fd()); + setsockopt(&std_sock, ReusePort, &true).expect("setting SO_REUSEPORT"); + setsockopt(&std_sock, ReuseAddr, &true).expect("setting SO_REUSEADDR"); + } + + socket +} + pub fn main() { SimpleLogger::new() .with_level(LevelFilter::Info) @@ -133,16 +156,6 @@ pub fn main() { Ok(cfg) => Arc::new(Mutex::new(cfg)), }; - let socket = { - let sock_addr = config - .lock() - .unwrap() - .udp_socket_addr() - .expect("udp sock addr"); - let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); - Arc::new(sock) - }; - set_ctrlc_handler(); // TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts @@ -150,10 +163,9 @@ pub fn main() { for i in 0..config.lock().unwrap().num_workers() { let cfg = config.clone(); - let sock = socket.try_clone().unwrap(); let thread = thread::Builder::new() .name(format!("worker-{}", i)) - .spawn(move || polling_loop(cfg, sock.into())) + .spawn(move || polling_loop(cfg) ) .expect("failure spawning thread"); threads.push(thread); diff --git a/src/server.rs b/src/server.rs index 20d05b8..48198ec 100644 --- a/src/server.rs +++ b/src/server.rs @@ -19,7 +19,6 @@ use std::io::ErrorKind; use std::io::Write; use std::net::{IpAddr, Shutdown, SocketAddr}; -use std::sync::Arc; use std::thread; use std::time::Duration; @@ -56,7 +55,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo /// pub struct Server { batch_size: u8, - socket: Arc, + socket: UdpSocket, health_listener: Option, poll_duration: Option, status_interval: Duration, @@ -80,7 +79,7 @@ impl Server { /// Create a new server instance from the provided /// [`ServerConfig`](../config/trait.ServerConfig.html) trait object instance. /// - pub fn new(config: &dyn ServerConfig, socket: Arc) -> Server { + pub fn new(config: &dyn ServerConfig, socket: UdpSocket) -> Server { let mut timer: Timer<()> = Timer::default(); timer.set_timeout(config.status_interval(), ()); @@ -186,13 +185,12 @@ impl Server { let socket_now_empty = self.collect_requests(); - let sock_copy = Arc::get_mut(&mut self.socket).unwrap(); self.responder_rfc - .send_responses(sock_copy, &mut self.stats); + .send_responses(&mut self.socket, &mut self.stats); self.responder_draft - .send_responses(sock_copy, &mut self.stats); + .send_responses(&mut self.socket, &mut self.stats); self.responder_classic - .send_responses(sock_copy, &mut self.stats); + .send_responses(&mut self.socket, &mut self.stats); if socket_now_empty { break; From c675201cb2fc6ff15dac88c20c31545ff167b7b5 Mon Sep 17 00:00:00 2001 From: Stuart Stock Date: Thu, 13 Jun 2024 14:12:10 -0500 Subject: [PATCH 2/4] Update dependencies to recent versions. --- Cargo.toml | 20 ++++++++++---------- src/merkle.rs | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 251edfc..15d3c82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "roughenough" -version = "1.2.1-draft8" +version = "1.3.0-draft8" repository = "https://github.com/int08h/roughenough" authors = ["Stuart Stock ", "Aaron Hill "] license = "Apache-2.0" @@ -20,21 +20,21 @@ gcpkms = ["google-cloudkms1", "hyper", "hyper-rustls", "serde", "serde_json", "y [dependencies] byteorder = "1" chrono = "0.4" -clap = "2" -ctrlc = { version = "3.2", features = ["termination"] } -humansize = "1" +clap = "4" +ctrlc = { version = "3.4", features = ["termination"] } +humansize = "2" log = "0.4" mio = "0.6" mio-extras = "2.0" -nix = { version = "0.29", features = ["socket"] } +net2 = "0.2" once_cell = "1.19" rand = "0.6" -ring = "0.16" -simple_logger = "1" +ring = "0.17" +simple_logger = "5" yaml-rust = "0.4" -zeroize = "1.4" -data-encoding = "2.3" -enum-iterator = "2.0" +zeroize = "1.8" +data-encoding = "2.6" +enum-iterator = "2.1" # Used by 'awskms' and 'gcpkms' futures = { version = "^0.3", optional = true } diff --git a/src/merkle.rs b/src/merkle.rs index 82db37e..67ce263 100644 --- a/src/merkle.rs +++ b/src/merkle.rs @@ -65,7 +65,7 @@ impl MerkleTree { } pub fn get_paths(&self, mut index: usize) -> Vec { - let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len); + let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len()); let mut level = 0; while !self.levels[level].is_empty() { @@ -95,7 +95,7 @@ impl MerkleTree { } if node_count % 2 != 0 { - self.levels[level - 1].push(vec![0; self.algorithm.output_len]); + self.levels[level - 1].push(vec![0; self.algorithm.output_len()]); node_count += 1; } @@ -146,9 +146,9 @@ impl MerkleTree { Hash::from(ctx.finish().as_ref()) }; - assert_eq!(paths.len() % self.algorithm.output_len, 0); + assert_eq!(paths.len() % self.algorithm.output_len(), 0); - for path in paths.chunks(self.algorithm.output_len) { + for path in paths.chunks(self.algorithm.output_len()) { let mut ctx = digest::Context::new(self.algorithm); ctx.update(TREE_NODE_TWEAK); From f462b48916c8046454e3bbca6f05cd860b2c113d Mon Sep 17 00:00:00 2001 From: Stuart Stock Date: Thu, 13 Jun 2024 14:13:25 -0500 Subject: [PATCH 3/4] Bind server port using REUSEADDR and REUSEPORT. Bind server socket with SO_REUSE{ADDR,PORT} socket options to more fairly distribute load to worker threads. --- src/bin/roughenough-server.rs | 58 +++++++++++++++++------------------ src/server.rs | 6 ++-- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 31670ba..77ab93a 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -23,9 +23,7 @@ #[macro_use] extern crate log; -use std::{env, thread}; -use std::net::UdpSocket as StdUdpSocket; -use std::os::fd::{AsRawFd, FromRawFd}; +use std::{env, io, thread}; use std::process; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -33,8 +31,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use log::LevelFilter; use mio::Events; use mio::net::UdpSocket; -use nix::sys::socket::{setsockopt, sockopt::ReusePort}; -use nix::sys::socket::sockopt::ReuseAddr; +use net2::UdpBuilder; +use net2::unix::UnixUdpBuilderExt; use once_cell::sync::Lazy; use simple_logger::SimpleLogger; @@ -47,8 +45,7 @@ use roughenough::server::Server; // the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()` static KEEP_RUNNING: Lazy = Lazy::new(|| AtomicBool::new(true)); -fn polling_loop(cfg: Arc>>) { - let socket = bind_socket(cfg.clone()); +fn polling_loop(cfg: Arc>>, socket: UdpSocket) { let mut server = { let config = cfg.lock().unwrap(); let server = Server::new(config.as_ref(), socket); @@ -57,7 +54,7 @@ fn polling_loop(cfg: Arc>>) { server }; - let mut events = Events::with_capacity(2048); + let mut events = Events::with_capacity(1024); loop { server.process_events(&mut events); @@ -74,6 +71,24 @@ fn set_ctrlc_handler() { .expect("failed setting Ctrl-C handler"); } +// Bind to the server port using SO_REUSEPORT and SO_REUSEADDR so the kernel will more fairly +// balance traffic to each worker. https://lwn.net/Articles/542629/ +fn bind_socket(config: Arc>>) -> io::Result { + let sock_addr = config + .lock() + .unwrap() + .udp_socket_addr() + .expect("udp sock addr"); + + let std_socket = UdpBuilder::new_v4()? + .reuse_address(true)? + .reuse_port(true)? + .bind(sock_addr)?; + + let mio_socket: UdpSocket = UdpSocket::from_socket(std_socket)?; + Ok(mio_socket) +} + fn display_config(server: &Server, cfg: &dyn ServerConfig) { info!("Processing thread : {}", server.thread_name()); info!("Number of workers : {}", cfg.num_workers()); @@ -113,24 +128,6 @@ fn display_config(server: &Server, cfg: &dyn ServerConfig) { } } -fn bind_socket(config: Arc>>) -> UdpSocket { - let sock_addr = config - .lock() - .unwrap() - .udp_socket_addr() - .expect("udp sock addr"); - - let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); - - unsafe { - let std_sock = StdUdpSocket::from_raw_fd(socket.as_raw_fd()); - setsockopt(&std_sock, ReusePort, &true).expect("setting SO_REUSEPORT"); - setsockopt(&std_sock, ReuseAddr, &true).expect("setting SO_REUSEADDR"); - } - - socket -} - pub fn main() { SimpleLogger::new() .with_level(LevelFilter::Info) @@ -158,14 +155,17 @@ pub fn main() { set_ctrlc_handler(); - // TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts + // TODO(stuart) TCP healthcheck REUSEADDR and RESUSEPORT on the tcp socket + let mut threads = Vec::new(); - for i in 0..config.lock().unwrap().num_workers() { + let num_workers = config.lock().unwrap().num_workers(); + for i in 0..num_workers { let cfg = config.clone(); + let socket = bind_socket(cfg.clone()).unwrap(); let thread = thread::Builder::new() .name(format!("worker-{}", i)) - .spawn(move || polling_loop(cfg) ) + .spawn(move || polling_loop(cfg.clone(), socket) ) .expect("failure spawning thread"); threads.push(thread); diff --git a/src/server.rs b/src/server.rs index 48198ec..2d0c6e6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,7 +22,7 @@ use std::net::{IpAddr, Shutdown, SocketAddr}; use std::thread; use std::time::Duration; -use humansize::{file_size_opts as fsopts, FileSize}; +use humansize::{BINARY, format_size}; use mio::net::{TcpListener, UdpSocket}; use mio::{Events, Poll, PollOpt, Ready, Token}; use mio_extras::timer::Timer; @@ -293,7 +293,7 @@ impl Server { counts.invalid_requests, counts.classic_responses_sent, counts.rfc_responses_sent, - counts.bytes_sent.file_size(fsopts::BINARY).unwrap(), + format_size(counts.bytes_sent, BINARY), counts.failed_send_attempts, counts.retried_send_attempts ); @@ -310,7 +310,7 @@ impl Server { self.stats.total_responses_sent(), self.stats.num_classic_responses_sent(), self.stats.num_rfc_responses_sent(), - self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap(), + format_size(self.stats.total_bytes_sent(), BINARY), self.stats.total_failed_send_attempts(), self.stats.total_retried_send_attempts() ); From 40f60490d850c3fe3a5b5958b8a1d776018e4fe1 Mon Sep 17 00:00:00 2001 From: Stuart Stock Date: Thu, 13 Jun 2024 14:13:43 -0500 Subject: [PATCH 4/4] Update version to 1.3.0 --- src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9407c04..2a16da2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017-2021 int08h LLC +// Copyright 2017-2024 int08h LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,8 +16,8 @@ //! An implementation of the [Roughtime](https://roughtime.googlesource.com/roughtime) //! secure time synchronization protocol. //! -//! Roughtime aims to achieve rough time synchronisation in a secure way that doesn't -//! depend on any particular time server, and in such a way that, if a time server does +//! Roughtime aims to achieve rough time synchronization in a secure way that doesn't +//! depend on any particular timeserver, and in such a way that, if a timeserver does //! misbehave, clients end up with cryptographic proof of it. //! //! # Protocol @@ -79,7 +79,7 @@ pub mod stats; pub mod version; /// Version of Roughenough -pub const VERSION: &str = "1.2.1-draft8"; +pub const VERSION: &str = "1.3.0-draft8"; /// Roughenough version string enriched with any compile-time optional features pub fn roughenough_version() -> String {