Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve worker load distribution #39

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roughenough"
version = "1.2.1-draft8"
version = "1.3.0-draft8"
repository = "https://github.com/int08h/roughenough"
authors = ["Stuart Stock <[email protected]>", "Aaron Hill <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -20,20 +20,21 @@ gcpkms = ["google-cloudkms1", "hyper", "hyper-rustls", "serde", "serde_json", "y
[dependencies]
byteorder = "1"
chrono = "0.4"
clap = "2"
ctrlc = { version = "3.2", features = ["termination"] }
humansize = "1"
clap = "4"
ctrlc = { version = "3.4", features = ["termination"] }
humansize = "2"
log = "0.4"
mio = "0.6"
mio-extras = "2.0"
net2 = "0.2"
once_cell = "1.19"
rand = "0.6"
ring = "0.16"
simple_logger = "1"
ring = "0.17"
simple_logger = "5"
yaml-rust = "0.4"
zeroize = "1.4"
data-encoding = "2.3"
enum-iterator = "2.0"
zeroize = "1.8"
data-encoding = "2.6"
enum-iterator = "2.1"

# Used by 'awskms' and 'gcpkms'
futures = { version = "^0.3", optional = true }
Expand Down
50 changes: 31 additions & 19 deletions src/bin/roughenough-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
#[macro_use]
extern crate log;

use std::{env, io, thread};
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, thread};
use std::sync::atomic::{AtomicBool, Ordering};

use log::LevelFilter;
use mio::net::UdpSocket;
use mio::Events;
use mio::net::UdpSocket;
use net2::UdpBuilder;
use net2::unix::UnixUdpBuilderExt;
use once_cell::sync::Lazy;
use simple_logger::SimpleLogger;

Expand All @@ -43,7 +45,7 @@ use roughenough::server::Server;
// the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()`
static KEEP_RUNNING: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(true));

fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: Arc<UdpSocket>) {
fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: UdpSocket) {
let mut server = {
let config = cfg.lock().unwrap();
let server = Server::new(config.as_ref(), socket);
Expand All @@ -52,7 +54,7 @@ fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: Arc<UdpSocket>)
server
};

let mut events = Events::with_capacity(2048);
let mut events = Events::with_capacity(1024);

