Skip to content

Commit

Permalink
merge: #3232
Browse files Browse the repository at this point in the history
3232: feat: refactor application telemetry r=fnichol a=fnichol

This change aggressively refactors the telemetry setup for all services.
The enabling/disabling of OpenTelemetry has been removed and is now
*always* active and set up.

The dynamic tracing level functionality remains but its approach is
different. In this current iteration, both the console logger and the
OpenTelemetry layer have their own reloadable `EnvFilter` layer on top,
whereas before the `EnvFilter` layer was its own third layer in the
`Registry` stack. That is:

```
- Registry
  - console_log
    - env_filter
  - otel
    - env_filter
```

At the moment both of the `EnvFilter`s are updated identically and at
the same time, but these may become more independent in future work.

Additionally, the long-running signal-handling and level-updating tasks
have been re-written to be their own structs.

Finally, a new pattern is introduced into each of the service binary
crates which makes use of two types from the `tokio-util` crate:
`TaskTracker` and `CancellationToken`. These two types work extremely well
together and make cancellation/shutdown signalling and waiting (i.e. a
process-wide graceful shutdown) much more straightforward. Future work
will embed this idea further down into each service, allowing much of
the existing shutdown channel infrastructure to be pulled out.


Co-authored-by: Fletcher Nichol <[email protected]>
  • Loading branch information
si-bors-ng[bot] and fnichol authored Jan 30, 2024
2 parents 880b2c7 + b343062 commit d847829
Show file tree
Hide file tree
Showing 31 changed files with 1,139 additions and 1,045 deletions.
227 changes: 120 additions & 107 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ tokio-serde = { version = "0.8.0", features = ["json"] }
tokio-stream = "0.1.14"
tokio-test = "0.4.2"
tokio-tungstenite = "0.20.1" # todo: pinning back from 0.21.0, upgrade this alongside hyper/http/axum/tokio-tungstenite,tower-http
tokio-util = { version = "0.7.8", features = ["codec"] }
tokio-util = { version = "0.7.8", features = ["codec", "rt"] }
tokio-vsock = { version = "0.4.0"}
toml = { version = "0.8.8" }
tower = "0.4.13"
Expand Down
3 changes: 2 additions & 1 deletion bin/council/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_binary(
"//third-party/rust:clap",
"//third-party/rust:color-eyre",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
],
srcs = glob(["src/**/*.rs"]),
)
Expand All @@ -28,4 +29,4 @@ nix_omnibus_pkg(
name = "omnibus",
pkg_name = "council",
build_dep = "//bin/council:council",
)
)
1 change: 1 addition & 0 deletions bin/council/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ color-eyre = { workspace = true }
council-server = { path = "../../lib/council-server" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
tokio = { workspace = true }
tokio-util = { workspace = true }
61 changes: 29 additions & 32 deletions bin/council/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,64 @@
use color_eyre::Result;
use telemetry_application::{
prelude::*, ApplicationTelemetryClient, TelemetryClient, TelemetryConfig,
};
use telemetry_application::prelude::*;
use tokio::sync::watch;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

mod args;

const RT_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 * 3;

fn main() {
std::thread::Builder::new()
.stack_size(RT_DEFAULT_THREAD_STACK_SIZE)
.name("bin/council-std::thread".to_owned())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(RT_DEFAULT_THREAD_STACK_SIZE)
.thread_name("bin/council-tokio::runtime".to_owned())
.enable_all()
.build()?;
runtime.block_on(async_main())
})
.expect("council thread failed")
.join()
.expect("council thread panicked")
.expect("council thread join failed");
fn main() -> Result<()> {
let thread_builder = ::std::thread::Builder::new().stack_size(RT_DEFAULT_THREAD_STACK_SIZE);
let thread_handler = thread_builder.spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(RT_DEFAULT_THREAD_STACK_SIZE)
.thread_name("bin/council-tokio::runtime")
.enable_all()
.build()?
.block_on(async_main())
})?;
thread_handler.join().unwrap()
}

async fn async_main() -> Result<()> {
let shutdown_token = CancellationToken::new();
let task_tracker = TaskTracker::new();

color_eyre::install()?;
let config = TelemetryConfig::builder()
.service_name("council")
.service_namespace("si")
.log_env_var_prefix("SI")
.app_modules(vec!["council", "council_server"])
.build()?;
let telemetry = telemetry_application::init(config)?;
let mut telemetry = telemetry_application::init(config, &task_tracker, shutdown_token.clone())?;
let args = args::parse();

let (_shutdown_request_tx, shutdown_request_rx) = watch::channel(());
tokio::task::spawn(run(args, telemetry, shutdown_request_rx)).await??;

Ok(())
}

async fn run(
args: args::Args,
mut telemetry: ApplicationTelemetryClient,
shutdown_request_rx: watch::Receiver<()>,
) -> Result<()> {
if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
}
debug!(arguments =?args, "parsed cli arguments");

