Skip to content

Commit

Permalink
A working basic multi-threaded server
Browse files Browse the repository at this point in the history
  • Loading branch information
int08h committed Mar 1, 2024
1 parent efdb4b0 commit 5a14a6a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ humansize = "1"
log = "0.4"
mio = "0.6"
mio-extras = "2.0"
once_cell = "1.19"
rand = "0.6"
ring = "0.16"
simple_logger = "1"
Expand Down
61 changes: 48 additions & 13 deletions src/bin/roughenough-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,54 @@
#[macro_use]
extern crate log;

use std::env;
use std::{env, thread};
use std::process;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};

use log::LevelFilter;
use mio::Events;
use mio::net::UdpSocket;
use once_cell::sync::Lazy;
use simple_logger::SimpleLogger;

use roughenough::config;
use roughenough::config::ServerConfig;
use roughenough::roughenough_version;
use roughenough::server::Server;

fn polling_loop(config: Box<dyn ServerConfig>) {
let mut server = Server::new(config.as_ref());
let keep_running = Arc::new(AtomicBool::new(true));
// All processing threads poll this. Starts TRUE and will be set to FASLE by
// the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()`
static KEEP_RUNNING: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(true));

display_config(&server, config.as_ref());
fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: Arc<UdpSocket>) {
let mut server = {
let config = cfg.lock().unwrap();
let server = Server::new(config.as_ref(), socket);

let kr_clone = keep_running.clone();
ctrlc::set_handler(move || kr_clone.store(false, Ordering::Release))
.expect("failed setting Ctrl-C handler");
display_config(&server, config.as_ref());
server
};

let mut events = Events::with_capacity(1024);
let mut events = Events::with_capacity(2048);

loop {
server.process_events(&mut events);

if !keep_running.load(Ordering::Acquire) {
if !KEEP_RUNNING.load(Ordering::Acquire) {
warn!("Ctrl-C caught, exiting...");
return;
}
}
}

fn set_ctrlc_handler() {
ctrlc::set_handler(move || KEEP_RUNNING.store(false, Ordering::Release))
.expect("failed setting Ctrl-C handler");
}

fn display_config(server: &Server, cfg: &dyn ServerConfig) {
info!("Processing thread : {}", server.thread_name());
info!("Long-term public key : {}", server.get_public_key());
info!("Max response batch size : {}", cfg.batch_size());
info!(
Expand Down Expand Up @@ -117,10 +128,34 @@ pub fn main() {
process::exit(1)
}
Ok(ref cfg) if !config::is_valid_config(cfg.as_ref()) => process::exit(1),
Ok(cfg) => cfg,
Ok(cfg) => Arc::new(Mutex::new(cfg)),
};

let sock_addr = config.lock().unwrap().udp_socket_addr().expect("udp sock addr");
let socket = {
let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
Arc::new(sock)
};

polling_loop(config);
set_ctrlc_handler();

// TODO(stuart) pull TCP healthcheck out of worker threads
let mut thread_handles = Vec::new();

for i in 0 .. 4 {
let cfg = config.clone();
let sock = socket.try_clone().unwrap();
let thrd = thread::Builder::new()
.name(format!("worker-{}", i))
.spawn(move || polling_loop(cfg, sock.into()))
.unwrap();

thread_handles.push(thrd);
}

for handle in thread_handles {
handle.join().unwrap();
}

info!("Done.");
process::exit(0);
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub const DEFAULT_STATUS_INTERVAL: Duration = Duration::from_secs(600);
/// * [EnvironmentConfig](struct.EnvironmentConfig.html) - configure via environment variables
/// * [MemoryConfig](struct.MemoryConfig.html) - in-memory configuration for testing
///
pub trait ServerConfig {
pub trait ServerConfig : Send {
/// [Required] IP address or interface name to listen for client requests
fn interface(&self) -> &str;

Expand Down
11 changes: 10 additions & 1 deletion src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//!
use std::net::SocketAddr;
use std::thread;
use std::time::SystemTime;

use byteorder::{LittleEndian, WriteBytesExt};
Expand All @@ -41,6 +42,7 @@ pub struct Responder {
requests: Vec<(Vec<u8>, SocketAddr)>,
merkle: MerkleTree,
grease: Grease,
thread_id: String,
}

impl Responder {
Expand All @@ -50,6 +52,7 @@ impl Responder {
let long_term_public_key = HEX.encode(ltk.public_key());
let requests = Vec::with_capacity(config.batch_size() as usize);
let grease = Grease::new(config.fault_percentage());
let thread_id = thread::current().name().unwrap().to_string();

let merkle = if version == Version::Classic {
MerkleTree::new_sha512()
Expand All @@ -65,6 +68,7 @@ impl Responder {
merkle,
requests,
grease,
thread_id,
}
}

Expand Down Expand Up @@ -120,7 +124,8 @@ impl Responder {
}

debug!(
"Responded {} {} bytes to {} for '{}..' (#{} in batch)",
"Thread {} responded {} {} bytes to {} for '{}..' (#{} in batch)",
thread::current().name().unwrap(),
self.version,
bytes_sent,
src_addr,
Expand Down Expand Up @@ -180,4 +185,8 @@ impl Responder {
pub fn get_online_key(&self) -> &OnlineKey {
&self.online_key
}

pub fn get_thread_id(&self) -> &String {
&self.thread_id
}
}
24 changes: 16 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
use std::io::ErrorKind;
use std::io::Write;
use std::net::{IpAddr, Shutdown, SocketAddr};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use humansize::{file_size_opts as fsopts, FileSize};
Expand Down Expand Up @@ -55,7 +57,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo
///
pub struct Server {
batch_size: u8,
socket: UdpSocket,
socket: Arc<UdpSocket>,
health_listener: Option<TcpListener>,
poll_duration: Option<Duration>,
status_interval: Duration,
Expand All @@ -64,6 +66,7 @@ pub struct Server {
responder_rfc: Responder,
responder_classic: Responder,
buf: [u8; 65_536],
thread_name: String,

stats: Box<dyn ServerStats>,

Expand All @@ -77,9 +80,9 @@ impl Server {
/// Create a new server instance from the provided
/// [`ServerConfig`](../config/trait.ServerConfig.html) trait object instance.
///
pub fn new(config: &dyn ServerConfig) -> Server {
let sock_addr = config.udp_socket_addr().expect("udp sock addr");
let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
pub fn new(config: &dyn ServerConfig, socket: Arc<UdpSocket>) -> Server {
// let sock_addr = config.udp_socket_addr().expect("udp sock addr");
// let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");

let poll_duration = Some(Duration::from_millis(100));

Expand Down Expand Up @@ -134,6 +137,7 @@ impl Server {

let batch_size = config.batch_size();
let status_interval = config.status_interval();
let thread_name = thread::current().name().unwrap().to_string();

Server {
batch_size,
Expand All @@ -146,6 +150,7 @@ impl Server {
responder_rfc,
responder_classic,
buf: [0u8; 65_536],
thread_name,

stats,

Expand Down Expand Up @@ -183,10 +188,9 @@ impl Server {

let socket_now_empty = self.collect_requests();

self.responder_rfc
.send_responses(&mut self.socket, &mut self.stats);
self.responder_classic
.send_responses(&mut self.socket, &mut self.stats);
let sock_copy = Arc::get_mut(&mut self.socket).unwrap();
self.responder_rfc.send_responses(sock_copy, &mut self.stats);
self.responder_classic.send_responses(sock_copy, &mut self.stats);

if socket_now_empty {
break;
Expand Down Expand Up @@ -309,4 +313,8 @@ impl Server {
self.stats.clear();
self.timer.set_timeout(self.status_interval, ());
}

pub fn thread_name(&self) -> &str {
&self.thread_name
}
}

0 comments on commit 5a14a6a

Please sign in to comment.