Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase log interest #265

Merged
merged 8 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ resolver = "2"

[workspace.package]
edition = "2021"
version = "1.0.18"
version = "1.0.19"
authors = ["Fred Clausen", "Mike Nye", "Alex Austin"]
description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers."
documentation = "https://github.com/sdr-enthusiasts/acars_router"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ services:
- AR_SEND_UDP_VDLM2=acarshub:5555
- AR_RECV_ZMQ_VDLM2=dumpvdl2:45555
- AR_OVERRIDE_STATION_NAME=${FEEDER_NAME}
- AR_STATS_VERBOSE=false
tmpfs:
- /run:exec,size=64M
- /var/log
Expand Down
3 changes: 3 additions & 0 deletions rust/libraries/acars_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct Input {
/// Print statistics every N minutes
#[clap(long, env = "AR_STATS_EVERY", value_parser, default_value = "5")]
pub stats_every: u64,
/// Chatty logging of stats
#[clap(long, env = "AR_STATS_VERBOSE", value_parser)]
pub stats_verbose: bool,
/// Attempt message reassembly on incomplete messages within the specified number of seconds
#[clap(
long,
Expand Down
98 changes: 96 additions & 2 deletions rust/libraries/acars_connection_manager/src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

pub struct FrequencyCount {
freq: String,
count: u32,
}

#[derive(Clone, Debug, Default)]
pub struct MessageHandlerConfig {
pub add_proxy_id: bool,
Expand All @@ -27,6 +32,7 @@ pub struct MessageHandlerConfig {
pub should_override_station_name: bool,
pub station_name: String,
pub stats_every: u64,
pub stats_verbose: bool,
}

impl MessageHandlerConfig {
Expand All @@ -41,6 +47,7 @@ impl MessageHandlerConfig {
should_override_station_name: args.override_station_name.is_some(),
station_name: station_name.to_string(),
stats_every: args.stats_every,
stats_verbose: args.stats_verbose,
}
} else {
Self {
Expand All @@ -52,6 +59,7 @@ impl MessageHandlerConfig {
should_override_station_name: false,
station_name: Default::default(),
stats_every: args.stats_every,
stats_verbose: args.stats_verbose,
}
}
}
Expand All @@ -65,6 +73,8 @@ impl MessageHandlerConfig {
Arc::new(Mutex::new(VecDeque::with_capacity(100)));
let total_messages_processed: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let total_messages_since_last: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let all_frequencies_logged: Arc<Mutex<Vec<FrequencyCount>>> =
Arc::new(Mutex::new(Vec::new()));
let queue_type_stats: String = self.queue_type.clone();
let queue_type_dedupe: String = self.queue_type.clone();
let stats_every: u64 = self.stats_every * 60; // Value has to be in seconds. Input is in minutes.
Expand All @@ -77,11 +87,18 @@ impl MessageHandlerConfig {
let stats_total_messages_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_processed);
let stats_total_messages_since_last_context: Arc<Mutex<i32>> =
Arc::clone(&total_messages_since_last);
let stats_frequency_context: Option<Arc<Mutex<Vec<FrequencyCount>>>> = if self.stats_verbose
{
Some(Arc::clone(&all_frequencies_logged))
} else {
None
};

tokio::spawn(async move {
print_stats(
stats_total_messages_context,
stats_total_messages_since_last_context,
stats_frequency_context,
stats_every,
queue_type_stats.as_str(),
)
Expand Down Expand Up @@ -132,6 +149,56 @@ impl MessageHandlerConfig {
Err(_) => f64::default(),
};

// See if the frequency is in the list of frequencies we've seen
// If not, add it to the list and log it
// match the message type

match &message {
AcarsVdlm2Message::Vdlm2Message(m) => {
// get the freq from Vdlm2Message::Vdlm2Body
let frequency: String = m.vdl2.freq.to_string();
// check and see if we have the frequency in all_frequencies_logged. If so, increment the count.
// if not, add it
let mut found: bool = false;
for freq in all_frequencies_logged.lock().await.iter_mut() {
if freq.freq == frequency {
freq.count += 1;
found = true;
break;
}
}

if !found {
let new_frequency: FrequencyCount = FrequencyCount {
freq: frequency,
count: 1,
};
all_frequencies_logged.lock().await.push(new_frequency);
}
}
AcarsVdlm2Message::AcarsMessage(m) => {
// get the freq from AcarsMessage::AcarsBody
let frequency: String = m.freq.to_string();

let mut found: bool = false;
for freq in all_frequencies_logged.lock().await.iter_mut() {
if freq.freq == frequency {
freq.count += 1;
found = true;
break;
}
}

if !found {
let new_frequency: FrequencyCount = FrequencyCount {
freq: frequency,
count: 1,
};
all_frequencies_logged.lock().await.push(new_frequency);
}
}
}

let get_message_time: Option<f64> = message.get_time();

match get_message_time {
Expand Down Expand Up @@ -251,15 +318,42 @@ impl MessageHandlerConfig {
pub async fn print_stats(
total_all_time: Arc<Mutex<i32>>,
total_since_last: Arc<Mutex<i32>>,
frequencies: Option<Arc<Mutex<Vec<FrequencyCount>>>>,
stats_every: u64,
queue_type: &str,
) {
let stats_minutes = stats_every / 60;
loop {
sleep(Duration::from_secs(stats_every)).await;
info!("{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}",
queue_type, stats_minutes, total_all_time.lock().await, total_since_last.lock().await);
let total_all_time_locked = *total_all_time.lock().await;
let mut output: String = String::new();
output.push_str(&format!(
"{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n",
queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await
));
*total_since_last.lock().await = 0;

// now print the frequencies, and show each as a percentage of the total_all_time

if let Some(f) = &frequencies {
// sort the frequencies by count
if let Some(f) = &frequencies {
f.lock().await.sort_by(|a, b| b.count.cmp(&a.count));
}

for freq in f.lock().await.iter() {
let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0;
output.push_str(
format!(
"{} {}: {}/{} ({:.2}%)\n",
queue_type, freq.freq, freq.count, total_all_time_locked, percentage
)
.as_str(),
);
}
}

println!("{}", output);
}
}

Expand Down
Loading