if args.disable_opentelemetry {
telemetry.disable_opentelemetry().await?;
}
task_tracker.close();

let config = council_server::server::Config::try_from(args)?;
let server = council_server::Server::new_with_config(config).await?;
let (subscriber_started_tx, _subscriber_started_rx) = watch::channel(());
server
.run(subscriber_started_tx, shutdown_request_rx.clone())
.await?;

// TODO(fnichol): this will eventually go into the signal handler code but at the moment in
// council's case, this is embedded in server library code which is incorrect. At this moment
// in the program however, the main council server has shut down so it's an appropriate time to
// cancel other remaining tasks and wait on their graceful shutdowns
{
shutdown_token.cancel();
task_tracker.wait().await;
}

info!("graceful shutdown complete.");
Ok(())
}
1 change: 1 addition & 0 deletions bin/cyclone/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rust_binary(
"//third-party/rust:clap",
"//third-party/rust:color-eyre",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
],
srcs = glob(["src/**/*.rs"]),
)
Expand Down
1 change: 1 addition & 0 deletions bin/cyclone/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ color-eyre = { version = "0.6.1" }
cyclone-server = { path = "../../lib/cyclone-server" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
tokio = { workspace = true }
tokio-util = { workspace = true }
32 changes: 17 additions & 15 deletions bin/cyclone/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use color_eyre::Result;
use cyclone_server::{Config, IncomingStream, Server};
use telemetry_application::{
prelude::*, start_tracing_level_signal_handler_task, ApplicationTelemetryClient,
TelemetryClient, TelemetryConfig,
};
use telemetry_application::prelude::*;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

mod args;

Expand All @@ -17,6 +15,9 @@ const CUSTOM_DEFAULT_TRACING_LEVEL: &str = "warn";

#[tokio::main]
async fn main() -> Result<()> {
let shutdown_token = CancellationToken::new();
let task_tracker = TaskTracker::new();

color_eyre::install()?;
let config = TelemetryConfig::builder()
.service_name("cyclone")
Expand All @@ -25,30 +26,22 @@ async fn main() -> Result<()> {
.app_modules(vec!["cyclone", "cyclone_server"])
.custom_default_tracing_level(CUSTOM_DEFAULT_TRACING_LEVEL)
.build()?;
let telemetry = telemetry_application::init(config)?;
let mut telemetry = telemetry_application::init(config, &task_tracker, shutdown_token.clone())?;
let args = args::parse();

run(args, telemetry).await
}

async fn run(args: args::Args, mut telemetry: ApplicationTelemetryClient) -> Result<()> {
if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
}
debug!(arguments =?args, "parsed cli arguments");

if args.disable_opentelemetry {
telemetry.disable_opentelemetry().await?;
}

let decryption_key = Server::load_decryption_key(&args.decryption_key).await?;

let config = Config::try_from(args)?;

start_tracing_level_signal_handler_task(&telemetry)?;

let telemetry = Box::new(telemetry);

task_tracker.close();

match config.incoming_stream() {
IncomingStream::HTTPSocket(_) => {
Server::http(config, telemetry, decryption_key)?
Expand All @@ -70,5 +63,14 @@ async fn run(args: args::Args, mut telemetry: ApplicationTelemetryClient) -> Res
}
}

// TODO(fnichol): this will eventually go into the signal handler code but at the moment in
// cyclone's case, this is embedded in server library code which is incorrect. At this moment in
// the program however, axum has shut down so it's an appropriate time to cancel other
// remaining tasks and wait on their graceful shutdowns
{
shutdown_token.cancel();
task_tracker.wait().await;
}

Ok(())
}
1 change: 1 addition & 0 deletions bin/module-index/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_binary(
"//third-party/rust:clap",
"//third-party/rust:color-eyre",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
],
srcs = glob(["src/**/*.rs"]),
resources = {
Expand Down
1 change: 1 addition & 0 deletions bin/module-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ color-eyre = { workspace = true }
module-index-server = { path = "../../lib/module-index-server" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
tokio = { workspace = true }
tokio-util = { workspace = true }
38 changes: 22 additions & 16 deletions bin/module-index/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use color_eyre::Result;
use module_index_server::{Config, Server};
use telemetry_application::{
prelude::*, start_tracing_level_signal_handler_task, ApplicationTelemetryClient,
TelemetryClient, TelemetryConfig,
};
use telemetry_application::prelude::*;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

mod args;

const RT_DEFAULT_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 * 3;

fn main() -> Result<()> {
let thread_builder = ::std::thread::Builder::new();
let thread_builder = ::std::thread::Builder::new().stack_size(RT_DEFAULT_THREAD_STACK_SIZE);
let thread_handler = thread_builder.spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(RT_DEFAULT_THREAD_STACK_SIZE)
.thread_name("bin/module-index-tokio::runtime")
.enable_all()
.build()?
Expand All @@ -20,29 +21,24 @@ fn main() -> Result<()> {
}

async fn async_main() -> Result<()> {
let shutdown_token = CancellationToken::new();
let task_tracker = TaskTracker::new();

color_eyre::install()?;
let config = TelemetryConfig::builder()
.service_name("module-index")
.service_namespace("si")
.log_env_var_prefix("SI")
.app_modules(vec!["module_index", "module_index_server"])
.build()?;
let telemetry = telemetry_application::init(config)?;
let mut telemetry = telemetry_application::init(config, &task_tracker, shutdown_token.clone())?;
let args = args::parse();

run(args, telemetry).await
}

async fn run(args: args::Args, mut telemetry: ApplicationTelemetryClient) -> Result<()> {
if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
}
debug!(arguments =?args, "parsed cli arguments");

if args.disable_opentelemetry {
telemetry.disable_opentelemetry().await?;
}

let config = Config::try_from(args)?;

let jwt_public_signing_key =
Expand All @@ -57,15 +53,25 @@ async fn run(args: args::Args, mut telemetry: ApplicationTelemetryClient) -> Res
// this is the SeaOrm-managed Pg Pool
let pg_pool = Server::create_db_connection(config.pg_pool()).await?;

start_tracing_level_signal_handler_task(&telemetry)?;

let posthog_client = Server::start_posthog(config.posthog()).await?;

task_tracker.close();

let (server, initial_shutdown_broadcast_rx) =
Server::http(config, pg_pool, jwt_public_signing_key, posthog_client)?;
let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe();

server.run().await?;

// TODO(fnichol): this will eventually go into the signal handler code but at the moment in
// module-index's case, this is embedded in server library code which is incorrect. At this
// moment in the program however, axum has shut down so it's an appropriate time to cancel
// other remaining tasks and wait on their graceful shutdowns
{
shutdown_token.cancel();
task_tracker.wait().await;
}

info!("graceful shutdown complete.");
Ok(())
}
1 change: 1 addition & 0 deletions bin/pinga/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_binary(
"//third-party/rust:clap",
"//third-party/rust:color-eyre",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
],
srcs = glob(["src/**/*.rs"]),
resources = {
Expand Down
1 change: 1 addition & 0 deletions bin/pinga/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ color-eyre = { workspace = true }
pinga-server = { path = "../../lib/pinga-server" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
tokio = { workspace = true }
tokio-util = { workspace = true }
32 changes: 17 additions & 15 deletions bin/pinga/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use color_eyre::Result;
use pinga_server::{Config, Server};
use telemetry_application::{
prelude::*, start_tracing_level_signal_handler_task, ApplicationTelemetryClient,
TelemetryClient, TelemetryConfig,
};
use telemetry_application::prelude::*;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

mod args;

Expand All @@ -23,35 +21,39 @@ fn main() -> Result<()> {
}

async fn async_main() -> Result<()> {
let shutdown_token = CancellationToken::new();
let task_tracker = TaskTracker::new();

color_eyre::install()?;
let config = TelemetryConfig::builder()
.service_name("pinga")
.service_namespace("si")
.log_env_var_prefix("SI")
.app_modules(vec!["pinga", "pinga_server"])
.build()?;
let telemetry = telemetry_application::init(config)?;

let mut telemetry = telemetry_application::init(config, &task_tracker, shutdown_token.clone())?;
let args = args::parse();

run(args, telemetry).await
}

async fn run(args: args::Args, mut telemetry: ApplicationTelemetryClient) -> Result<()> {
if args.verbose > 0 {
telemetry.set_verbosity(args.verbose.into()).await?;
}
debug!(arguments =?args, "parsed cli arguments");

if args.disable_opentelemetry {
telemetry.disable_opentelemetry().await?;
}

let config = Config::try_from(args)?;

start_tracing_level_signal_handler_task(&telemetry)?;
task_tracker.close();

Server::from_config(config).await?.run().await?;

// TODO(fnichol): this will eventually go into the signal handler code but at the moment in
// sdf's case, this is embedded in server library code which is incorrect. At this moment in
// the program however, axum has shut down so it's an appropriate time to cancel other
// remaining tasks and wait on their graceful shutdowns
{
shutdown_token.cancel();
task_tracker.wait().await;
}

info!("graceful shutdown complete.");
Ok(())
}
1 change: 1 addition & 0 deletions bin/sdf/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rust_binary(
"//third-party/rust:clap",
"//third-party/rust:color-eyre",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
],
srcs = glob(["src/**/*.rs"]),
resources = {
Expand Down
1 change: 1 addition & 0 deletions bin/sdf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ color-eyre = { workspace = true }
sdf-server = { path = "../../lib/sdf-server" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
tokio = { workspace = true }
tokio-util = { workspace = true }
Loading

0 comments on commit d847829

Please sign in to comment.