diff --git a/Cargo.toml b/Cargo.toml index b7398a13..ab597bd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "1.0.18" +version = "1.0.19" authors = ["Fred Clausen", "Mike Nye", "Alex Austin"] description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers." documentation = "https://github.com/sdr-enthusiasts/acars_router" diff --git a/README.md b/README.md index dde04398..a45ffea8 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ services: - AR_SEND_UDP_VDLM2=acarshub:5555 - AR_RECV_ZMQ_VDLM2=dumpvdl2:45555 - AR_OVERRIDE_STATION_NAME=${FEEDER_NAME} + - AR_STATS_VERBOSE=false tmpfs: - /run:exec,size=64M - /var/log diff --git a/rust/libraries/acars_config/src/lib.rs b/rust/libraries/acars_config/src/lib.rs index b5a290fd..55ecaf3b 100644 --- a/rust/libraries/acars_config/src/lib.rs +++ b/rust/libraries/acars_config/src/lib.rs @@ -44,6 +44,9 @@ pub struct Input { /// Print statistics every N minutes #[clap(long, env = "AR_STATS_EVERY", value_parser, default_value = "5")] pub stats_every: u64, + /// Chatty logging of stats + #[clap(long, env = "AR_STATS_VERBOSE", value_parser)] + pub stats_verbose: bool, /// Attempt message reassembly on incomplete messages within the specified number of seconds #[clap( long, diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 09c7661b..ee3e951e 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -17,6 +17,11 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::time::{sleep, Duration}; +pub struct FrequencyCount { + freq: String, + count: u32, +} + #[derive(Clone, Debug, Default)] pub struct MessageHandlerConfig { pub add_proxy_id: bool, @@ -27,6 +32,7 @@ pub struct MessageHandlerConfig { pub should_override_station_name: bool, pub station_name: String, pub stats_every: u64, + pub stats_verbose: bool, } impl MessageHandlerConfig { @@ -41,6 +47,7 @@ impl MessageHandlerConfig { should_override_station_name: args.override_station_name.is_some(), station_name: station_name.to_string(), stats_every: args.stats_every, + stats_verbose: args.stats_verbose, } } else { Self { @@ -52,6 +59,7 @@ impl MessageHandlerConfig { should_override_station_name: false, station_name: Default::default(), stats_every: args.stats_every, + stats_verbose: args.stats_verbose, } } } @@ -65,6 +73,8 @@ impl MessageHandlerConfig { Arc::new(Mutex::new(VecDeque::with_capacity(100))); let total_messages_processed: Arc> = Arc::new(Mutex::new(0)); let total_messages_since_last: Arc> = Arc::new(Mutex::new(0)); + let all_frequencies_logged: Arc>> = + Arc::new(Mutex::new(Vec::new())); let queue_type_stats: String = self.queue_type.clone(); let queue_type_dedupe: String = self.queue_type.clone(); let stats_every: u64 = self.stats_every * 60; // Value has to be in seconds. Input is in minutes. @@ -77,11 +87,18 @@ impl MessageHandlerConfig { let stats_total_messages_context: Arc> = Arc::clone(&total_messages_processed); let stats_total_messages_since_last_context: Arc> = Arc::clone(&total_messages_since_last); + let stats_frequency_context: Option>>> = if self.stats_verbose + { + Some(Arc::clone(&all_frequencies_logged)) + } else { + None + }; tokio::spawn(async move { print_stats( stats_total_messages_context, stats_total_messages_since_last_context, + stats_frequency_context, stats_every, queue_type_stats.as_str(), ) @@ -132,6 +149,56 @@ impl MessageHandlerConfig { Err(_) => f64::default(), }; + // See if the frequency is in the list of frequencies we've seen + // If not, add it to the list and log it + // match the message type + + match &message { + AcarsVdlm2Message::Vdlm2Message(m) => { + // get the freq from Vdlm2Message::Vdlm2Body + let frequency: String = m.vdl2.freq.to_string(); + // check and see if we have the frequency in all_frequencies_logged. If so, increment the count. + // if not, add it + let mut found: bool = false; + for freq in all_frequencies_logged.lock().await.iter_mut() { + if freq.freq == frequency { + freq.count += 1; + found = true; + break; + } + } + + if !found { + let new_frequency: FrequencyCount = FrequencyCount { + freq: frequency, + count: 1, + }; + all_frequencies_logged.lock().await.push(new_frequency); + } + } + AcarsVdlm2Message::AcarsMessage(m) => { + // get the freq from AcarsMessage::AcarsBody + let frequency: String = m.freq.to_string(); + + let mut found: bool = false; + for freq in all_frequencies_logged.lock().await.iter_mut() { + if freq.freq == frequency { + freq.count += 1; + found = true; + break; + } + } + + if !found { + let new_frequency: FrequencyCount = FrequencyCount { + freq: frequency, + count: 1, + }; + all_frequencies_logged.lock().await.push(new_frequency); + } + } + } + let get_message_time: Option = message.get_time(); match get_message_time { @@ -251,15 +318,42 @@ impl MessageHandlerConfig { pub async fn print_stats( total_all_time: Arc>, total_since_last: Arc>, + frequencies: Option>>>, stats_every: u64, queue_type: &str, ) { let stats_minutes = stats_every / 60; loop { sleep(Duration::from_secs(stats_every)).await; - info!("{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}", - queue_type, stats_minutes, total_all_time.lock().await, total_since_last.lock().await); + let total_all_time_locked = *total_all_time.lock().await; + let mut output: String = String::new(); + output.push_str(&format!( + "{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n", + queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await + )); *total_since_last.lock().await = 0; + + // now print the frequencies, and show each as a percentage of the total_all_time + + if let Some(f) = &frequencies { + // sort the frequencies by count + if let Some(f) = &frequencies { + f.lock().await.sort_by(|a, b| b.count.cmp(&a.count)); + } + + for freq in f.lock().await.iter() { + let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; + output.push_str( + format!( + "{} {}: {}/{} ({:.2}%)\n", + queue_type, freq.freq, freq.count, total_all_time_locked, percentage + ) + .as_str(), + ); + } + } + + println!("{}", output); } }