diff --git a/Cargo.toml b/Cargo.toml index 481aaa9..3a99170 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ humansize = "1" log = "0.4" mio = "0.6" mio-extras = "2.0" +once_cell = "1.19" rand = "0.6" ring = "0.16" simple_logger = "1" diff --git a/src/bin/roughenough-server.rs b/src/bin/roughenough-server.rs index 1e4b7de..a6d0cf5 100644 --- a/src/bin/roughenough-server.rs +++ b/src/bin/roughenough-server.rs @@ -23,13 +23,15 @@ #[macro_use] extern crate log; -use std::env; +use std::{env, thread}; use std::process; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use log::LevelFilter; use mio::Events; +use mio::net::UdpSocket; +use once_cell::sync::Lazy; use simple_logger::SimpleLogger; use roughenough::config; @@ -37,29 +39,38 @@ use roughenough::config::ServerConfig; use roughenough::roughenough_version; use roughenough::server::Server; -fn polling_loop(config: Box) { - let mut server = Server::new(config.as_ref()); - let keep_running = Arc::new(AtomicBool::new(true)); +// All processing threads poll this. Starts TRUE and will be set to FASLE by +// the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()` +static KEEP_RUNNING: Lazy = Lazy::new(|| AtomicBool::new(true)); - display_config(&server, config.as_ref()); +fn polling_loop(cfg: Arc>>, socket: Arc) { + let mut server = { + let config = cfg.lock().unwrap(); + let server = Server::new(config.as_ref(), socket); - let kr_clone = keep_running.clone(); - ctrlc::set_handler(move || kr_clone.store(false, Ordering::Release)) - .expect("failed setting Ctrl-C handler"); + display_config(&server, config.as_ref()); + server + }; - let mut events = Events::with_capacity(1024); + let mut events = Events::with_capacity(2048); loop { server.process_events(&mut events); - if !keep_running.load(Ordering::Acquire) { + if !KEEP_RUNNING.load(Ordering::Acquire) { warn!("Ctrl-C caught, exiting..."); return; } } } +fn set_ctrlc_handler() { + ctrlc::set_handler(move || KEEP_RUNNING.store(false, Ordering::Release)) + .expect("failed setting Ctrl-C handler"); +} + fn display_config(server: &Server, cfg: &dyn ServerConfig) { + info!("Processing thread : {}", server.thread_name()); info!("Long-term public key : {}", server.get_public_key()); info!("Max response batch size : {}", cfg.batch_size()); info!( @@ -117,10 +128,34 @@ pub fn main() { process::exit(1) } Ok(ref cfg) if !config::is_valid_config(cfg.as_ref()) => process::exit(1), - Ok(cfg) => cfg, + Ok(cfg) => Arc::new(Mutex::new(cfg)), + }; + + let sock_addr = config.lock().unwrap().udp_socket_addr().expect("udp sock addr"); + let socket = { + let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + Arc::new(sock) }; - polling_loop(config); + set_ctrlc_handler(); + + // TODO(stuart) pull TCP healthcheck out of worker threads + let mut thread_handles = Vec::new(); + + for i in 0 .. 4 { + let cfg = config.clone(); + let sock = socket.try_clone().unwrap(); + let thrd = thread::Builder::new() + .name(format!("worker-{}", i)) + .spawn(move || polling_loop(cfg, sock.into())) + .unwrap(); + + thread_handles.push(thrd); + } + + for handle in thread_handles { + handle.join().unwrap(); + } info!("Done."); process::exit(0); diff --git a/src/config/mod.rs b/src/config/mod.rs index 7897385..a72664d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -67,7 +67,7 @@ pub const DEFAULT_STATUS_INTERVAL: Duration = Duration::from_secs(600); /// * [EnvironmentConfig](struct.EnvironmentConfig.html) - configure via environment variables /// * [MemoryConfig](struct.MemoryConfig.html) - in-memory configuration for testing /// -pub trait ServerConfig { +pub trait ServerConfig : Send { /// [Required] IP address or interface name to listen for client requests fn interface(&self) -> &str; diff --git a/src/responder.rs b/src/responder.rs index b51876f..f29699f 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -17,6 +17,7 @@ //! use std::net::SocketAddr; +use std::thread; use std::time::SystemTime; use byteorder::{LittleEndian, WriteBytesExt}; @@ -41,6 +42,7 @@ pub struct Responder { requests: Vec<(Vec, SocketAddr)>, merkle: MerkleTree, grease: Grease, + thread_id: String, } impl Responder { @@ -50,6 +52,7 @@ impl Responder { let long_term_public_key = HEX.encode(ltk.public_key()); let requests = Vec::with_capacity(config.batch_size() as usize); let grease = Grease::new(config.fault_percentage()); + let thread_id = thread::current().name().unwrap().to_string(); let merkle = if version == Version::Classic { MerkleTree::new_sha512() @@ -65,6 +68,7 @@ impl Responder { merkle, requests, grease, + thread_id, } } @@ -120,7 +124,8 @@ impl Responder { } debug!( - "Responded {} {} bytes to {} for '{}..' (#{} in batch)", + "Thread {} responded {} {} bytes to {} for '{}..' (#{} in batch)", + thread::current().name().unwrap(), self.version, bytes_sent, src_addr, @@ -180,4 +185,8 @@ impl Responder { pub fn get_online_key(&self) -> &OnlineKey { &self.online_key } + + pub fn get_thread_id(&self) -> &String { + &self.thread_id + } } diff --git a/src/server.rs b/src/server.rs index dc66172..cbc4389 100644 --- a/src/server.rs +++ b/src/server.rs @@ -19,6 +19,8 @@ use std::io::ErrorKind; use std::io::Write; use std::net::{IpAddr, Shutdown, SocketAddr}; +use std::sync::Arc; +use std::thread; use std::time::Duration; use humansize::{file_size_opts as fsopts, FileSize}; @@ -55,7 +57,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo /// pub struct Server { batch_size: u8, - socket: UdpSocket, + socket: Arc, health_listener: Option, poll_duration: Option, status_interval: Duration, @@ -64,6 +66,7 @@ pub struct Server { responder_rfc: Responder, responder_classic: Responder, buf: [u8; 65_536], + thread_name: String, stats: Box, @@ -77,9 +80,9 @@ impl Server { /// Create a new server instance from the provided /// [`ServerConfig`](../config/trait.ServerConfig.html) trait object instance. /// - pub fn new(config: &dyn ServerConfig) -> Server { - let sock_addr = config.udp_socket_addr().expect("udp sock addr"); - let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); + pub fn new(config: &dyn ServerConfig, socket: Arc) -> Server { + // let sock_addr = config.udp_socket_addr().expect("udp sock addr"); + // let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket"); let poll_duration = Some(Duration::from_millis(100)); @@ -134,6 +137,7 @@ impl Server { let batch_size = config.batch_size(); let status_interval = config.status_interval(); + let thread_name = thread::current().name().unwrap().to_string(); Server { batch_size, @@ -146,6 +150,7 @@ impl Server { responder_rfc, responder_classic, buf: [0u8; 65_536], + thread_name, stats, @@ -183,10 +188,9 @@ impl Server { let socket_now_empty = self.collect_requests(); - self.responder_rfc - .send_responses(&mut self.socket, &mut self.stats); - self.responder_classic - .send_responses(&mut self.socket, &mut self.stats); + let sock_copy = Arc::get_mut(&mut self.socket).unwrap(); + self.responder_rfc.send_responses(sock_copy, &mut self.stats); + self.responder_classic.send_responses(sock_copy, &mut self.stats); if socket_now_empty { break; @@ -309,4 +313,8 @@ impl Server { self.stats.clear(); self.timer.set_timeout(self.status_interval, ()); } + + pub fn thread_name(&self) -> &str { + &self.thread_name + } }