Skip to content

Commit

Permalink
merge: #3280
Browse files Browse the repository at this point in the history
3280: chore: add optional API to make tracing level changes "wait-able" r=fnichol a=fnichol

This change adds several `*_and_wait()` method options to the `ApplicationTelemetryClient` type. When our services boot up we check the parsed CLI argument for verbosity flags. If there are more than one set then we issue a command to raise the tracing level but prior to this change, this operation was non-blocking/async. That means that the level may not have raised by the time the next line of code executes resulting in potential missing log messages.

<img src="https://media2.giphy.com/media/l0HlKrB02QY0f1mbm/giphy.gif"/>

Co-authored-by: Fletcher Nichol <[email protected]>
  • Loading branch information
si-bors-ng[bot] and fnichol authored Feb 7, 2024
2 parents 1a9c74a + 28d7e55 commit a69b546
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 23 deletions.
4 changes: 3 additions & 1 deletion bin/council/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
4 changes: 3 additions & 1 deletion bin/cyclone/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ async fn main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
4 changes: 3 additions & 1 deletion bin/module-index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
4 changes: 3 additions & 1 deletion bin/pinga/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async fn async_main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
4 changes: 3 additions & 1 deletion bin/sdf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ async fn async_main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
4 changes: 3 additions & 1 deletion bin/veritech/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ async fn main() -> Result<()> {
};

if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
telemetry
.set_verbosity_and_wait(args.verbose.into())
.await?;
}
trace!(arguments =?args, "parsed cli arguments");

