From 28d7e555a75be55746e7b817141329ba89799061 Mon Sep 17 00:00:00 2001 From: Fletcher Nichol Date: Wed, 7 Feb 2024 12:51:35 -0700 Subject: [PATCH] chore: add optional API to make tracing level changes "wait-able" This change adds several `*_and_wait()` method options to the `ApplicationTelemetryClient` type. When our services boot up we check the parsed CLI argument for verbosity flags. If there are more than one set then we issue a command to raise the tracing level but prior to this change, this operation was non-blocking/async. That means that the level may not have raised by the time the next line of code executes resulting in potential missing log messages. Signed-off-by: Fletcher Nichol --- bin/council/src/main.rs | 4 +- bin/cyclone/src/main.rs | 4 +- bin/module-index/src/main.rs | 4 +- bin/pinga/src/main.rs | 4 +- bin/sdf/src/main.rs | 4 +- bin/veritech/src/main.rs | 4 +- lib/telemetry-application-rs/src/lib.rs | 12 ++- lib/telemetry-rs/src/lib.rs | 128 +++++++++++++++++++++--- 8 files changed, 141 insertions(+), 23 deletions(-) diff --git a/bin/council/src/main.rs b/bin/council/src/main.rs index 1c25fe5d2f..270a915de9 100644 --- a/bin/council/src/main.rs +++ b/bin/council/src/main.rs @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/bin/cyclone/src/main.rs b/bin/cyclone/src/main.rs index bd71a4ba33..f52a9ecffb 100644 --- a/bin/cyclone/src/main.rs +++ b/bin/cyclone/src/main.rs @@ -41,7 +41,9 @@ async fn main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/bin/module-index/src/main.rs b/bin/module-index/src/main.rs index c3f857d98c..a7a9e7d30f 100644 --- a/bin/module-index/src/main.rs +++ b/bin/module-index/src/main.rs @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/bin/pinga/src/main.rs b/bin/pinga/src/main.rs index 7a26506e36..fec381b455 100644 --- a/bin/pinga/src/main.rs +++ b/bin/pinga/src/main.rs @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index be7cfdc7b0..0c43106e66 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -57,7 +57,9 @@ async fn async_main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/bin/veritech/src/main.rs b/bin/veritech/src/main.rs index 266434a8ea..4e05944ad6 100644 --- a/bin/veritech/src/main.rs +++ b/bin/veritech/src/main.rs @@ -32,7 +32,9 @@ async fn main() -> Result<()> { }; if args.verbose > 0 { - telemetry.set_verbosity(args.verbose.into()).await?; + telemetry + .set_verbosity_and_wait(args.verbose.into()) + .await?; } trace!(arguments =?args, "parsed cli arguments"); diff --git a/lib/telemetry-application-rs/src/lib.rs b/lib/telemetry-application-rs/src/lib.rs index 8c7f7d9dc4..9350fc67d9 100644 --- a/lib/telemetry-application-rs/src/lib.rs +++ b/lib/telemetry-application-rs/src/lib.rs @@ -579,14 +579,22 @@ impl TelemetryUpdateTask { async fn run(mut self) { while let Some(command) = self.update_command_rx.recv().await { match command { - TelemetryCommand::TracingLevel(tracing_level) => { - if let Err(err) = self.update_tracing_level(tracing_level) { + TelemetryCommand::TracingLevel { level, wait } => { + if let Err(err) = self.update_tracing_level(level) { warn!( task = Self::NAME, error = ?err, "failed to update tracing level, using prior value", ); } + if let Some(tx) = wait { + if let Err(err) = tx.send(()) { + warn!( + error = ?err, + "receiver already closed when waiting on changing tracing level", + ); + } + } } TelemetryCommand::Shutdown(token) => { if !self.is_shutdown { diff --git a/lib/telemetry-rs/src/lib.rs b/lib/telemetry-rs/src/lib.rs index 937a98d861..30f4f5591e 100644 --- a/lib/telemetry-rs/src/lib.rs +++ b/lib/telemetry-rs/src/lib.rs @@ -19,11 +19,12 @@ use std::{ use async_trait::async_trait; use thiserror::Error; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_util::sync::CancellationToken; pub use opentelemetry::{self, trace::SpanKind}; pub use tracing; +use tracing::warn; pub mod prelude { pub use super::{MessagingOperation, SpanExt, SpanKind, SpanKindExt}; @@ -200,11 +201,69 @@ impl ApplicationTelemetryClient { update_telemetry_tx, } } -} -#[async_trait] -impl TelemetryClient for ApplicationTelemetryClient { - async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError> { + pub async fn set_verbosity_and_wait(&mut self, updated: Verbosity) -> Result<(), ClientError> { + let (tx, rx) = oneshot::channel(); + + self.set_verbosity_inner(updated, Some(tx)).await?; + + if let Err(err) = rx.await { + warn!(error = ?err, "sender already closed while waiting on verbosity change"); + } + + Ok(()) + } + + pub async fn increase_verbosity_and_wait(&mut self) -> Result<(), ClientError> { + let (tx, rx) = oneshot::channel(); + + self.increase_verbosity_inner(Some(tx)).await?; + + if let Err(err) = rx.await { + warn!( + error = ?err, + "sender already closed while waiting on verbosity increase change", + ); + } + + Ok(()) + } + + pub async fn decrease_verbosity_and_wait(&mut self) -> Result<(), ClientError> { + let (tx, rx) = oneshot::channel(); + + self.decrease_verbosity_inner(Some(tx)).await?; + + if let Err(err) = rx.await { + warn!( + error = ?err, + "sender already closed while waiting on verbosity decrease change", + ); + } + + Ok(()) + } + + pub async fn set_custom_tracing_and_wait( + &mut self, + directives: impl Into + Send, + ) -> Result<(), ClientError> { + let (tx, rx) = oneshot::channel(); + + self.set_custom_tracing_inner(directives, Some(tx)).await?; + + if let Err(err) = rx.await { + warn!(error = ?err, "sender already closed while waiting on custom tracing change"); + } + + Ok(()) + } + + async fn set_verbosity_inner( + &mut self, + updated: Verbosity, + wait: Option>, + ) -> Result<(), ClientError> { let mut guard = self.tracing_level.lock().await; let tracing_level = guard.deref_mut(); @@ -225,39 +284,50 @@ impl TelemetryClient for ApplicationTelemetryClient { } self.update_telemetry_tx - .send(TelemetryCommand::TracingLevel(tracing_level.clone()))?; + .send(TelemetryCommand::TracingLevel { + level: tracing_level.clone(), + wait, + })?; + Ok(()) } - async fn increase_verbosity(&mut self) -> Result<(), ClientError> { + async fn increase_verbosity_inner( + &mut self, + wait: Option>, + ) -> Result<(), ClientError> { let guard = self.tracing_level.lock().await; match guard.deref() { TracingLevel::Verbosity { verbosity, .. } => { let updated = verbosity.increase(); drop(guard); - self.set_verbosity(updated).await + self.set_verbosity_inner(updated, wait).await } TracingLevel::Custom(_) => Err(ClientError::CustomHasNoVerbosity), } } - async fn decrease_verbosity(&mut self) -> Result<(), ClientError> { + async fn decrease_verbosity_inner( + &mut self, + wait: Option>, + ) -> Result<(), ClientError> { let guard = self.tracing_level.lock().await; match guard.deref() { TracingLevel::Verbosity { verbosity, .. } => { let updated = verbosity.decrease(); drop(guard); - self.set_verbosity(updated).await + self.set_verbosity_inner(updated, wait).await } TracingLevel::Custom(_) => Err(ClientError::CustomHasNoVerbosity), } } - async fn set_custom_tracing( + async fn set_custom_tracing_inner( &mut self, - directives: impl Into + Send + 'async_trait, + directives: impl Into + Send, + wait: Option>, ) -> Result<(), ClientError> { let mut guard = self.tracing_level.lock().await; let tracing_level = guard.deref_mut(); @@ -265,11 +335,36 @@ impl TelemetryClient for ApplicationTelemetryClient { let updated = TracingLevel::custom(directives); *tracing_level = updated; self.update_telemetry_tx - .send(TelemetryCommand::TracingLevel(tracing_level.clone()))?; + .send(TelemetryCommand::TracingLevel { + level: tracing_level.clone(), + wait, + })?; Ok(()) } } +#[async_trait] +impl TelemetryClient for ApplicationTelemetryClient { + async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError> { + self.set_verbosity_inner(updated, None).await + } + + async fn increase_verbosity(&mut self) -> Result<(), ClientError> { + self.increase_verbosity_inner(None).await + } + + async fn decrease_verbosity(&mut self) -> Result<(), ClientError> { + self.decrease_verbosity_inner(None).await + } + + async fn set_custom_tracing( + &mut self, + directives: impl Into + Send + 'async_trait, + ) -> Result<(), ClientError> { + self.set_custom_tracing_inner(directives, None).await + } +} + #[async_trait] impl TelemetryLevel for ApplicationTelemetryClient { async fn is_debug_or_lower(&self) -> bool { @@ -325,10 +420,13 @@ pub enum ClientError { } #[remain::sorted] -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum TelemetryCommand { Shutdown(CancellationToken), - TracingLevel(TracingLevel), + TracingLevel { + level: TracingLevel, + wait: Option>, + }, } #[remain::sorted]