Skip to content

Commit

Permalink
Make number of workers configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
int08h committed Mar 1, 2024
1 parent 5a14a6a commit dc7144f
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
14 changes: 7 additions & 7 deletions src/bin/roughenough-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ fn set_ctrlc_handler() {

fn display_config(server: &Server, cfg: &dyn ServerConfig) {
info!("Processing thread : {}", server.thread_name());
info!("Number of workers : {}", cfg.num_workers());
info!("Long-term public key : {}", server.get_public_key());
info!("Max response batch size : {}", cfg.batch_size());
info!(
"Status updates every : {} seconds",
cfg.status_interval().as_secs()
);
info!("Status updates every : {} seconds", cfg.status_interval().as_secs());

info!(
"Server listening on : {}:{}",
cfg.interface(),
Expand Down Expand Up @@ -104,6 +103,7 @@ fn display_config(server: &Server, cfg: &dyn ServerConfig) {
} else {
info!("Deliberate response errors : disabled");
}

}

pub fn main() {
Expand Down Expand Up @@ -131,18 +131,18 @@ pub fn main() {
Ok(cfg) => Arc::new(Mutex::new(cfg)),
};

let sock_addr = config.lock().unwrap().udp_socket_addr().expect("udp sock addr");
let socket = {
let sock_addr = config.lock().unwrap().udp_socket_addr().expect("udp sock addr");
let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
Arc::new(sock)
};

set_ctrlc_handler();

// TODO(stuart) pull TCP healthcheck out of worker threads
// TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts
let mut thread_handles = Vec::new();

for i in 0 .. 4 {
for i in 0 .. config.lock().unwrap().num_workers() {
let cfg = config.clone();
let sock = socket.try_clone().unwrap();
let thrd = thread::Builder::new()
Expand Down
16 changes: 15 additions & 1 deletion src/config/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::{env, thread};
use std::time::Duration;

use data_encoding::{Encoding, HEXLOWER_PERMISSIVE};
Expand All @@ -39,6 +39,7 @@ const HEX: Encoding = HEXLOWER_PERMISSIVE;
/// health_check_port | `ROUGHENOUGH_HEALTH_CHECK_PORT`
/// client_stats | `ROUGHENOUGH_CLIENT_STATS`
/// fault_percentage | `ROUGHENOUGH_FAULT_PERCENTAGE`
/// num_workers | `ROUGHENOUGH_NUM_WORKERS`
///
pub struct EnvironmentConfig {
port: u16,
Expand All @@ -50,6 +51,7 @@ pub struct EnvironmentConfig {
health_check_port: Option<u16>,
client_stats: bool,
fault_percentage: u8,
num_workers: usize,
}

const ROUGHENOUGH_PORT: &str = "ROUGHENOUGH_PORT";
Expand All @@ -61,6 +63,7 @@ const ROUGHENOUGH_KMS_PROTECTION: &str = "ROUGHENOUGH_KMS_PROTECTION";
const ROUGHENOUGH_HEALTH_CHECK_PORT: &str = "ROUGHENOUGH_HEALTH_CHECK_PORT";
const ROUGHENOUGH_CLIENT_STATS: &str = "ROUGHENOUGH_CLIENT_STATS";
const ROUGHENOUGH_FAULT_PERCENTAGE: &str = "ROUGHENOUGH_FAULT_PERCENTAGE";
const ROUGHENOUGH_NUM_WORKERS: &str = "ROUGHENOUGH_NUM_WORKERS:";

impl EnvironmentConfig {
pub fn new() -> Result<Self, Error> {
Expand All @@ -74,6 +77,7 @@ impl EnvironmentConfig {
health_check_port: None,
client_stats: false,
fault_percentage: 0,
num_workers: thread::available_parallelism().unwrap().get(),
};

if let Ok(port) = env::var(ROUGHENOUGH_PORT) {
Expand Down Expand Up @@ -132,6 +136,12 @@ impl EnvironmentConfig {
.unwrap_or_else(|_| panic!("invalid fault_percentage: {}", fault_percentage));
};

if let Ok(num_workers) = env::var(ROUGHENOUGH_NUM_WORKERS) {
cfg.num_workers = num_workers
.parse()
.unwrap_or_else(|_| panic!("invalid num_workers: {}", num_workers));
};

Ok(cfg)
}
}
Expand Down Expand Up @@ -172,4 +182,8 @@ impl ServerConfig for EnvironmentConfig {
fn fault_percentage(&self) -> u8 {
self.fault_percentage
}

fn num_workers(&self) -> usize {
self.num_workers
}
}
11 changes: 11 additions & 0 deletions src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fs::File;
use std::io::Read;
use std::thread;
use std::time::Duration;

use data_encoding::{Encoding, HEXLOWER_PERMISSIVE};
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct FileConfig {
health_check_port: Option<u16>,
client_stats: bool,
fault_percentage: u8,
num_workers: usize,
}

impl FileConfig {
Expand Down Expand Up @@ -80,6 +82,7 @@ impl FileConfig {
health_check_port: None,
client_stats: false,
fault_percentage: 0,
num_workers: thread::available_parallelism().unwrap().get(),
};

for (key, value) in cfg[0].as_hash().unwrap() {
Expand Down Expand Up @@ -116,6 +119,10 @@ impl FileConfig {
let val = value.as_i64().unwrap() as u8;
config.fault_percentage = val;
}
"num_workers" => {
let val = value.as_i64().unwrap() as usize;
config.num_workers = val;
}
unknown => {
return Err(Error::InvalidConfiguration(format!(
"unknown config key: {}",
Expand Down Expand Up @@ -165,4 +172,8 @@ impl ServerConfig for FileConfig {
fn fault_percentage(&self) -> u8 {
self.fault_percentage
}

fn num_workers(&self) -> usize {
self.num_workers
}
}
7 changes: 7 additions & 0 deletions src/config/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::thread;
use std::time::Duration;

use data_encoding::{Encoding, HEXLOWER_PERMISSIVE};
Expand All @@ -35,6 +36,7 @@ pub struct MemoryConfig {
pub health_check_port: Option<u16>,
pub client_stats: bool,
pub fault_percentage: u8,
pub num_workers: usize,
}

impl MemoryConfig {
Expand All @@ -50,6 +52,7 @@ impl MemoryConfig {
health_check_port: None,
client_stats: false,
fault_percentage: 0,
num_workers: thread::available_parallelism().unwrap().get(),
}
}
}
Expand Down Expand Up @@ -90,4 +93,8 @@ impl ServerConfig for MemoryConfig {
fn fault_percentage(&self) -> u8 {
self.fault_percentage
}

fn num_workers(&self) -> usize {
self.num_workers
}
}
10 changes: 10 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub const DEFAULT_STATUS_INTERVAL: Duration = Duration::from_secs(600);
/// `kms_protection` | `ROUGHENOUGH_KMS_PROTECTION` | Optional | If compiled with KMS support, the ID of the KMS key used to protect the long-term identity.
/// `client_stats` | `ROUGHENOUGH_CLIENT_STATS` | Optional | A value of `on` or `yes` will enable tracking of per-client request statistics that will be output each time server status is logged. Default is `off` (disabled).
/// `fault_percentage` | `ROUGHENOUGH_FAULT_PERCENTAGE` | Optional | Likelihood (as a percentage) that the server will intentionally return an invalid client response. An integer range from `0` (disabled, all responses valid) to `50` (50% of responses will be invalid). Default is `0` (disabled).
/// `num_workers` | `ROUGHENOUGH_NUM_WORKERS` | Optional | Number of worker threads created to process requests. Defaults to `thread::available_parallelism()`
///
/// Implementations of this trait obtain a valid configuration from different back-end
/// sources. See:
Expand Down Expand Up @@ -109,6 +110,10 @@ pub trait ServerConfig : Send {
/// for background and rationale.
fn fault_percentage(&self) -> u8;

/// [Optional] The number of worker threads to start. Defaults to the value returned by
/// Rust's `thread::available_parallelism()`.
fn num_workers(&self) -> usize;

/// Convenience function to create a `SocketAddr` from the provided `interface` and `port`
fn udp_socket_addr(&self) -> Result<SocketAddr, Error> {
let addr = format!("{}:{}", self.interface(), self.port());
Expand Down Expand Up @@ -190,6 +195,11 @@ pub fn is_valid_config(cfg: &dyn ServerConfig) -> bool {
is_valid = false;
}

if cfg.num_workers() == 0 {
error!("num_workers must be > 0");
is_valid = false;
}

if is_valid {
if let Err(e) = cfg.udp_socket_addr() {
error!(
Expand Down

0 comments on commit dc7144f

Please sign in to comment.