Expand Down
12 changes: 10 additions & 2 deletions lib/telemetry-application-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,14 +579,22 @@ impl TelemetryUpdateTask {
async fn run(mut self) {
while let Some(command) = self.update_command_rx.recv().await {
match command {
TelemetryCommand::TracingLevel(tracing_level) => {
if let Err(err) = self.update_tracing_level(tracing_level) {
TelemetryCommand::TracingLevel { level, wait } => {
if let Err(err) = self.update_tracing_level(level) {
warn!(
task = Self::NAME,
error = ?err,
"failed to update tracing level, using prior value",
);
}
if let Some(tx) = wait {
if let Err(err) = tx.send(()) {
warn!(
error = ?err,
"receiver already closed when waiting on changing tracing level",
);
}
}
}
TelemetryCommand::Shutdown(token) => {
if !self.is_shutdown {
Expand Down
128 changes: 113 additions & 15 deletions lib/telemetry-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use std::{

use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::sync::CancellationToken;

pub use opentelemetry::{self, trace::SpanKind};
pub use tracing;
use tracing::warn;

pub mod prelude {
pub use super::{MessagingOperation, SpanExt, SpanKind, SpanKindExt};
Expand Down Expand Up @@ -200,11 +201,69 @@ impl ApplicationTelemetryClient {
update_telemetry_tx,
}
}
}

#[async_trait]
impl TelemetryClient for ApplicationTelemetryClient {
async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError> {
pub async fn set_verbosity_and_wait(&mut self, updated: Verbosity) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

self.set_verbosity_inner(updated, Some(tx)).await?;

if let Err(err) = rx.await {
warn!(error = ?err, "sender already closed while waiting on verbosity change");
}

Ok(())
}

pub async fn increase_verbosity_and_wait(&mut self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

self.increase_verbosity_inner(Some(tx)).await?;

if let Err(err) = rx.await {
warn!(
error = ?err,
"sender already closed while waiting on verbosity increase change",
);
}

Ok(())
}

pub async fn decrease_verbosity_and_wait(&mut self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

self.decrease_verbosity_inner(Some(tx)).await?;

if let Err(err) = rx.await {
warn!(
error = ?err,
"sender already closed while waiting on verbosity decrease change",
);
}

Ok(())
}

pub async fn set_custom_tracing_and_wait(
&mut self,
directives: impl Into<String> + Send,
) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

self.set_custom_tracing_inner(directives, Some(tx)).await?;

if let Err(err) = rx.await {
warn!(error = ?err, "sender already closed while waiting on custom tracing change");
}

Ok(())
}

async fn set_verbosity_inner(
&mut self,
updated: Verbosity,
wait: Option<oneshot::Sender<()>>,
) -> Result<(), ClientError> {
let mut guard = self.tracing_level.lock().await;
let tracing_level = guard.deref_mut();

Expand All @@ -225,51 +284,87 @@ impl TelemetryClient for ApplicationTelemetryClient {
}

self.update_telemetry_tx
.send(TelemetryCommand::TracingLevel(tracing_level.clone()))?;
.send(TelemetryCommand::TracingLevel {
level: tracing_level.clone(),
wait,
})?;

Ok(())
}

async fn increase_verbosity(&mut self) -> Result<(), ClientError> {
async fn increase_verbosity_inner(
&mut self,
wait: Option<oneshot::Sender<()>>,
) -> Result<(), ClientError> {
let guard = self.tracing_level.lock().await;

match guard.deref() {
TracingLevel::Verbosity { verbosity, .. } => {
let updated = verbosity.increase();
drop(guard);
self.set_verbosity(updated).await
self.set_verbosity_inner(updated, wait).await
}
TracingLevel::Custom(_) => Err(ClientError::CustomHasNoVerbosity),
}
}

async fn decrease_verbosity(&mut self) -> Result<(), ClientError> {
async fn decrease_verbosity_inner(
&mut self,
wait: Option<oneshot::Sender<()>>,
) -> Result<(), ClientError> {
let guard = self.tracing_level.lock().await;

match guard.deref() {
TracingLevel::Verbosity { verbosity, .. } => {
let updated = verbosity.decrease();
drop(guard);
self.set_verbosity(updated).await
self.set_verbosity_inner(updated, wait).await
}
TracingLevel::Custom(_) => Err(ClientError::CustomHasNoVerbosity),
}
}

async fn set_custom_tracing(
async fn set_custom_tracing_inner(
&mut self,
directives: impl Into<String> + Send + 'async_trait,
directives: impl Into<String> + Send,
wait: Option<oneshot::Sender<()>>,
) -> Result<(), ClientError> {
let mut guard = self.tracing_level.lock().await;
let tracing_level = guard.deref_mut();

let updated = TracingLevel::custom(directives);
*tracing_level = updated;
self.update_telemetry_tx
.send(TelemetryCommand::TracingLevel(tracing_level.clone()))?;
.send(TelemetryCommand::TracingLevel {
level: tracing_level.clone(),
wait,
})?;
Ok(())
}
}

#[async_trait]
impl TelemetryClient for ApplicationTelemetryClient {
async fn set_verbosity(&mut self, updated: Verbosity) -> Result<(), ClientError> {
self.set_verbosity_inner(updated, None).await
}

async fn increase_verbosity(&mut self) -> Result<(), ClientError> {
self.increase_verbosity_inner(None).await
}

async fn decrease_verbosity(&mut self) -> Result<(), ClientError> {
self.decrease_verbosity_inner(None).await
}

async fn set_custom_tracing(
&mut self,
directives: impl Into<String> + Send + 'async_trait,
) -> Result<(), ClientError> {
self.set_custom_tracing_inner(directives, None).await
}
}

#[async_trait]
impl TelemetryLevel for ApplicationTelemetryClient {
async fn is_debug_or_lower(&self) -> bool {
Expand Down Expand Up @@ -325,10 +420,13 @@ pub enum ClientError {
}

#[remain::sorted]
#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum TelemetryCommand {
Shutdown(CancellationToken),
TracingLevel(TracingLevel),
TracingLevel {
level: TracingLevel,
wait: Option<oneshot::Sender<()>>,
},
}

#[remain::sorted]
Expand Down

0 comments on commit a69b546

Please sign in to comment.