diff --git a/rust/rsc/.config.json b/rust/rsc/.config.json index de95f14ff..571c319eb 100644 --- a/rust/rsc/.config.json +++ b/rust/rsc/.config.json @@ -1,6 +1,20 @@ { "database_url": "postgres://localhost:5433/test", - "server_addr": "0.0.0.0:3002", + "server_address": "0.0.0.0:3002", + "connection_pool_timeout": 60, "standalone": false, - "active_store": "bfa63b02-d41f-4eff-b0e3-f42bafb33a3d" + "active_store": "bfa63b02-d41f-4eff-b0e3-f42bafb33a3d", + "log_directory": null, + "blob_eviction": { + "tick_rate": 60, + "ttl": 3600, + "chunk_size": 16000 + }, + "job_eviction": { + "ttl": { + "tick_rate": 600, + "ttl": 86400, + "chunk_size": 16000 + } + } } diff --git a/rust/rsc/src/bin/rsc/config.rs b/rust/rsc/src/bin/rsc/config.rs index 1e12491bb..d670cc482 100644 --- a/rust/rsc/src/bin/rsc/config.rs +++ b/rust/rsc/src/bin/rsc/config.rs @@ -1,11 +1,52 @@ use config::{Config, ConfigError, Environment, File}; use serde::{Deserialize, Serialize}; +#[derive(Debug, Deserialize, Serialize)] +pub struct RSCTTLConfig { + // How often to run the eviction check in seconds + pub tick_rate: u64, + // How long an object is allowed to live + pub ttl: u64, + // Maximum number of objects to delete at a time. Must be 1 >= x <= 16000 + pub chunk_size: u32, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct RSCLRUConfig { + // The max size in bytes that the cache may reach before eviction + pub high_mark: u64, + // The end size in bytes that the cache should reach after eviction + pub low_mark: u64, + // Maximum number of objects to delete at a time. Must be 1 >= x <= 16000 + pub chunk_size: u32, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum RSCJobEvictionConfig { + // Time to live eviction strategy + #[serde(rename = "ttl")] + TTL(RSCTTLConfig), + // Least recently used eviction strategy + #[serde(rename = "lru")] + LRU(RSCLRUConfig), +} + #[derive(Debug, Deserialize, Serialize)] pub struct RSCConfig { + // The url used to connect to the postgres database pub database_url: String, - pub server_addr: String, + // The address the that server should bind to + pub server_address: String, + // The amount of time a query should wait for a connection before timing out in seconds + pub connection_pool_timeout: u32, + // The blob store that new blobs should be written into pub active_store: String, + // The directory that server logs should be written to. If None logs are written to stdout + pub log_directory: Option, + // The config to control blob eviction + pub blob_eviction: RSCTTLConfig, + // The config to control job eviction + pub job_eviction: RSCJobEvictionConfig, } impl RSCConfig { diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs index 37f7ef7a6..cdf23c36c 100644 --- a/rust/rsc/src/bin/rsc/main.rs +++ b/rust/rsc/src/bin/rsc/main.rs @@ -180,7 +180,7 @@ fn create_router( "/blob", get({ let config = config.clone(); - move || blob::get_upload_url(config.server_addr.clone()) + move || blob::get_upload_url(config.server_address.clone()) }), ) .route("/version/check", get(check_version)) @@ -310,11 +310,6 @@ fn request_max_fileno_limit() { #[tokio::main] async fn main() -> Result<(), Box> { - // setup a subscriber for logging - let file_appender = tracing_appender::rolling::daily("./rsc_logs", "rsc.log"); - let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); - tracing_subscriber::fmt().with_writer(non_blocking).init(); - // Parse the arguments let args = ServerOptions::parse(); @@ -323,10 +318,22 @@ async fn main() -> Result<(), Box> { let config = Arc::new(config); if args.show_config { - println!("{}", serde_json::to_string(&config).unwrap()); + println!("{}", serde_json::to_string_pretty(&config).unwrap()); return Ok(()); } + // setup a subscriber for logging + let _guard = if let Some(log_directory) = config.log_directory.clone() { + let file_appender = tracing_appender::rolling::daily(log_directory, "rsc.log"); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + tracing_subscriber::fmt().with_writer(non_blocking).init(); + Some(guard) + } else { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber)?; + None + }; + // Increase the number of allowed open files the the max request_max_fileno_limit(); @@ -338,21 +345,23 @@ async fn main() -> Result<(), Box> { let stores = activate_stores(connection.clone()).await; // Launch evictions threads - let one_min_in_seconds = 60 * 1; - let ten_mins_in_seconds = one_min_in_seconds * 10; - let one_hour_in_seconds = one_min_in_seconds * 60; - let one_day_in_seconds = one_hour_in_seconds * 24 * 1; - launch_job_eviction(connection.clone(), ten_mins_in_seconds, one_day_in_seconds); + match &config.job_eviction { + config::RSCJobEvictionConfig::TTL(ttl) => { + launch_job_eviction(connection.clone(), ttl.tick_rate, ttl.ttl); + } + config::RSCJobEvictionConfig::LRU(_) => panic!("LRU not implemented"), + } + launch_blob_eviction( connection.clone(), - one_min_in_seconds, - one_hour_in_seconds, + config.blob_eviction.tick_rate, + config.blob_eviction.ttl, stores.clone(), ); // Launch the server let router = create_router(connection.clone(), config.clone(), &stores); - axum::Server::bind(&config.server_addr.parse()?) + axum::Server::bind(&config.server_address.parse()?) .serve(router.into_make_service()) .await?; @@ -398,8 +407,20 @@ mod tests { fn create_config(store_id: Uuid) -> config::RSCConfig { config::RSCConfig { database_url: "test:0000".to_string(), - server_addr: "".to_string(), + server_address: "".to_string(), active_store: store_id.to_string(), + connection_pool_timeout: 10, + log_directory: None, + blob_eviction: config::RSCTTLConfig { + tick_rate: 10, + ttl: 100, + chunk_size: 100, + }, + job_eviction: config::RSCJobEvictionConfig::TTL(config::RSCTTLConfig { + tick_rate: 10, + ttl: 100, + chunk_size: 100, + }), } } diff --git a/rust/rsc/src/bin/rsc_tool/main.rs b/rust/rsc/src/bin/rsc_tool/main.rs index 397309db0..fd0a5d0df 100644 --- a/rust/rsc/src/bin/rsc_tool/main.rs +++ b/rust/rsc/src/bin/rsc_tool/main.rs @@ -394,7 +394,7 @@ async fn main() -> Result<(), Box> { })?; if args.show_config { - println!("{}", serde_json::to_string(&config).unwrap()); + println!("{}", serde_json::to_string_pretty(&config).unwrap()); return Ok(()); }