loop {
server.process_events(&mut events);
Expand All @@ -69,6 +71,24 @@ fn set_ctrlc_handler() {
.expect("failed setting Ctrl-C handler");
}

// Bind to the server port using SO_REUSEPORT and SO_REUSEADDR so the kernel will more fairly
// balance traffic to each worker. https://lwn.net/Articles/542629/
fn bind_socket(config: Arc<Mutex<Box<dyn ServerConfig>>>) -> io::Result<UdpSocket> {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");

let std_socket = UdpBuilder::new_v4()?
.reuse_address(true)?
.reuse_port(true)?
.bind(sock_addr)?;

let mio_socket: UdpSocket = UdpSocket::from_socket(std_socket)?;
Ok(mio_socket)
}

fn display_config(server: &Server, cfg: &dyn ServerConfig) {
info!("Processing thread : {}", server.thread_name());
info!("Number of workers : {}", cfg.num_workers());
Expand Down Expand Up @@ -133,27 +153,19 @@ pub fn main() {
Ok(cfg) => Arc::new(Mutex::new(cfg)),
};

let socket = {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");
let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
Arc::new(sock)
};

set_ctrlc_handler();

// TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts
// TODO(stuart) TCP healthcheck REUSEADDR and RESUSEPORT on the tcp socket

let mut threads = Vec::new();

for i in 0..config.lock().unwrap().num_workers() {
let num_workers = config.lock().unwrap().num_workers();
for i in 0..num_workers {
let cfg = config.clone();
let sock = socket.try_clone().unwrap();
let socket = bind_socket(cfg.clone()).unwrap();
let thread = thread::Builder::new()
.name(format!("worker-{}", i))
.spawn(move || polling_loop(cfg, sock.into()))
.spawn(move || polling_loop(cfg.clone(), socket) )
.expect("failure spawning thread");

threads.push(thread);
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2021 int08h LLC
// Copyright 2017-2024 int08h LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,8 @@
//! An implementation of the [Roughtime](https://roughtime.googlesource.com/roughtime)
//! secure time synchronization protocol.
//!
//! Roughtime aims to achieve rough time synchronisation in a secure way that doesn't
//! depend on any particular time server, and in such a way that, if a time server does
//! Roughtime aims to achieve rough time synchronization in a secure way that doesn't
//! depend on any particular timeserver, and in such a way that, if a timeserver does
//! misbehave, clients end up with cryptographic proof of it.
//!
//! # Protocol
Expand Down Expand Up @@ -79,7 +79,7 @@ pub mod stats;
pub mod version;

/// Version of Roughenough
pub const VERSION: &str = "1.2.1-draft8";
pub const VERSION: &str = "1.3.0-draft8";

/// Roughenough version string enriched with any compile-time optional features
pub fn roughenough_version() -> String {
Expand Down
8 changes: 4 additions & 4 deletions src/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl MerkleTree {
}

pub fn get_paths(&self, mut index: usize) -> Vec<u8> {
let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len);
let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len());
let mut level = 0;

while !self.levels[level].is_empty() {
Expand Down Expand Up @@ -95,7 +95,7 @@ impl MerkleTree {
}

if node_count % 2 != 0 {
self.levels[level - 1].push(vec![0; self.algorithm.output_len]);
self.levels[level - 1].push(vec![0; self.algorithm.output_len()]);
node_count += 1;
}

Expand Down Expand Up @@ -146,9 +146,9 @@ impl MerkleTree {
Hash::from(ctx.finish().as_ref())
};

assert_eq!(paths.len() % self.algorithm.output_len, 0);
assert_eq!(paths.len() % self.algorithm.output_len(), 0);

for path in paths.chunks(self.algorithm.output_len) {
for path in paths.chunks(self.algorithm.output_len()) {
let mut ctx = digest::Context::new(self.algorithm);
ctx.update(TREE_NODE_TWEAK);

Expand Down
18 changes: 8 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
use std::io::ErrorKind;
use std::io::Write;
use std::net::{IpAddr, Shutdown, SocketAddr};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use humansize::{file_size_opts as fsopts, FileSize};
use humansize::{BINARY, format_size};
use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer::Timer;
Expand Down Expand Up @@ -56,7 +55,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo
///
pub struct Server {
batch_size: u8,
socket: Arc<UdpSocket>,
socket: UdpSocket,
health_listener: Option<TcpListener>,
poll_duration: Option<Duration>,
status_interval: Duration,
Expand All @@ -80,7 +79,7 @@ impl Server {
/// Create a new server instance from the provided
/// [`ServerConfig`](../config/trait.ServerConfig.html) trait object instance.
///
pub fn new(config: &dyn ServerConfig, socket: Arc<UdpSocket>) -> Server {
pub fn new(config: &dyn ServerConfig, socket: UdpSocket) -> Server {
let mut timer: Timer<()> = Timer::default();
timer.set_timeout(config.status_interval(), ());

Expand Down Expand Up @@ -186,13 +185,12 @@ impl Server {

let socket_now_empty = self.collect_requests();

let sock_copy = Arc::get_mut(&mut self.socket).unwrap();
self.responder_rfc
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);
self.responder_draft
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);
self.responder_classic
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);

if socket_now_empty {
break;
Expand Down Expand Up @@ -295,7 +293,7 @@ impl Server {
counts.invalid_requests,
counts.classic_responses_sent,
counts.rfc_responses_sent,
counts.bytes_sent.file_size(fsopts::BINARY).unwrap(),
format_size(counts.bytes_sent, BINARY),
counts.failed_send_attempts,
counts.retried_send_attempts
);
Expand All @@ -312,7 +310,7 @@ impl Server {
self.stats.total_responses_sent(),
self.stats.num_classic_responses_sent(),
self.stats.num_rfc_responses_sent(),
self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap(),
format_size(self.stats.total_bytes_sent(), BINARY),
self.stats.total_failed_send_attempts(),
self.stats.total_retried_send_attempts()
);
Expand Down
Loading