diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2fc9c77a9be..4869dd4e4c0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -231,7 +231,7 @@ Follow these commit guidelines:
Expand to learn how to change the log level or write logs to a JSON.

-If one of your tests is failing, you may want to decrease the maximum logging level. By default, Iroha only logs `INFO` level messages, but retains the ability to produce both `DEBUG` and `TRACE` level logs. This setting can be changed either using the `MAX_LOG_LEVEL` environment variable for code-based tests, or using the `/configuration` endpoint on one of the peers in a deployed network.
+If one of your tests is failing, you may want to decrease the maximum logging level. By default, Iroha only logs `INFO` level messages, but retains the ability to produce both `DEBUG` and `TRACE` level logs. This setting can be changed either using the `LOG_LEVEL` environment variable for code-based tests, or using the `/configuration` endpoint on one of the peers in a deployed network.

 While logs printed in the `stdout` are sufficient, you may find it more convenient to produce `json`-formatted logs into a separate file and parse them using either [node-bunyan](https://www.npmjs.com/package/bunyan) or [rust-bunyan](https://crates.io/crates/bunyan).

@@ -251,8 +251,8 @@ In this case you should compile iroha with support of tokio console like that:
 RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
 ```
-Port for tokio console can by configured through `TOKIO_CONSOLE_ADDR` configuration parameter (or environment variable).
-Using tokio console require log level to be `TRACE`, can be enabled through configuration parameter or environment variable `MAX_LOG_LEVEL`.
+The port for tokio console can be configured through the `LOG_TOKIO_CONSOLE_ADDR` configuration parameter (or environment variable).
+Using tokio console requires the log level to be `TRACE`; it can be enabled through the `LOG_LEVEL` configuration parameter or environment variable.

 Example of running iroha with tokio console support using `scripts/test_env.sh`:

@@ -260,7 +260,7 @@ Example of running iroha with tokio console support using `scripts/test_env.sh`:
 # 1. Compile iroha
 RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
 # 2. Run iroha with TRACE log level
-MAX_LOG_LEVEL=TRACE ./scripts/test_env.sh setup
+LOG_LEVEL=TRACE ./scripts/test_env.sh setup
 # 3. Access iroha. Peers will be available on ports 5555, 5556, ...
tokio-console http://127.0.0.1:5555 ``` diff --git a/Cargo.lock b/Cargo.lock index 9dadaa730bd..dd4bd2c1248 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", - "getrandom 0.2.10", "once_cell", "version_check", ] @@ -1722,16 +1721,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "gethostname" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "getrandom" version = "0.1.16" @@ -2746,6 +2735,7 @@ dependencies = [ "cfg-if", "derive_more", "displaydoc", + "expect-test", "eyre", "iroha_config_base", "iroha_crypto", @@ -3063,9 +3053,9 @@ dependencies = [ "iroha_data_model", "once_cell", "serde_json", + "thiserror", "tokio", "tracing", - "tracing-bunyan-formatter", "tracing-core", "tracing-error", "tracing-futures", @@ -5556,6 +5546,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -5699,24 +5690,6 @@ dependencies = [ "syn 2.0.38", ] -[[package]] -name = "tracing-bunyan-formatter" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" -dependencies = [ - "ahash", - "gethostname", - "log", - "serde", - "serde_json", - "time", - "tracing", - "tracing-core", - "tracing-log", - "tracing-subscriber", -] - [[package]] name = "tracing-core" version = "0.1.31" @@ -5759,13 +5732,12 @@ dependencies = [ ] [[package]] -name = "tracing-log" +name = "tracing-serde" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" dependencies = [ - "lazy_static", - "log", + "serde", "tracing-core", ] @@ -5779,11 +5751,14 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-serde", ] [[package]] diff --git a/README.md b/README.md index 237481c5a48..5877ff5f40d 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,7 @@ For a list of all endpoints, available operations, and ways to customize them wi By default, Iroha provides logs in a human-readable format and prints them out to `stdout`. -The logging level can be changed either via a [configuration option](./docs/source/references/config.md#loggermax_log_level) or at run-time using the `configuration` endpoint. +The logging level can be changed either via the [`logger.level` configuration parameter](./docs/source/references/config.md#loggerlevel) or at run-time using the `configuration` endpoint.
Example: changing log level @@ -178,17 +178,13 @@ For example, if your Iroha instance is running at `127.0.0.1:8080` and you want curl -X POST \ -H 'content-type: application/json' \ http://127.0.0.1:8080/configuration \ - -d '{"LogLevel": "DEBUG"}' -i + -d '{"logger": {"level": "DEBUG"}}' -i ```
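For comparison, the Rust client gains matching methods in this diff: `Client::get_config()`, which fetches the peer's `ConfigurationDTO`, and `Client::set_config()`, which posts it back (see `client/tests/integration/config.rs`). Below is a minimal sketch of the same log-level update through that API; the helper name and the `eyre`-based error handling are illustrative, not part of the diff:

```rust
use eyre::Result;
use iroha_client::client::Client;
use iroha_data_model::Level;

/// Illustrative helper: change a peer's log level via the `/configuration` endpoint.
fn set_peer_log_level(client: &Client, level: Level) -> Result<()> {
    // Fetch the configuration subset exposed to clients (currently just `logger`)
    let mut dto = client.get_config()?;
    // `logger.level` is the only dynamically reloadable parameter for now
    dto.logger.level = level;
    // The peer replies with `202 ACCEPTED`; `set_config` maps that to `Ok(())`
    client.set_config(dto)?;
    Ok(())
}
```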
-#### JSON Logging Mode
+The log format can be configured via the [`logger.format` configuration parameter](./docs/source/references/config.md#loggerformat). Possible values are: `full` (default), `compact`, `pretty`, and `json`.

-Additionally, Iroha supports a JSON logging mode.
-
-To enable it, provide the [logging file](./docs/source/references/config.md#loggerlog_file_path) to store the logs in. On UNIX, you can also specify `/dev/stdout` or `/dev/stderr` if you prefer to pipe the output to [`bunyan`](https://www.npmjs.com/package/bunyan).
-
-[Log rotation](https://www.commandlinux.com/man-page/man5/logrotate.conf.5.html) is the responsibility of the peer administrator.
+Output goes to `/dev/stdout`. Piping to files or [log rotation](https://www.commandlinux.com/man-page/man5/logrotate.conf.5.html) is the responsibility of the peer administrator.

 ### Monitoring

diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index bb72b92c049..2d8a9a9c078 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -13,11 +13,13 @@ use iroha_config::{
     base::proxy::{LoadFromDisk, LoadFromEnv, Override},
     iroha::{Configuration, ConfigurationProxy},
     path::Path as ConfigPath,
+    telemetry::Configuration as TelemetryConfiguration,
 };
 use iroha_core::{
     block_sync::{BlockSynchronizer, BlockSynchronizerHandle},
     gossiper::{TransactionGossiper, TransactionGossiperHandle},
     handler::ThreadHandler,
+    kiso::KisoHandle,
     kura::Kura,
     prelude::{World, WorldStateView},
     query::store::LiveQueryStore,
@@ -30,6 +32,7 @@ use iroha_core::{
 };
 use iroha_data_model::prelude::*;
 use iroha_genesis::GenesisNetwork;
+use iroha_logger::actor::LoggerHandle;
 use tokio::{
     signal,
     sync::{broadcast, mpsc, Notify},
@@ -74,6 +77,25 @@ impl Default for Arguments {
     }
 }

+/// Reflects the user's decision (or its absence) about ANSI-colored output
+#[derive(Copy, Clone, Debug)]
+pub enum TerminalColorsArg {
+    /// Coloring should be decided automatically
+    Default,
+    /// User explicitly specified the value
+    UserSet(bool),
+}
+
+impl TerminalColorsArg {
+    /// Transforms the enumeration into a flag
+    pub fn evaluate(self) -> bool {
+        match self {
+            Self::Default => supports_color::on(supports_color::Stream::Stdout).is_some(),
+            Self::UserSet(x) => x,
+        }
+    }
+}
+
 /// Iroha is an
 /// [Orchestrator](https://en.wikipedia.org/wiki/Orchestration_%28computing%29)
 /// of the system. It configures, coordinates and manages transactions
@@ -85,6 +107,8 @@ impl Default for Arguments {
 /// forgot this step.
#[must_use = "run `.start().await?` to not immediately stop Iroha"] pub struct Iroha { + /// Actor responsible for the configuration + pub kiso: KisoHandle, /// Queue of transactions pub queue: Arc, /// Sumeragi consensus @@ -225,7 +249,7 @@ impl Iroha { pub async fn with_genesis( genesis: Option, config: Configuration, - telemetry: Option, + logger: LoggerHandle, ) -> Result { let listen_addr = config.torii.p2p_addr.clone(); let network = IrohaNetwork::start(listen_addr, config.sumeragi.key_pair.clone()) @@ -234,15 +258,11 @@ impl Iroha { let (events_sender, _) = broadcast::channel(10000); let world = World::with( - [genesis_domain(&config)], + [genesis_domain(config.genesis.account_public_key.clone())], config.sumeragi.trusted_peers.peers.clone(), ); - let kura = Kura::new( - config.kura.init_mode, - std::path::Path::new(&config.kura.block_store_path), - config.kura.debug_output_new_blocks, - )?; + let kura = Kura::new(&config.kura)?; let live_query_store_handle = LiveQueryStore::from_configuration(config.live_query_store).start(); @@ -273,11 +293,10 @@ impl Iroha { ); let queue = Arc::new(Queue::from_configuration(&config.queue)); - if Self::start_telemetry(telemetry, &config).await? { - iroha_logger::info!("Telemetry started") - } else { - iroha_logger::warn!("Telemetry not started") - } + match Self::start_telemetry(&logger, &config.telemetry).await? { + TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"), + TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"), + }; let kura_thread_handler = Kura::start(Arc::clone(&kura)); @@ -328,8 +347,11 @@ impl Iroha { let snapshot_maker = SnapshotMaker::from_configuration(&config.snapshot, sumeragi.clone()).start(); - let torii = Torii::from_configuration( - config.clone(), + let kiso = KisoHandle::new(config.clone()); + + let torii = Torii::new( + kiso.clone(), + &config.torii, Arc::clone(&queue), events_sender, Arc::clone(¬ify_shutdown), @@ -338,12 +360,15 @@ impl Iroha { Arc::clone(&kura), ); + Self::spawn_configuration_updates_broadcasting(kiso.clone(), logger.clone()); + Self::start_listening_signal(Arc::clone(¬ify_shutdown))?; Self::prepare_panic_hook(notify_shutdown); let torii = Some(torii); Ok(Self { + kiso, queue, sumeragi, kura, @@ -389,37 +414,46 @@ impl Iroha { #[cfg(feature = "telemetry")] async fn start_telemetry( - telemetry: Option<( - iroha_logger::SubstrateTelemetry, - iroha_logger::FutureTelemetry, - )>, - config: &Configuration, - ) -> Result { + logger: &LoggerHandle, + config: &TelemetryConfiguration, + ) -> Result { #[allow(unused)] - if let Some((substrate_telemetry, telemetry_future)) = telemetry { - #[cfg(feature = "dev-telemetry")] - { - iroha_telemetry::dev::start(&config.telemetry, telemetry_future) + let (config_for_regular, config_for_dev) = config.parse(); + + #[cfg(feature = "dev-telemetry")] + { + if let Some(config) = config_for_dev { + let receiver = logger + .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Future) + .await + .wrap_err("Failed to subscribe on telemetry")?; + let _handle = iroha_telemetry::dev::start(config, receiver) .await .wrap_err("Failed to setup telemetry for futures")?; } - iroha_telemetry::ws::start(&config.telemetry, substrate_telemetry) + } + + if let Some(config) = config_for_regular { + let receiver = logger + .subscribe_on_telemetry(iroha_logger::telemetry::Channel::Regular) + .await + .wrap_err("Failed to subscribe on telemetry")?; + let _handle = iroha_telemetry::ws::start(config, receiver) .await - .wrap_err("Failed to 
setup telemetry for websocket communication")
+                .wrap_err("Failed to setup telemetry for websocket communication")?;
+
+            Ok(TelemetryStartStatus::Started)
         } else {
-            Ok(false)
+            Ok(TelemetryStartStatus::NotStarted)
         }
     }

     #[cfg(not(feature = "telemetry"))]
     async fn start_telemetry(
-        _telemetry: Option<(
-            iroha_logger::SubstrateTelemetry,
-            iroha_logger::FutureTelemetry,
-        )>,
-        _config: &Configuration,
-    ) -> Result<bool> {
-        Ok(false)
+        _logger: &LoggerHandle,
+        _config: &TelemetryConfiguration,
+    ) -> Result<TelemetryStartStatus> {
+        Ok(TelemetryStartStatus::NotStarted)
     }

     #[allow(clippy::redundant_pub_crate)]
@@ -448,22 +482,52 @@ impl Iroha {
         Ok(handle)
     }
+
+    /// Spawns a task which subscribes to updates from the configuration actor
+    /// and broadcasts them further to interested actors. This way, neither the config actor
+    /// nor the other actors know about each other, achieving loose coupling.
+    fn spawn_configuration_updates_broadcasting(
+        kiso: KisoHandle,
+        logger: LoggerHandle,
+    ) -> task::JoinHandle<()> {
+        tokio::spawn(async move {
+            let mut log_level_update = kiso
+                .subscribe_on_log_level()
+                .await
+                // FIXME: neither the message nor the inability to propagate the `Result` to the caller is ideal
+                .expect("Cannot proceed without working subscriptions");
+
+            loop {
+                tokio::select! {
+                    Ok(()) = log_level_update.changed() => {
+                        let value = *log_level_update.borrow_and_update();
+                        if let Err(error) = logger.reload_level(value).await {
+                            iroha_logger::error!("Failed to reload log level: {error}");
+                        };
+                    }
+                };
+            }
+        })
+    }
 }

-fn genesis_account(public_key: iroha_crypto::PublicKey) -> Account {
+enum TelemetryStartStatus {
+    Started,
+    NotStarted,
+}
+
+fn genesis_account(public_key: PublicKey) -> Account {
     Account::new(iroha_genesis::GENESIS_ACCOUNT_ID.clone(), [public_key])
         .build(&iroha_genesis::GENESIS_ACCOUNT_ID)
 }

-fn genesis_domain(configuration: &Configuration) -> Domain {
-    let account_public_key = &configuration.genesis.account_public_key;
-
+fn genesis_domain(public_key: PublicKey) -> Domain {
     let mut domain = Domain::new(iroha_genesis::GENESIS_DOMAIN_ID.clone())
         .build(&iroha_genesis::GENESIS_ACCOUNT_ID);

     domain.accounts.insert(
         iroha_genesis::GENESIS_ACCOUNT_ID.clone(),
-        genesis_account(account_public_key.clone()),
+        genesis_account(public_key),
     );

     domain
diff --git a/cli/src/main.rs b/cli/src/main.rs
index e2a07e6ae74..16629ea7ea7 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -2,7 +2,7 @@ use std::env;

 use color_eyre::eyre::WrapErr as _;
-use iroha::style::Styling;
+use iroha::{style::Styling, TerminalColorsArg};
 use iroha_config::path::Path as ConfigPath;
 use iroha_genesis::{GenesisNetwork, RawGenesisBlock};
 use owo_colors::OwoColorize as _;
@@ -10,6 +10,8 @@ use owo_colors::OwoColorize as _;
 const HELP_ARG: [&str; 2] = ["--help", "-h"];
 const SUBMIT_ARG: [&str; 2] = ["--submit-genesis", "-s"];
 const VERSION_ARG: [&str; 2] = ["--version", "-V"];
+const TERMINAL_COLORS_ARG: &str = "--terminal-colors";
+const NO_TERMINAL_COLORS_ARG: &str = "--no-terminal-colors";

 const REQUIRED_ENV_VARS: [(&str, &str); 7] = [
     ("IROHA_TORII", "Torii (gateway) endpoint configuration"),
@@ -42,11 +44,29 @@ const REQUIRED_ENV_VARS: [(&str, &str); 7] = [
 /// - Telemetry setup
 /// - [`Sumeragi`] init
 async fn main() -> Result<(), color_eyre::Report> {
-    let styling = Styling::new();
-    if !iroha::style::should_disable_color() {
+    let mut args = iroha::Arguments::default();
+
+    let terminal_colors = env::var("TERMINAL_COLORS")
+        .ok()
+        .map(|s| s.as_str().parse().unwrap_or(true))
+        .or_else(|| {
+            if
env::args().any(|a| a == TERMINAL_COLORS_ARG) { + Some(true) + } else if env::args().any(|a| a == NO_TERMINAL_COLORS_ARG) { + Some(false) + } else { + None + } + }) + .map_or(TerminalColorsArg::Default, TerminalColorsArg::UserSet) + .evaluate(); + + if terminal_colors { color_eyre::install()?; } - let mut args = iroha::Arguments::default(); + + let styling = Styling::new(terminal_colors); + if env::args().any(|a| HELP_ARG.contains(&a.as_str())) { print_help(&styling)?; return Ok(()); @@ -109,7 +129,7 @@ async fn main() -> Result<(), color_eyre::Report> { } let config = iroha::combine_configs(&args)?; - let telemetry = iroha_logger::init(&config.logger)?; + let logger = iroha_logger::init_global(&config.logger, terminal_colors)?; if !config.disable_panic_terminal_colors { // FIXME: it shouldn't be logged here; it is a part of configuration domain // this message can be very simply broken by the changes in the configuration @@ -140,7 +160,7 @@ async fn main() -> Result<(), color_eyre::Report> { }) .transpose()?; - iroha::Iroha::with_genesis(genesis, config, telemetry) + iroha::Iroha::with_genesis(genesis, config, logger) .await? .start() .await?; diff --git a/cli/src/style.rs b/cli/src/style.rs index dbf64b975e7..393ae591140 100644 --- a/cli/src/style.rs +++ b/cli/src/style.rs @@ -25,22 +25,14 @@ impl Default for Styling { } } -/// Determine if message colourisation is to be enabled -pub fn should_disable_color() -> bool { - supports_color::on(supports_color::Stream::Stdout).is_none() - || std::env::var("TERMINAL_COLORS") - .map(|s| !s.as_str().parse().unwrap_or(true)) - .unwrap_or(false) -} - impl Styling { #[must_use] /// Constructor - pub fn new() -> Self { - if should_disable_color() { - Self::no_color() - } else { + pub fn new(terminal_colors: bool) -> Self { + if terminal_colors { Self::default() + } else { + Self::no_color() } } diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs index 9594362ab5d..a20f2c3f11a 100644 --- a/cli/src/torii/mod.rs +++ b/cli/src/torii/mod.rs @@ -10,7 +10,9 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; +use iroha_config::torii::Configuration as ToriiConfiguration; use iroha_core::{ + kiso::{Error as KisoError, KisoHandle}, kura::Kura, prelude::*, query::store::LiveQueryStoreHandle, @@ -18,6 +20,7 @@ use iroha_core::{ sumeragi::SumeragiHandle, EventsSender, }; +use iroha_primitives::addr::SocketAddr; use tokio::sync::Notify; use utils::*; use warp::{ @@ -33,13 +36,15 @@ mod routing; /// Main network handler and the only entrypoint of the Iroha. pub struct Torii { - iroha_cfg: super::Configuration, + kiso: KisoHandle, queue: Arc, events: EventsSender, notify_shutdown: Arc, sumeragi: SumeragiHandle, query_service: LiveQueryStoreHandle, kura: Arc, + transaction_max_content_length: u64, + address: SocketAddr, } /// Torii errors. 
@@ -53,13 +58,13 @@ pub enum Error { Config(#[source] eyre::Report), /// Failed to push into queue PushIntoQueue(#[from] Box), - /// Attempt to change configuration failed - ConfigurationReload(#[from] iroha_config::base::runtime_upgrades::ReloadError), #[cfg(feature = "telemetry")] /// Error while getting Prometheus metrics Prometheus(#[source] eyre::Report), /// Internal error while getting status StatusFailure(#[source] eyre::Report), + /// Failure caused by configuration subsystem + ConfigurationFailure(#[from] KisoError), /// Cannot find status segment by provided path StatusSegmentNotFound(#[source] eyre::Report), } @@ -82,7 +87,7 @@ impl Error { match self { Query(e) => Self::query_status_code(e), - AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST, + AcceptTransaction(_) => StatusCode::BAD_REQUEST, Config(_) | StatusSegmentNotFound(_) => StatusCode::NOT_FOUND, PushIntoQueue(err) => match **err { queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR, @@ -90,7 +95,9 @@ impl Error { _ => StatusCode::BAD_REQUEST, }, #[cfg(feature = "telemetry")] - Prometheus(_) | StatusFailure(_) => StatusCode::INTERNAL_SERVER_ERROR, + Prometheus(_) | StatusFailure(_) | ConfigurationFailure(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } } } diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 8326453d7af..6a9298974ce 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -7,12 +7,7 @@ use eyre::{eyre, WrapErr}; use futures::TryStreamExt; -use iroha_config::{ - base::proxy::Documented, - iroha::{Configuration, ConfigurationView}, - torii::uri, - GetConfiguration, PostConfiguration, -}; +use iroha_config::{client_api::ConfigurationDTO, torii::uri}; use iroha_core::{ query::{pagination::Paginate, store::LiveQueryStoreHandle}, smartcontracts::query::ValidQueryRequest, @@ -79,7 +74,7 @@ fn fetch_size() -> impl warp::Filter, sumeragi: SumeragiHandle, transaction: SignedTransaction, @@ -169,42 +164,18 @@ async fn handle_pending_transactions( } #[iroha_futures::telemetry_future] -async fn handle_get_configuration( - iroha_cfg: Configuration, - get_cfg: GetConfiguration, -) -> Result { - use GetConfiguration::*; - - match get_cfg { - Docs(field) => ::get_doc_recursive( - field.iter().map(AsRef::as_ref).collect::>(), - ) - .wrap_err("Failed to get docs {:?field}") - .and_then(|doc| serde_json::to_value(doc).wrap_err("Failed to serialize docs")), - // Cast to configuration view to hide private keys. - Value => serde_json::to_value(ConfigurationView::from(iroha_cfg)) - .wrap_err("Failed to serialize value"), - } - .map(|v| reply::json(&v)) - .map_err(Error::Config) +async fn handle_get_configuration(kiso: KisoHandle) -> Result { + let dto = kiso.get_dto().await?; + Ok(reply::json(&dto)) } #[iroha_futures::telemetry_future] async fn handle_post_configuration( - iroha_cfg: Configuration, - cfg: PostConfiguration, -) -> Result { - use iroha_config::base::runtime_upgrades::Reload; - use PostConfiguration::*; - - iroha_logger::debug!(?cfg); - match cfg { - LogLevel(level) => { - iroha_cfg.logger.max_log_level.reload(level)?; - } - }; - - Ok(reply::json(&true)) + kiso: KisoHandle, + value: ConfigurationDTO, +) -> Result { + kiso.update_with_dto(value).await?; + Ok(reply::with_status(reply::reply(), StatusCode::ACCEPTED)) } #[iroha_futures::telemetry_future] @@ -403,8 +374,9 @@ fn handle_status( impl Torii { /// Construct `Torii`. 
#[allow(clippy::too_many_arguments)] - pub fn from_configuration( - iroha_cfg: Configuration, + pub fn new( + kiso: KisoHandle, + config: &ToriiConfiguration, queue: Arc, events: EventsSender, notify_shutdown: Arc, @@ -413,13 +385,15 @@ impl Torii { kura: Arc, ) -> Self { Self { - iroha_cfg, + kiso, queue, events, notify_shutdown, sumeragi, query_service, kura, + address: config.api_url.clone(), + transaction_max_content_length: config.max_content_len.into(), } } @@ -437,12 +411,11 @@ impl Torii { .and(add_state!(self.queue, self.sumeragi,)) .and(paginate()), ) - .or(endpoint2( - handle_get_configuration, - warp::path(uri::CONFIGURATION) - .and(add_state!(self.iroha_cfg)) - .and(warp::body::json()), - )), + .or(warp::path(uri::CONFIGURATION) + .and(add_state!(self.kiso)) + .and_then(|kiso| async move { + Ok::<_, Infallible>(WarpResult(handle_get_configuration(kiso).await)) + })), ); let get_router_status = warp::path(uri::STATUS) @@ -474,11 +447,11 @@ impl Torii { let post_router = warp::post() .and( endpoint3( - handle_instructions, + handle_transaction, warp::path(uri::TRANSACTION) .and(add_state!(self.queue, self.sumeragi)) .and(warp::body::content_length_limit( - self.iroha_cfg.torii.max_content_len.into(), + self.transaction_max_content_length, )) .and(body::versioned()), ) @@ -491,7 +464,7 @@ impl Torii { .or(endpoint2( handle_post_configuration, warp::path(uri::CONFIGURATION) - .and(add_state!(self.iroha_cfg)) + .and(add_state!(self.kiso)) .and(warp::body::json()), )), ) @@ -549,10 +522,10 @@ impl Torii { /// # Errors /// Can fail due to listening to network or if http server fails fn start_api(self: Arc) -> eyre::Result>> { - let api_url = &self.iroha_cfg.torii.api_url; + let torii_address = &self.address; let mut handles = vec![]; - match api_url.to_socket_addrs() { + match torii_address.to_socket_addrs() { Ok(addrs) => { for addr in addrs { let torii = Arc::clone(&self); @@ -568,7 +541,7 @@ impl Torii { Ok(handles) } Err(error) => { - iroha_logger::error!(%api_url, %error, "API address configuration parse error"); + iroha_logger::error!(%torii_address, %error, "API address configuration parse error"); Err(eyre::Error::new(error)) } } diff --git a/client/benches/torii.rs b/client/benches/torii.rs index 5dc72359570..b8906f52504 100644 --- a/client/benches/torii.rs +++ b/client/benches/torii.rs @@ -8,7 +8,6 @@ use iroha_client::{ client::{asset, Client}, data_model::prelude::*, }; -use iroha_config::base::runtime_upgrades::Reload; use iroha_crypto::KeyPair; use iroha_genesis::{GenesisNetwork, RawGenesisBlockBuilder}; use iroha_primitives::unique_vec; @@ -40,15 +39,16 @@ fn query_requests(criterion: &mut Criterion) { .expect("genesis creation failed"); let builder = PeerBuilder::new() - .with_configuration(configuration.clone()) + .with_configuration(configuration) .with_into_genesis(genesis); rt.block_on(builder.start_with_peer(&mut peer)); - configuration - .logger - .max_log_level - .reload(iroha_client::data_model::Level::ERROR) - .expect("Should not fail"); + rt.block_on(async { + iroha_logger::test_logger() + .reload_level(iroha_client::data_model::Level::ERROR) + .await + .unwrap() + }); let mut group = criterion.benchmark_group("query-requests"); let domain_id: DomainId = "domain".parse().expect("Valid"); let create_domain = RegisterExpr::new(Domain::new(domain_id.clone())); diff --git a/client/benches/tps/oneshot.rs b/client/benches/tps/oneshot.rs index 6fd57cf00ba..99efceac8b2 100644 --- a/client/benches/tps/oneshot.rs +++ b/client/benches/tps/oneshot.rs @@ -20,7 +20,7 @@ 
fn main() { flush_guard = Some(flame_layer.flush_on_drop()); tracing_subscriber::registry().with(flame_layer).init(); - iroha_logger::disable_logger(); + iroha_logger::disable_global().expect("Logger should not be set yet"); } let config = utils::Config::from_path("benches/tps/config.json").expect("Failed to configure"); diff --git a/client/src/client.rs b/client/src/client.rs index 942e5444a8d..d4f7f383e30 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -13,14 +13,13 @@ use derive_more::{DebugCustom, Display}; use eyre::{eyre, Result, WrapErr}; use futures_util::StreamExt; use http_default::{AsyncWebSocketStream, WebSocketStream}; -use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConfiguration}; +use iroha_config::{client::Configuration, client_api::ConfigurationDTO, torii::uri}; use iroha_crypto::{HashOf, KeyPair}; use iroha_logger::prelude::*; use iroha_telemetry::metrics::Status; use iroha_version::prelude::*; use parity_scale_codec::DecodeAll; use rand::Rng; -use serde::de::DeserializeOwned; use url::Url; use self::{blocks_api::AsyncBlockStream, events_api::AsyncEventStream}; @@ -1073,13 +1072,16 @@ impl Client { ) } - fn get_config(&self, get_config: &GetConfiguration) -> Result { + /// Get value of config on peer + /// + /// # Errors + /// Fails if sending request or decoding fails + pub fn get_config(&self) -> Result { let resp = DefaultRequestBuilder::new( HttpMethod::GET, self.torii_url.join(uri::CONFIGURATION).expect("Valid URI"), ) .header(http::header::CONTENT_TYPE, APPLICATION_JSON) - .body(serde_json::to_vec(get_config).wrap_err("Failed to serialize")?) .build()? .send()?; @@ -1097,9 +1099,8 @@ impl Client { /// /// # Errors /// If sending request or decoding fails - pub fn set_config(&self, post_config: PostConfiguration) -> Result { - let body = serde_json::to_vec(&post_config) - .wrap_err(format!("Failed to serialize {post_config:?}"))?; + pub fn set_config(&self, dto: ConfigurationDTO) -> Result<()> { + let body = serde_json::to_vec(&dto).wrap_err(format!("Failed to serialize {dto:?}"))?; let url = self.torii_url.join(uri::CONFIGURATION).expect("Valid URI"); let resp = DefaultRequestBuilder::new(HttpMethod::POST, url) .header(http::header::CONTENT_TYPE, APPLICATION_JSON) @@ -1107,34 +1108,15 @@ impl Client { .build()? .send()?; - if resp.status() != StatusCode::OK { + if resp.status() != StatusCode::ACCEPTED { return Err(eyre!( "Failed to post configuration with HTTP status: {}. 
{}", resp.status(), std::str::from_utf8(resp.body()).unwrap_or(""), )); - } - serde_json::from_slice(resp.body()) - .wrap_err(format!("Failed to decode body {:?}", resp.body())) - } - - /// Get documentation of some field on config - /// - /// # Errors - /// Fails if sending request or decoding fails - pub fn get_config_docs(&self, field: &[&str]) -> Result> { - let field = field.iter().copied().map(ToOwned::to_owned).collect(); - self.get_config(&GetConfiguration::Docs(field)) - .wrap_err("Failed to get docs for field") - } + }; - /// Get value of config on peer - /// - /// # Errors - /// Fails if sending request or decoding fails - pub fn get_config_value(&self) -> Result { - self.get_config(&GetConfiguration::Value) - .wrap_err("Failed to get configuration value") + Ok(()) } /// Gets network status seen from the peer diff --git a/client/tests/integration/config.rs b/client/tests/integration/config.rs index 7a6470a9087..54c7f4f596b 100644 --- a/client/tests/integration/config.rs +++ b/client/tests/integration/config.rs @@ -1,27 +1,41 @@ +use iroha_data_model::Level; use test_network::*; -use super::{Builder, Configuration, ConfigurationProxy}; - #[test] -fn get_config() { - // The underscored variables must not be dropped until end of closure. - let (_dont_drop, _dont_drop_either, test_client) = - ::new().with_port(10_685).start_with_runtime(); +fn config_endpoints() { + const NEW_LOG_LEVEL: Level = Level::ERROR; + + let (rt, peer, test_client) = ::new().with_port(10_685).start_with_runtime(); wait_for_genesis_committed(&vec![test_client.clone()], 0); - let field = test_client.get_config_docs(&["torii"]).unwrap().unwrap(); - assert!(field.contains("IROHA_TORII")); - - let test = Configuration::test(); - let cfg_proxy: ConfigurationProxy = - serde_json::from_value(test_client.get_config_value().unwrap()).unwrap(); - assert_eq!( - cfg_proxy.block_sync.unwrap().build().unwrap(), - test.block_sync - ); - assert_eq!(cfg_proxy.network.unwrap().build().unwrap(), test.network); - assert_eq!( - cfg_proxy.telemetry.unwrap().build().unwrap(), - *test.telemetry - ); + let init_log_level = rt.block_on(async move { + peer.iroha + .as_ref() + .unwrap() + .kiso + .get_dto() + .await + .unwrap() + .logger + .level + }); + + // Just to be sure this test suite is not useless + assert_ne!(init_log_level, NEW_LOG_LEVEL); + + // Retrieving through API + let mut dto = test_client.get_config().expect("Client can always get it"); + assert_eq!(dto.logger.level, init_log_level); + + // Updating the log level + dto.logger.level = NEW_LOG_LEVEL; + test_client.set_config(dto).expect("New config is valid"); + + // Checking the updated value + dto = test_client.get_config().unwrap(); + assert_eq!(dto.logger.level, NEW_LOG_LEVEL); + + // Restoring value + dto.logger.level = init_log_level; + test_client.set_config(dto).expect("Also valid DTO"); } diff --git a/client/tests/integration/unstable_network.rs b/client/tests/integration/unstable_network.rs index d0df5179a2c..da962f4728d 100644 --- a/client/tests/integration/unstable_network.rs +++ b/client/tests/integration/unstable_network.rs @@ -55,7 +55,7 @@ fn unstable_network( let (network, iroha_client) = rt.block_on(async { let mut configuration = Configuration::test(); configuration.sumeragi.max_transactions_in_block = MAX_TRANSACTIONS_IN_BLOCK; - configuration.logger.max_log_level = Level::INFO.into(); + configuration.logger.level = Level::INFO; #[cfg(debug_assertions)] { configuration.sumeragi.debug_force_soft_fork = force_soft_fork; diff --git a/config/Cargo.toml 
b/config/Cargo.toml index e4caf3da84d..2f548f60946 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -35,6 +35,7 @@ once_cell = { workspace = true } [dev-dependencies] proptest = "1.3.1" stacker = "0.1.15" +expect-test = { workspace = true } [features] tokio-console = [] diff --git a/config/base/src/lib.rs b/config/base/src/lib.rs index bee2b692efc..d8f40d64c21 100644 --- a/config/base/src/lib.rs +++ b/config/base/src/lib.rs @@ -418,8 +418,6 @@ pub mod derive { } } -pub mod runtime_upgrades; - pub mod view { //! Module for view related traits and structs diff --git a/config/base/src/runtime_upgrades.rs b/config/base/src/runtime_upgrades.rs deleted file mode 100644 index 95b69e0e13d..00000000000 --- a/config/base/src/runtime_upgrades.rs +++ /dev/null @@ -1,362 +0,0 @@ -//! Module handling runtime upgrade logic. -pub use serde::{Deserialize, Serialize}; -use thiserror::*; - -type Result = core::result::Result; - -/// Error which occurs when reloading a configuration fails. -#[derive(Clone, Copy, Debug, Error)] -pub enum ReloadError { - /// The resource held by the handle was poisoned by a panic in - /// another thread. - #[error("Resource poisoned.")] - Poisoned, - /// The resource held by the handle was dropped. - #[error("Resource dropped.")] - Dropped, - /// If the reload handle wasn't properly initialized (using - /// [`handle::Singleton::set`]), there's nothing to reload with. - #[error("Cannot reload an uninitialized handle.")] - NotInitialized, - /// Error not specified by the implementer of the [`Reload`] - /// traits. Use as last resort. - #[error("Unspecified reload failure.")] - Other, -} - -/// The field needs to be mutably borrowed to be reloaded. -pub trait ReloadMut { - /// Reload `self` using provided `item`. - /// - /// # Errors - /// Fails with an appropriate variant of - /// [`ReloadError`]. [`ReloadError::Other`] can be used as a - /// **temporary** placeholder. - fn reload(&mut self, item: T) -> Result<()>; -} - -/// The field can be immutably borrowed and reloaded. -pub trait Reload { - /// Reload `self` using provided `item`. - /// - /// # Errors - /// Fails with an appropriate variant of [`ReloadError`]. - /// [`ReloadError::Other`] can be used as a **temporary** placeholder. - fn reload(&self, item: T) -> Result<()>; -} - -/// Contains [`handle`] types: opaque wrappers around a reloadable -/// configuration, used to embed reloading functionality into -/// various [`iroha_config_derive::Documented`] types. -/// -/// # Architecture. -/// -/// ## Desired behaviour -/// -/// Given a value of type (`` in this module), need to -/// -/// - Embed a handle into the configuration options, replacing a Value -/// of type with a handle. -/// -/// - The handle gets (de)serialized as if it were ``: no extra -/// fields, no extra initialisation. -/// -/// - The configuration as a whole is immutable. This is to ensure -/// that you don't accidentally re-assign the handle. -/// -/// - The last object that got instantiated from the configuration -/// file is modified when we call [`Reload::reload`]. -/// -/// - The value used to [`Reload::reload`] the value, must be reflected in the -/// configuration. -/// -/// ## Additional considerations -/// -/// - The handle might have internal mutable state, and be passed -/// along several threads in both a `sync` and `async` context. -/// -/// - The handle's state can be a global mutable static value behind a -/// wrapper. -/// -/// - The handle is almost never read. All interactions with the -/// handle are writes. 
-/// -/// - The handle can retain a reference to different types, depending -/// on the configuration options. The types might not all be known -/// ahead of time, or be impractically long (both true for -/// `tracting_subscriber::reload::Handle`). -/// -/// # Usage -/// -/// Embed a `SyncValue>`, in your -/// configuration options. When using the configuration to initialise -/// components, call [`handle::SyncValue::set_handle`], on a value that -/// implements [`ReloadMut`] (which you defined earlier). Call -/// [`handle::SyncValue::reload`] to change the configuration at run-time. -/// -/// If the type stored in `H` is a single simple type, it is -/// recommended to use a custom tuple `struct`, and `impl` -/// [`Reload`] for it. -/// -/// If the types are too varied, or generic in arguments that change -/// depending on run-time values, (as in -/// e.g. `tracing_subscriber::reload::Handle`), it is recommended to -/// instead use the provided opaque wrapper [`handle::Singleton`]. -/// -/// **NOTE** you shouldn't normally need to use either -/// [`handle::Singleton`] or [`handle::Value`] directly. -/// -/// # Examples -/// -/// ```ignore -/// use iroha_config_derive::Documented; -/// use serde::{Deserialize, Serialize}; -/// use iroha_config::runtime_upgrades::{handle, Reload, ReloadMut, ReloadError}; -/// use tracing::Level; -/// use tracing_subscriber::{reload::Handle, filter::LevelFilter}; -/// use std::fmt::Debug; -/// -/// struct Logger; -/// -/// #[derive(Clone, Deserialize, Serialize, Debug, Documented)] -/// struct Configuration { -/// pub max_log_level: handle::SyncValue>, -/// pub log_file_path: Option, -/// } -/// -/// fn init(config: &Configuration) -> Logger { -/// let level = config.max_log_level.value(); -/// let level_filter = tracing_subscriber::filter::LevelFilter::from_level(level); -/// let (filter, handle) = reload::Layer::new(level_filter); -/// config.max_log_level.set_handle(iroha_config::logger::ReloadHandle(handle)).unwrap(); -/// } -/// -/// impl ReloadMut for Handle { -/// fn reload(&mut self, level: Level) -> Result<(), ReloadError> { -/// let level_filter = LevelFilter::from_level(level); -/// Handle::reload(self, level_filter).map_err(|_todo| ReloadError::Dropped) -/// } -/// } -/// ``` - -pub mod handle { - use std::{ - fmt::{Debug, Formatter}, - sync::Arc, - }; - - use crossbeam::atomic::AtomicCell; - use parking_lot::Mutex; - use serde::{Deserialize, Serialize}; - - use super::{Reload, ReloadError, ReloadMut, Result}; - // ----------------------------------------------------------------- - - /// An opaque handle for arbitrary [`super::ReloadMut`], useful - /// when it is either impossible or impractical to specify a - /// single `enum` or generic type. You shouldn't embed this into - /// your configuration, and instead use [`SyncValue`]. - #[derive(Clone, Serialize, Deserialize)] - pub struct Singleton { - #[serde(skip)] - inner: Arc + Send + Sync>>>>, - } - - impl Default for Singleton { - fn default() -> Self { - Self { - inner: Arc::new(Mutex::new(None)), - } - } - } - - impl Singleton { - /// Set and/or initialize the [`Self`] to a non-empty value. - /// Reloading before calling this `fn` should cause - /// [`ReloadError::NotInitialized`]. - /// - /// # Errors - /// [`ReloadError::Poisoned`] When the [`Mutex`] storing the reload handle is poisoned. 
- pub fn set(&self, handle: impl ReloadMut + Send + Sync + 'static) { - *self.inner.lock() = Some(Box::new(handle)); - } - } - - impl Debug for Singleton { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - f.debug_struct("Handle with side effect").finish() - } - } - - impl Reload for Singleton { - fn reload(&self, item: T) -> Result<()> { - match &mut *self.inner.lock() { - Some(handle) => { - handle.reload(item)?; - Ok(()) - } - None => Err(ReloadError::NotInitialized), - } - } - } - - // --------------------------------------------------------------- - - /// A run-time reloadable configuration option with - /// value-semantics. This means that reloading a [`Value`] only - /// affects the [`Value`] itself. It's useful when you want to - /// keep a configuration immutable, but retain thread-safe - /// interior mutability, which is preferable to making the entire - /// configuration `mut`. - /// - /// # Examples - /// - /// ```ignore - /// use serde::{Serialize, Deserialize}; - /// use iroha_config_base::runtime_upgrades::{handle::Value, Reload}; - /// - /// #[derive(iroha_config_base::derive::Combine, Serialize, Deserialize)] - /// pub struct Config { option: Value } - /// - /// fn main() { - /// let c = Config { option: true.into() }; - /// - /// c.option.reload(false); - /// } - /// ``` - /// - /// If you wish to perform validation on the value, consider using - /// a thin wrapper `tuple` struct. - /// - #[derive(Debug)] - pub struct Value(pub AtomicCell); - - impl Clone for Value { - fn clone(&self) -> Self { - Self(AtomicCell::new(self.0.load())) - } - } - - impl From for Value { - fn from(value: T) -> Self { - Self(AtomicCell::new(value)) - } - } - - impl Default for Value { - fn default() -> Self { - Self(AtomicCell::default()) - } - } - - impl Reload for Value { - fn reload(&self, item: T) -> Result<()> { - self.0.swap(item); - Ok(()) - } - } - - impl<'de, T: Deserialize<'de> + Copy + Clone> Deserialize<'de> for Value { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - Ok(Self(AtomicCell::new(T::deserialize(deserializer)?))) - } - } - - impl Serialize for Value { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - (self.0.load()).serialize(serializer) - } - } - - // ----------------------------------------------------------------------- - - /// Structure that encapsulates a configuration value as well as a - /// handle for reloading other parts of the program. This is the - /// `struct` that you want to use 99% of the time. - /// - /// It handles automatic synchronisation of the current value from - /// the reload, as well as proper (de)serialization: namely the - /// handle doesn't pollute your configuration options. - pub struct SyncValue>(Value, H); - - impl> SyncValue { - /// Getter for the wrapped [`Value`] - pub fn value(&self) -> T { - self.0 .0.load() - } - } - - impl SyncValue> { - /// Set the handle - /// - /// # Errors - /// If [`Singleton::set`] fails. 
- pub fn set_handle(&self, other: impl ReloadMut + Send + Sync + 'static) { - self.1.set(other); - } - } - - impl + Clone> Clone for SyncValue { - fn clone(&self) -> Self { - Self(self.0.clone(), self.1.clone()) - } - } - - impl + Debug> Debug for SyncValue { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - f.debug_tuple("Reconfigure") - .field(&self.0) - .field(&self.1) - .finish() - } - } - - impl Default for SyncValue> - where - T: Default + Clone + Copy + Send + Sync + Debug, - { - fn default() -> Self { - Self(Value::default(), Singleton::default()) - } - } - - impl + Default> From for SyncValue { - fn from(value: T) -> Self { - Self(Value(AtomicCell::new(value)), H::default()) - } - } - - impl> Serialize for SyncValue { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - // We only want the actual (simple) value to be part of the serializing - self.0.serialize(serializer) - } - } - - impl<'de, T: Deserialize<'de> + Copy + Clone, H: Reload + Default> Deserialize<'de> - for SyncValue - { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - Ok(Self(Value::::deserialize(deserializer)?, H::default())) - } - } - - impl> Reload for SyncValue { - fn reload(&self, item: T) -> Result<()> { - self.1.reload(item)?; - self.0.reload(item) - } - } -} diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index 80f61607c38..6ebbf417a26 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -57,11 +57,8 @@ "FUTURE_THRESHOLD_MS": 1000 }, "LOGGER": { - "MAX_LOG_LEVEL": "INFO", - "TELEMETRY_CAPACITY": 1000, - "COMPACT_MODE": false, - "LOG_FILE_PATH": null, - "TERMINAL_COLORS": true, + "LEVEL": "INFO", + "FORMAT": "full", "TOKIO_CONSOLE_ADDR": "127.0.0.1:5555" }, "GENESIS": { diff --git a/config/src/client_api.rs b/config/src/client_api.rs new file mode 100644 index 00000000000..030edb8523a --- /dev/null +++ b/config/src/client_api.rs @@ -0,0 +1,70 @@ +//! Functionality related to working with the configuration through client API. +//! +//! Intended usage: +//! +//! - Create [`ConfigurationDTO`] from [`crate::iroha::Configuration`] and serialize it for the client +//! - Deserialize [`ConfigurationDTO`] from the client and use [`ConfigurationDTO::apply_update()`] to update the configuration +// TODO: Currently logic here is not generalised and handles only `logger.level` parameter. In future, when +// other parts of configuration are refactored and there is a solid foundation e.g. as a general +// configuration-related crate, this part should be re-written in a clean way. +// Track configuration refactoring here: https://github.com/hyperledger/iroha/issues/2585 + +use iroha_data_model::Level; +use serde::{Deserialize, Serialize}; + +use super::{iroha::Configuration as BaseConfiguration, logger::Configuration as BaseLogger}; + +/// Subset of [`super::iroha`] configuration. +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +pub struct ConfigurationDTO { + #[allow(missing_docs)] + pub logger: Logger, +} + +impl From<&'_ BaseConfiguration> for ConfigurationDTO { + fn from(value: &'_ BaseConfiguration) -> Self { + Self { + logger: value.logger.as_ref().into(), + } + } +} + +/// Subset of [`super::logger`] configuration. 
+#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +pub struct Logger { + #[allow(missing_docs)] + pub level: Level, +} + +impl From<&'_ BaseLogger> for Logger { + fn from(value: &'_ BaseLogger) -> Self { + Self { level: value.level } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn snapshot_serialized_form() { + let value = ConfigurationDTO { + logger: Logger { + level: Level::TRACE, + }, + }; + + let actual = serde_json::to_string_pretty(&value).expect("The value is a valid JSON"); + + // NOTE: whenever this is updated, make sure to update the documentation accordingly: + // https://hyperledger.github.io/iroha-2-docs/reference/torii-endpoints.html + // -> Configuration endpoints + let expected = expect_test::expect![[r#" + { + "logger": { + "level": "TRACE" + } + }"#]]; + expected.assert_eq(&actual); + } +} diff --git a/config/src/iroha.rs b/config/src/iroha.rs index 6ed054b6593..0ade3128196 100644 --- a/config/src/iroha.rs +++ b/config/src/iroha.rs @@ -262,9 +262,9 @@ mod tests { #[test] fn example_json_proxy_builds() { - ConfigurationProxy::from_path(CONFIGURATION_PATH).build().unwrap_or_else(|_| panic!("`ConfigurationProxy` specified in {CONFIGURATION_PATH} \ + ConfigurationProxy::from_path(CONFIGURATION_PATH).build().unwrap_or_else(|err| panic!("`ConfigurationProxy` specified in {CONFIGURATION_PATH} \ failed to build. This probably means that some of the fields there were not updated \ - properly with new changes.")); + properly with new changes. Error: {err}")); } #[test] diff --git a/config/src/kura.rs b/config/src/kura.rs index 9eaed6f19d3..03c0cb4fb74 100644 --- a/config/src/kura.rs +++ b/config/src/kura.rs @@ -1,13 +1,10 @@ //! Module for kura-related configuration and structs -use std::{num::NonZeroU64, path::Path}; -use eyre::{eyre, Result}; +use eyre::Result; use iroha_config_base::derive::{Documented, Proxy}; use serde::{Deserialize, Serialize}; -const DEFAULT_BLOCKS_PER_STORAGE_FILE: u64 = 1000_u64; const DEFAULT_BLOCK_STORE_PATH: &str = "./storage"; -const DEFAULT_ACTOR_CHANNEL_CAPACITY: u32 = 100; /// `Kura` configuration. #[derive(Clone, Deserialize, Serialize, Debug, Documented, Proxy, PartialEq, Eq)] @@ -18,10 +15,6 @@ pub struct Configuration { pub init_mode: Mode, /// Path to the existing block store folder or path to create new folder. pub block_store_path: String, - /// Maximum number of blocks to write into a single storage file. - pub blocks_per_storage_file: NonZeroU64, - /// Default buffer capacity of actor's MPSC channel. - pub actor_channel_capacity: u32, /// Whether or not new blocks be outputted to a file called blocks.json. pub debug_output_new_blocks: bool, } @@ -30,31 +23,12 @@ impl Default for ConfigurationProxy { fn default() -> Self { Self { init_mode: Some(Mode::default()), - block_store_path: Some(DEFAULT_BLOCK_STORE_PATH.to_owned()), - blocks_per_storage_file: Some( - NonZeroU64::new(DEFAULT_BLOCKS_PER_STORAGE_FILE) - .expect("BLOCKS_PER_STORAGE cannot be set to a non-positive value."), - ), - actor_channel_capacity: Some(DEFAULT_ACTOR_CHANNEL_CAPACITY), + block_store_path: Some(DEFAULT_BLOCK_STORE_PATH.into()), debug_output_new_blocks: Some(false), } } } -impl Configuration { - /// Set `block_store_path` configuration parameter. Will overwrite the existing one. - /// - /// # Errors - /// Fails if the path is not valid - pub fn block_store_path(&mut self, path: &Path) -> Result<()> { - self.block_store_path = path - .to_str() - .ok_or_else(|| eyre!("Failed to yield slice from path"))? 
- .to_owned(); - Ok(()) - } -} - /// Kura initialization mode. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] @@ -77,12 +51,10 @@ pub mod tests { ( init_mode in prop::option::of(Just(Mode::default())), block_store_path in prop::option::of(Just(DEFAULT_BLOCK_STORE_PATH.into())), - blocks_per_storage_file in prop::option::of(Just(NonZeroU64::new(DEFAULT_BLOCKS_PER_STORAGE_FILE).expect("Cannot be set to a negative value"))), - actor_channel_capacity in prop::option::of(Just(DEFAULT_ACTOR_CHANNEL_CAPACITY)), debug_output_new_blocks in prop::option::of(Just(false)) ) -> ConfigurationProxy { - ConfigurationProxy { init_mode, block_store_path, blocks_per_storage_file, actor_channel_capacity, debug_output_new_blocks } + ConfigurationProxy { init_mode, block_store_path, debug_output_new_blocks } } } } diff --git a/config/src/lib.rs b/config/src/lib.rs index 6e80c5e1c88..423e5a8dd19 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -1,9 +1,9 @@ //! Aggregate configuration for different Iroha modules. pub use iroha_config_base as base; -use serde::{Deserialize, Serialize}; pub mod block_sync; pub mod client; +pub mod client_api; pub mod genesis; pub mod iroha; pub mod kura; @@ -18,35 +18,3 @@ pub mod telemetry; pub mod torii; pub mod wasm; pub mod wsv; - -/// Json config for getting configuration -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum GetConfiguration { - /// Getting docs of specific field - /// - /// Top-level fields must be enclosed in an array (of strings). This array - /// provides the fully qualified path to the fields. - /// - /// # Examples - /// - /// To get the top-level configuration docs for `iroha_core::Torii` - /// `curl -X GET -H 'content-type: application/json' http://127.0.0.1:8080/configuration -d '{"Docs" : ["torii"]} ' -i` - /// - /// To get the documentation on the [`Logger::config::Configuration.max_log_level`] - /// `curl -X GET -H 'content-type: application/json' http://127.0.0.1:8080/configuration -d '{"Docs" : ["logger", "max_log_level"]}' -i` - Docs(Vec), - /// Get the original Value of the full configuration. - Value, -} - -/// Message acceptable for `POST` requests to the configuration endpoint. -#[derive(Clone, Debug, Deserialize, Serialize, Copy)] -pub enum PostConfiguration { - /// Change the maximum logging level of logger. - /// - /// # Examples - /// - /// To silence all logging events that aren't `ERROR`s - /// `curl -X POST -H 'content-type: application/json' http://127.0.0.1:8080/configuration -d '{"LogLevel": "ERROR"}' -i` - LogLevel(iroha_data_model::Level), -} diff --git a/config/src/logger.rs b/config/src/logger.rs index bee27fda3df..d3c8e79472a 100644 --- a/config/src/logger.rs +++ b/config/src/logger.rs @@ -2,21 +2,14 @@ //! configuration, as well as run-time reloading of the log-level. 
use core::fmt::Debug; -use derive_more::{Deref, DerefMut, From}; -use iroha_config_base::{ - derive::{Documented, Proxy}, - runtime_upgrades::{handle, ReloadError, ReloadMut}, -}; -use iroha_data_model::Level; +use iroha_config_base::derive::{Documented, Proxy}; +pub use iroha_data_model::Level; +#[cfg(feature = "tokio-console")] +use iroha_primitives::addr::{socket_addr, SocketAddr}; use serde::{Deserialize, Serialize}; -use tracing::Subscriber; -use tracing_subscriber::{filter::LevelFilter, reload::Handle}; -const TELEMETRY_CAPACITY: u32 = 1000; -const DEFAULT_COMPACT_MODE: bool = false; -const DEFAULT_TERMINAL_COLORS: bool = true; #[cfg(feature = "tokio-console")] -const DEFAULT_TOKIO_CONSOLE_ADDR: &str = "127.0.0.1:5555"; +const DEFAULT_TOKIO_CONSOLE_ADDR: SocketAddr = socket_addr!(127.0.0.1:5555); /// Convert [`Level`] into [`tracing::Level`] pub fn into_tracing_level(level: Level) -> tracing::Level { @@ -29,77 +22,50 @@ pub fn into_tracing_level(level: Level) -> tracing::Level { } } -/// Wrapper for [`Handle`] to implement [`ReloadMut`] -#[derive(From)] -pub struct ReloadHandle(pub Handle); - -impl ReloadMut for ReloadHandle { - fn reload(&mut self, level: Level) -> Result<(), ReloadError> { - let level_filter = - tracing_subscriber::filter::LevelFilter::from_level(into_tracing_level(level)); - - Handle::reload(&self.0, level_filter).map_err(|err| { - if err.is_dropped() { - ReloadError::Dropped - } else { - ReloadError::Poisoned - } - }) - } -} - -/// Wrapper around [`Level`] for runtime upgrades. -#[derive(Debug, Clone, Default, Deref, DerefMut, Deserialize, Serialize)] -#[repr(transparent)] -#[serde(transparent)] -pub struct SyncLevel(handle::SyncValue>); - -impl From for SyncLevel { - fn from(level: Level) -> Self { - Self(level.into()) - } -} - -impl PartialEq for SyncLevel { - fn eq(&self, other: &Self) -> bool { - self.0.value() == other.0.value() - } -} - -impl Eq for SyncLevel {} - /// 'Logger' configuration. #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Proxy, Documented)] #[serde(rename_all = "UPPERCASE")] +#[config(env_prefix = "LOG_")] +// `tokio_console_addr` is not `Copy`, but warning appears without `tokio-console` feature +#[allow(missing_copy_implementations)] pub struct Configuration { - /// Maximum log level + /// Level of logging verbosity #[config(serde_as_str)] - pub max_log_level: SyncLevel, - /// Capacity (or batch size) for telemetry channel - pub telemetry_capacity: u32, - /// Compact mode (no spans from telemetry) - pub compact_mode: bool, - /// If provided, logs will be copied to said file in the - /// format readable by [bunyan](https://lib.rs/crates/bunyan) - #[config(serde_as_str)] - pub log_file_path: Option, - /// Enable ANSI terminal colors for formatted output. 
- pub terminal_colors: bool, + pub level: Level, + /// Output format + pub format: Format, #[cfg(feature = "tokio-console")] /// Address of tokio console (only available under "tokio-console" feature) - pub tokio_console_addr: String, + pub tokio_console_addr: SocketAddr, +} + +/// Reflects formatters in [`tracing_subscriber::fmt::format`] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Format { + /// See [`tracing_subscriber::fmt::format::Full`] + Full, + /// See [`tracing_subscriber::fmt::format::Compact`] + Compact, + /// See [`tracing_subscriber::fmt::format::Pretty`] + Pretty, + /// See [`tracing_subscriber::fmt::format::Json`] + Json, +} + +impl Default for Format { + fn default() -> Self { + Self::Full + } } impl Default for ConfigurationProxy { fn default() -> Self { Self { - max_log_level: Some(SyncLevel::default()), - telemetry_capacity: Some(TELEMETRY_CAPACITY), - compact_mode: Some(DEFAULT_COMPACT_MODE), - log_file_path: Some(None), - terminal_colors: Some(DEFAULT_TERMINAL_COLORS), + level: Some(Level::default()), + format: Some(Format::default()), #[cfg(feature = "tokio-console")] - tokio_console_addr: Some(DEFAULT_TOKIO_CONSOLE_ADDR.into()), + tokio_console_addr: Some(DEFAULT_TOKIO_CONSOLE_ADDR), } } } @@ -113,22 +79,23 @@ pub mod tests { #[must_use = "strategies do nothing unless used"] pub fn arb_proxy() -> impl proptest::strategy::Strategy { let strat = ( - (prop::option::of(Just(SyncLevel::default()))), - (prop::option::of(Just(TELEMETRY_CAPACITY))), - (prop::option::of(Just(DEFAULT_COMPACT_MODE))), - (prop::option::of(Just(None))), - (prop::option::of(Just(DEFAULT_TERMINAL_COLORS))), + (prop::option::of(Just(Level::default()))), + (prop::option::of(Just(Format::default()))), #[cfg(feature = "tokio-console")] - (prop::option::of(Just(DEFAULT_TOKIO_CONSOLE_ADDR.to_string()))), + (prop::option::of(Just(DEFAULT_TOKIO_CONSOLE_ADDR))), ); proptest::strategy::Strategy::prop_map(strat, move |strat| ConfigurationProxy { - max_log_level: strat.0, - telemetry_capacity: strat.1, - compact_mode: strat.2, - log_file_path: strat.3, - terminal_colors: strat.4, + level: strat.0, + format: strat.1, #[cfg(feature = "tokio-console")] - tokio_console_addr: strat.5, + tokio_console_addr: strat.2, }) } + + #[test] + fn serialize_pretty_format_in_lowercase() { + let value = Format::Pretty; + let actual = serde_json::to_string(&value).unwrap(); + assert_eq!("\"pretty\"", actual); + } } diff --git a/config/src/telemetry.rs b/config/src/telemetry.rs index d347df8b050..3b04f79c483 100644 --- a/config/src/telemetry.rs +++ b/config/src/telemetry.rs @@ -1,4 +1,6 @@ //! Module for telemetry-related configuration and structs. +use std::path::PathBuf; + use iroha_config_base::derive::{Documented, Proxy}; use serde::{Deserialize, Serialize}; use url::Url; @@ -20,7 +22,57 @@ pub struct Configuration { pub max_retry_delay_exponent: u8, /// The filepath that to write dev-telemetry to #[config(serde_as_str)] - pub file: Option, + pub file: Option, +} + +/// Complete configuration needed to start regular telemetry. +pub struct RegularTelemetryConfig { + #[allow(missing_docs)] + pub name: String, + #[allow(missing_docs)] + pub url: Url, + #[allow(missing_docs)] + pub min_retry_period: u64, + #[allow(missing_docs)] + pub max_retry_delay_exponent: u8, +} + +/// Complete configuration needed to start dev telemetry. 
+/// Complete configuration needed to start dev telemetry. +pub struct DevTelemetryConfig { + #[allow(missing_docs)] + pub file: PathBuf, +} + +impl Configuration { + /// Parses user-provided configuration into stronger-typed structures + /// + /// Should be refactored with [#3500](https://github.com/hyperledger/iroha/issues/3500) + pub fn parse(&self) -> (Option<RegularTelemetryConfig>, Option<DevTelemetryConfig>) { + let Self { + ref name, + ref url, + max_retry_delay_exponent, + min_retry_period, + ref file, + } = *self; + + let regular = if let (Some(name), Some(url)) = (name, url) { + Some(RegularTelemetryConfig { + name: name.clone(), + url: url.clone(), + max_retry_delay_exponent, + min_retry_period, + }) + } else { + None + }; + + let dev = file + .as_ref() + .map(|file| DevTelemetryConfig { file: file.clone() }); + + (regular, dev) + } } impl Default for ConfigurationProxy { diff --git a/configs/peer/config.json b/configs/peer/config.json index 51cc9c5a45a..11d5b354ce8 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -5,8 +5,6 @@ "KURA": { "INIT_MODE": "strict", "BLOCK_STORE_PATH": "./storage", - "BLOCKS_PER_STORAGE_FILE": 1000, - "ACTOR_CHANNEL_CAPACITY": 100, "DEBUG_OUTPUT_NEW_BLOCKS": false }, "SUMERAGI": { @@ -38,11 +36,8 @@ "FUTURE_THRESHOLD_MS": 1000 }, "LOGGER": { - "MAX_LOG_LEVEL": "INFO", - "TELEMETRY_CAPACITY": 1000, - "COMPACT_MODE": false, - "LOG_FILE_PATH": null, - "TERMINAL_COLORS": true + "LEVEL": "INFO", + "FORMAT": "full" }, "GENESIS": { "ACCOUNT_PUBLIC_KEY": null, diff --git a/core/benches/blocks/apply_blocks_oneshot.rs b/core/benches/blocks/apply_blocks_oneshot.rs index 4c8bdd6e389..f16a5bf5e57 100644 --- a/core/benches/blocks/apply_blocks_oneshot.rs +++ b/core/benches/blocks/apply_blocks_oneshot.rs @@ -8,23 +8,10 @@ mod apply_blocks; use apply_blocks::WsvApplyBlocks; -use iroha_config::base::proxy::Builder; -use iroha_data_model::Level; -use iroha_logger::{Configuration, ConfigurationProxy}; #[tokio::main] async fn main() { - let log_config = Configuration { - max_log_level: Level::INFO.into(), - compact_mode: false, - ..ConfigurationProxy::default() - .build() - .expect("Default logger config should always build") - }; - // Can't use logger because it's failed to initialize. - if let Err(err) = iroha_logger::init(&log_config) { - eprintln!("Failed to initialize logger: {err}"); - } + iroha_logger::test_logger(); iroha_logger::info!("Starting..."); let bench = WsvApplyBlocks::setup().expect("Failed to setup benchmark"); WsvApplyBlocks::measure(&bench).expect("Failed to execute benchmark");
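Aside: the `Configuration::parse()` method introduced in `config/src/telemetry.rs` above is consumed by the reworked telemetry starters later in this diff (`ws::start` takes a `RegularTelemetryConfig`, `dev::start` takes a `DevTelemetryConfig`). A minimal sketch of that wiring; the `start_telemetry` function below is illustrative, not code from this PR:

```rust
use iroha_config::telemetry::{Configuration, DevTelemetryConfig, RegularTelemetryConfig};

// Hypothetical call site: split the loosely-typed user config into the two
// complete, strongly-typed configs and start only the parts that are enabled.
fn start_telemetry(cfg: &Configuration) {
    let (regular, dev) = cfg.parse();
    if let Some(RegularTelemetryConfig { name, url, .. }) = regular {
        // In the PR this config is handed to `iroha_telemetry::ws::start`.
        println!("would start regular telemetry `{name}` -> `{url}`");
    }
    if let Some(DevTelemetryConfig { file }) = dev {
        // In the PR this config is handed to `iroha_telemetry::dev::start`.
        println!("would write dev-telemetry to `{}`", file.display());
    }
}
```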
diff --git a/core/benches/blocks/validate_blocks_oneshot.rs b/core/benches/blocks/validate_blocks_oneshot.rs index bcdeb20a519..403adbd0a22 100644 --- a/core/benches/blocks/validate_blocks_oneshot.rs +++ b/core/benches/blocks/validate_blocks_oneshot.rs @@ -7,23 +7,10 @@ mod validate_blocks; -use iroha_config::base::proxy::Builder; -use iroha_data_model::Level; -use iroha_logger::{Configuration, ConfigurationProxy}; use validate_blocks::WsvValidateBlocks; fn main() { - let log_config = Configuration { - max_log_level: Level::INFO.into(), - compact_mode: false, - ..ConfigurationProxy::default() - .build() - .expect("Default logger config should always build") - }; - // Can't use logger because it's failed to initialize. - if let Err(err) = iroha_logger::init(&log_config) { - eprintln!("Failed to initialize logger: {err}"); - } + iroha_logger::test_logger(); iroha_logger::info!("Starting..."); let bench = WsvValidateBlocks::setup().expect("Failed to setup benchmark"); WsvValidateBlocks::measure(bench).expect("Failed to execute benchmark"); } diff --git a/core/benches/kura.rs b/core/benches/kura.rs index c0371201191..279f8d97528 100644 --- a/core/benches/kura.rs +++ b/core/benches/kura.rs @@ -4,6 +4,7 @@ use std::str::FromStr as _; use byte_unit::Byte; use criterion::{criterion_group, criterion_main, Criterion}; +use iroha_config::kura::Configuration; use iroha_core::{ block::*, kura::{BlockStore, LockStatus}, @@ -39,8 +40,12 @@ async fn measure_block_size_for_n_executors(n_executors: u32) { let tx = AcceptedTransaction::accept(tx, &transaction_limits) .expect("Failed to accept Transaction."); let dir = tempfile::tempdir().expect("Could not create tempfile."); - let kura = - iroha_core::kura::Kura::new(iroha_config::kura::Mode::Strict, dir.path(), false).unwrap(); + let cfg = Configuration { + init_mode: iroha_config::kura::Mode::Strict, + debug_output_new_blocks: false, + block_store_path: dir.path().to_str().unwrap().into(), + }; + let kura = iroha_core::kura::Kura::new(&cfg).unwrap(); let _thread_handle = iroha_core::kura::Kura::start(kura.clone()); let query_handle = LiveQueryStore::test().start(); diff --git a/core/src/kiso.rs b/core/src/kiso.rs new file mode 100644 index 00000000000..464b01acb0e --- /dev/null +++ b/core/src/kiso.rs @@ -0,0 +1,206 @@ +//! Actor responsible for configuration state and its dynamic updates. +//! +//! Currently the API exposed by [`KisoHandle`] works only with [`ConfigurationDTO`], because +//! no other part of Iroha is interested in the whole state. However, the API could be extended +//! in the future. +//! +//! The update mechanism is implemented via subscriptions to [`tokio::sync::watch`] channels. For now, +//! only the `logger.level` field is dynamic; it can be tracked with [`KisoHandle::subscribe_on_log_level()`]. + +use eyre::Result; +use iroha_config::{ + client_api::{ConfigurationDTO, Logger as LoggerDTO}, + iroha::Configuration, +}; +use iroha_logger::Level; +use tokio::sync::{mpsc, oneshot, watch}; + +const DEFAULT_CHANNEL_SIZE: usize = 32; + +/// Handle to work with the actor. +/// +/// The actor will shut down when all its handles are dropped. +#[derive(Clone)] +pub struct KisoHandle { + actor: mpsc::Sender<Message>, } + +impl KisoHandle { + /// Spawn a new actor + pub fn new(state: Configuration) -> Self { + let (actor_sender, actor_receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + let (log_level_update, _) = watch::channel(state.logger.level); + let mut actor = Actor { + handle: actor_receiver, + state, + log_level_update, + }; + tokio::spawn(async move { actor.run().await }); + + Self { + actor: actor_sender, + } + } + + /// Fetch the [`ConfigurationDTO`] from the actor's state. + /// + /// # Errors + /// If communication with the actor fails. + pub async fn get_dto(&self) -> Result<ConfigurationDTO> { + let (tx, rx) = oneshot::channel(); + let msg = Message::GetDTO { respond_to: tx }; + let _ = self.actor.send(msg).await; + let dto = rx.await?; + Ok(dto) + }
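As a usage sketch (not code from this PR): `KisoHandle::new` spawns the actor, and `get_dto` is a plain request/response round-trip over the hidden oneshot channel. The `print_log_level` helper below is illustrative:

```rust
use iroha_config::iroha::Configuration;
use iroha_core::kiso::KisoHandle;

async fn print_log_level(initial: Configuration) {
    // Spawns the backing actor; the handle is `Clone` and can be shared freely.
    let kiso = KisoHandle::new(initial);

    // One request/response round-trip over the hidden oneshot channel.
    let dto = kiso.get_dto().await.expect("actor should be alive");
    println!("current log level: {:?}", dto.logger.level);
}
```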
+ + /// Update the configuration state and notify subscribers. + /// + /// Works in a fire-and-forget way, i.e. completion of this task doesn't mean that updates are applied. However, + /// subsequent calls of [`Self::get_dto()`] will return an updated state. + /// + /// # Errors + /// If communication with the actor fails. + pub async fn update_with_dto(&self, dto: ConfigurationDTO) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + let msg = Message::UpdateWithDTO { + dto, + respond_to: tx, + }; + let _ = self.actor.send(msg).await; + rx.await? + } + + /// Subscribe to updates of the `logger.level` parameter. + /// + /// # Errors + /// If communication with the actor fails. + pub async fn subscribe_on_log_level(&self) -> Result<watch::Receiver<Level>, Error> { + let (tx, rx) = oneshot::channel(); + let msg = Message::SubscribeOnLogLevel { respond_to: tx }; + let _ = self.actor.send(msg).await; + let receiver = rx.await?; + Ok(receiver) + } +} + +enum Message { + GetDTO { + respond_to: oneshot::Sender<ConfigurationDTO>, + }, + UpdateWithDTO { + dto: ConfigurationDTO, + respond_to: oneshot::Sender<Result<(), Error>>, + }, + SubscribeOnLogLevel { + respond_to: oneshot::Sender<watch::Receiver<Level>>, + }, +} + +/// Possible errors that might occur while working with [`KisoHandle`] +#[derive(thiserror::Error, displaydoc::Display, Debug)] +pub enum Error { + /// Failed to get actor's response + Communication(#[from] oneshot::error::RecvError), +} + +struct Actor { + handle: mpsc::Receiver<Message>, + state: Configuration, + // The current implementation scales poorly in terms of boilerplate: each + // future dynamic parameter will require its own `subscribe_on_` function in [`KisoHandle`], + // a new channel here, and a new [`Message`] variant. If the boilerplate expands, a more general solution will be + // required. However, as of now a single manually written implementation seems optimal. + log_level_update: watch::Sender<Level>, +} + +impl Actor { + async fn run(&mut self) { + while let Some(msg) = self.handle.recv().await { + self.handle_message(msg).await + } + } + + async fn handle_message(&mut self, msg: Message) { + match msg { + Message::GetDTO { respond_to } => { + let dto = ConfigurationDTO::from(&self.state); + let _ = respond_to.send(dto); + } + Message::UpdateWithDTO { + dto: + ConfigurationDTO { + logger: LoggerDTO { level: new_level }, + }, + respond_to, + } => { + let _ = self.log_level_update.send(new_level); + self.state.logger.level = new_level; + + let _ = respond_to.send(Ok(())); + } + Message::SubscribeOnLogLevel { respond_to } => { + let _ = respond_to.send(self.log_level_update.subscribe()); + } + } + } +}
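A sketch of how a consumer might drive the `watch` channel returned by `subscribe_on_log_level`; the `forward_level_changes` helper is illustrative, not code from the PR:

```rust
use iroha_logger::Level;
use tokio::sync::watch;

// React to `logger.level` updates until the sender side is dropped.
async fn forward_level_changes(mut receiver: watch::Receiver<Level>) {
    while receiver.changed().await.is_ok() {
        // `Level` is `Copy`, so dereferencing the borrow is cheap.
        let new_level = *receiver.borrow_and_update();
        println!("log level changed to {new_level:?}");
    }
}
```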
+ +#[cfg(test)] +#[allow(unused)] +mod tests { + use std::time::Duration; + + use iroha_config::{ + base::proxy::LoadFromDisk, + client_api::{ConfigurationDTO, Logger as LoggerDTO}, + iroha::{Configuration, ConfigurationProxy}, + }; + + use super::*; + + fn test_config() -> Configuration { + // FIXME Specifying path here might break! Moreover, if the file is not found, + // the error will say that `public_key` is missing! + // Hopefully this will change: https://github.com/hyperledger/iroha/issues/2585 + ConfigurationProxy::from_path("../config/iroha_test_config.json") + .build() + .unwrap() + } + + #[tokio::test] + async fn subscription_on_log_level_works() { + const INIT_LOG_LEVEL: Level = Level::WARN; + const NEW_LOG_LEVEL: Level = Level::DEBUG; + const WATCH_LAG_MILLIS: u64 = 30; + + let mut config = test_config(); + config.logger.level = INIT_LOG_LEVEL; + let kiso = KisoHandle::new(config); + + let mut recv = kiso + .subscribe_on_log_level() + .await + .expect("Subscription should be fine"); + + let _err = tokio::time::timeout(Duration::from_millis(WATCH_LAG_MILLIS), recv.changed()) + .await + .expect_err("Watcher should not be active initially"); + + kiso.update_with_dto(ConfigurationDTO { + logger: LoggerDTO { + level: NEW_LOG_LEVEL, + }, + }) + .await + .expect("Update should work fine"); + + let () = tokio::time::timeout(Duration::from_millis(WATCH_LAG_MILLIS), recv.changed()) + .await + .expect("Watcher should resolve within timeout") + .expect("Watcher should not be closed"); + + let value = *recv.borrow_and_update(); + assert_eq!(value, NEW_LOG_LEVEL); + } +} diff --git a/core/src/kura.rs b/core/src/kura.rs index cede4d491b3..11dbf2c5192 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -10,7 +10,7 @@ use std::{ sync::Arc, }; -use iroha_config::kura::Mode; +use iroha_config::kura::{Configuration, Mode}; use iroha_crypto::{Hash, HashOf}; use iroha_data_model::block::SignedBlock; use iroha_logger::prelude::*; @@ -50,22 +50,19 @@ impl Kura { /// Fails if there are filesystem errors when trying /// to access the block store indicated by the provided /// path. - pub fn new( - mode: Mode, - block_store_path: &Path, - debug_output_new_blocks: bool, - ) -> Result<Arc<Self>> { + pub fn new(config: &Configuration) -> Result<Arc<Self>> { + let block_store_path = Path::new(&config.block_store_path); let mut block_store = BlockStore::new(block_store_path, LockStatus::Unlocked); block_store.create_files_if_they_do_not_exist()?; - let block_plain_text_path = debug_output_new_blocks.then(|| { + let block_plain_text_path = config.debug_output_new_blocks.then(|| { let mut path_buf = block_store_path.to_path_buf(); path_buf.push("blocks.json"); path_buf }); let kura = Arc::new(Self { - mode, + mode: config.init_mode, block_store: Mutex::new(block_store), block_data: Mutex::new(Vec::new()), block_plain_text_path, @@ -1054,9 +1051,13 @@ mod tests { #[tokio::test] async fn strict_init_kura() { let temp_dir = TempDir::new().unwrap(); - Kura::new(Mode::Strict, temp_dir.path(), false) - .unwrap() - .init() - .unwrap(); + Kura::new(&Configuration { + init_mode: Mode::Strict, + block_store_path: temp_dir.path().to_str().unwrap().into(), + debug_output_new_blocks: false, + }) + .unwrap() + .init() + .unwrap(); } } diff --git a/core/src/lib.rs b/core/src/lib.rs index e0c6109e31f..c032e5fda37 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -4,6 +4,7 @@ pub mod block; pub mod block_sync; pub mod executor; pub mod gossiper; +pub mod kiso; pub mod kura; pub mod modules; pub mod query; diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index a5fbb690c6f..96c4210fd08 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -170,7 +170,7 @@ impl Network { start_port: Option<u16>, ) -> (Self, Client) { let mut configuration = Configuration::test(); - configuration.logger.max_log_level = Level::INFO.into(); + configuration.logger.level = Level::INFO; let network = Network::new_with_offline_peers(
Some(configuration), n_peers, @@ -414,22 +414,18 @@ impl Peer { temp_dir: Arc, ) { let mut configuration = self.get_config(configuration); - configuration - .kura - .block_store_path(temp_dir.path()) - .expect("block store path not readable"); + configuration.kura.block_store_path = temp_dir.path().to_str().unwrap().into(); let info_span = iroha_logger::info_span!( "test-peer", p2p_addr = %self.p2p_address, api_addr = %self.api_address, ); - let telemetry = - iroha_logger::init(&configuration.logger).expect("Failed to initialize telemetry"); + let logger = iroha_logger::test_logger(); let (sender, receiver) = std::sync::mpsc::sync_channel(1); let handle = task::spawn( async move { - let mut iroha = Iroha::with_genesis(genesis, configuration, telemetry) + let mut iroha = Iroha::with_genesis(genesis, configuration, logger) .await .expect("Failed to start iroha"); let job_handle = iroha.start_as_task().unwrap(); diff --git a/docs/source/references/config.md b/docs/source/references/config.md index 4a8288df05c..82b00f0bc54 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -36,8 +36,6 @@ The following is the default configuration used by Iroha. "KURA": { "INIT_MODE": "strict", "BLOCK_STORE_PATH": "./storage", - "BLOCKS_PER_STORAGE_FILE": 1000, - "ACTOR_CHANNEL_CAPACITY": 100, "DEBUG_OUTPUT_NEW_BLOCKS": false }, "SUMERAGI": { @@ -69,11 +67,8 @@ The following is the default configuration used by Iroha. "FUTURE_THRESHOLD_MS": 1000 }, "LOGGER": { - "MAX_LOG_LEVEL": "INFO", - "TELEMETRY_CAPACITY": 1000, - "COMPACT_MODE": false, - "LOG_FILE_PATH": null, - "TERMINAL_COLORS": true + "LEVEL": "INFO", + "FORMAT": "full" }, "GENESIS": { "ACCOUNT_PUBLIC_KEY": null, @@ -225,24 +220,12 @@ Has type `Option>`[^1]. Can be configured via envi ```json { - "ACTOR_CHANNEL_CAPACITY": 100, - "BLOCKS_PER_STORAGE_FILE": 1000, "BLOCK_STORE_PATH": "./storage", "DEBUG_OUTPUT_NEW_BLOCKS": false, "INIT_MODE": "strict" } ``` -### `kura.actor_channel_capacity` - -Default buffer capacity of actor's MPSC channel. - -Has type `Option`[^1]. Can be configured via environment variable `KURA_ACTOR_CHANNEL_CAPACITY` - -```json -100 -``` - ### `kura.block_store_path` Path to the existing block store folder or path to create new folder. @@ -253,16 +236,6 @@ Has type `Option`[^1]. Can be configured via environment variable `KURA_ "./storage" ``` -### `kura.blocks_per_storage_file` - -Maximum number of blocks to write into a single storage file. - -Has type `Option`[^1]. Can be configured via environment variable `KURA_BLOCKS_PER_STORAGE_FILE` - -```json -1000 -``` - ### `kura.debug_output_new_blocks` Whether or not new blocks be outputted to a file called blocks.json. @@ -313,64 +286,31 @@ Has type `Option>`[^1]. Can be configured via en ```json { - "COMPACT_MODE": false, - "LOG_FILE_PATH": null, - "MAX_LOG_LEVEL": "INFO", - "TELEMETRY_CAPACITY": 1000, - "TERMINAL_COLORS": true + "FORMAT": "full", + "LEVEL": "INFO" } ``` -### `logger.compact_mode` - -Compact mode (no spans from telemetry) - -Has type `Option`[^1]. Can be configured via environment variable `COMPACT_MODE` - -```json -false -``` - -### `logger.log_file_path` +### `logger.format` -If provided, logs will be copied to said file in the +Output format -Has type `Option>`[^1]. Can be configured via environment variable `LOG_FILE_PATH` +Has type `Option`[^1]. 
Can be configured via environment variable `LOG_FORMAT` ```json -null +"full" ``` -### `logger.max_log_level` +### `logger.level` -Maximum log level +Level of logging verbosity -Has type `Option`[^1]. Can be configured via environment variable `MAX_LOG_LEVEL` +Has type `Option`[^1]. Can be configured via environment variable `LOG_LEVEL` ```json "INFO" ``` -### `logger.telemetry_capacity` - -Capacity (or batch size) for telemetry channel - -Has type `Option`[^1]. Can be configured via environment variable `TELEMETRY_CAPACITY` - -```json -1000 -``` - -### `logger.terminal_colors` - -Enable ANSI terminal colors for formatted output. - -Has type `Option`[^1]. Can be configured via environment variable `TERMINAL_COLORS` - -```json -true -``` - ## `network` Network configuration @@ -642,7 +582,7 @@ Has type `Option>`[^1]. Can be configured via The filepath to write dev-telemetry to -Has type `Option<std::path::PathBuf>`[^1]. Can be configured via environment variable `TELEMETRY_FILE` +Has type `Option<PathBuf>`[^1]. Can be configured via environment variable `TELEMETRY_FILE` ```json null diff --git a/futures/src/lib.rs b/futures/src/lib.rs index f45fa002b71..3865cc4e6fe 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -7,7 +7,7 @@ use std::{ }; pub use iroha_futures_derive::*; -use iroha_logger::telemetry::{Telemetry, TelemetryFields}; +use iroha_logger::telemetry::{Event as Telemetry, Fields as TelemetryFields}; use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/futures/tests/basic.rs b/futures/tests/basic.rs index a1514e01ab1..190d6201000 100644 --- a/futures/tests/basic.rs +++ b/futures/tests/basic.rs @@ -1,10 +1,9 @@ use std::{thread, time::Duration}; -use iroha_config::base::proxy::Builder; use iroha_futures::FuturePollTelemetry; -use iroha_logger::ConfigurationProxy; +use iroha_logger::telemetry::Channel; use tokio::task; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; #[iroha_futures::telemetry_future] async fn sleep(times: Vec<Duration>) -> i32 { @@ -32,15 +31,13 @@ async fn test_sleep() { Duration::from_nanos(80_000_000), ]; - let (_, telemetry_future) = iroha_logger::init( - &ConfigurationProxy::default() - .build() - .expect("Default logger config always builds"), - ) - .unwrap() - .unwrap(); + let future_telemetry = iroha_logger::test_logger() + .subscribe_on_telemetry(Channel::Future) + .await + .unwrap(); assert_eq!(sleep(sleep_times.clone()).await, 10_i32); - let telemetry = ReceiverStream::new(telemetry_future) + let telemetry = BroadcastStream::new(future_telemetry) + .filter_map(Result::ok) .map(FuturePollTelemetry::try_from) .filter_map(Result::ok) .take(3)
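Condensed from the updated test above, the new consumption pattern for future telemetry looks like this as a standalone helper (`collect_polls` is an illustrative name):

```rust
use iroha_futures::FuturePollTelemetry;
use iroha_logger::telemetry::Channel;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

// Collect the first `n` future-poll telemetry events from the global test logger.
async fn collect_polls(n: usize) -> Vec<FuturePollTelemetry> {
    let receiver = iroha_logger::test_logger()
        .subscribe_on_telemetry(Channel::Future)
        .await
        .expect("logger actor should be alive");

    BroadcastStream::new(receiver)
        // Broadcast receivers may lag; dropping `Err(Lagged)` items keeps the stream alive.
        .filter_map(Result::ok)
        .map(FuturePollTelemetry::try_from)
        .filter_map(Result::ok)
        .take(n)
        .collect()
        .await
}
```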
diff --git a/logger/Cargo.toml b/logger/Cargo.toml index 9d3ce2ac20c..83aba591aea 100644 --- a/logger/Cargo.toml +++ b/logger/Cargo.toml @@ -19,13 +19,13 @@ serde_json = { workspace = true } tracing = { workspace = true } tracing-core = "0.1.31" tracing-futures = { version = "0.2.5", default-features = false, features = ["std-future", "std"] } -tracing-subscriber = { workspace = true, features = ["fmt", "ansi"] } -tracing-bunyan-formatter = { version = "0.3.9", default-features = false } -tokio = { workspace = true, features = ["sync"] } +tracing-subscriber = { workspace = true, features = ["fmt", "ansi", "json"] } +tokio = { workspace = true, features = ["sync", "rt", "macros"] } console-subscriber = { version = "0.2.0", optional = true } once_cell = { workspace = true } derive_more = { workspace = true } tracing-error = "0.2.0" +thiserror = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "time", "rt"] } diff --git a/logger/src/actor.rs b/logger/src/actor.rs new file mode 100644 index 00000000000..e9e2d91280e --- /dev/null +++ b/logger/src/actor.rs @@ -0,0 +1,145 @@ +//! Actor encapsulating interaction with logger & telemetry subsystems. + +use iroha_config::logger::into_tracing_level; +use iroha_data_model::Level; +use tokio::sync::{broadcast, mpsc, oneshot}; +use tracing_core::Subscriber; +use tracing_subscriber::{reload, reload::Error as ReloadError}; + +use crate::telemetry; + +/// Handle to interact with the logger & telemetry actor, e.g. to reload the log level or subscribe to telemetry events. +#[derive(Clone)] +pub struct LoggerHandle { + sender: mpsc::Sender<Message>, +} + +impl LoggerHandle { + pub(crate) fn new( + handle: reload::Handle, + telemetry_receiver: mpsc::Receiver<telemetry::ChannelEvent>, + ) -> Self { + let (tx, rx) = mpsc::channel(32); + let (regular, _) = broadcast::channel(32); + let (future_forward, _) = broadcast::channel(32); + let mut actor = LoggerActor { + message_receiver: rx, + level_handle: handle, + telemetry_receiver, + telemetry_forwarder_regular: regular, + telemetry_forwarder_future: future_forward, + }; + tokio::spawn(async move { actor.run().await }); + + Self { sender: tx } + } + + /// Reload the log level filter. + /// + /// # Errors + /// - If reloading on the side of [`reload::Handle`] fails + /// - If actor communication fails + pub async fn reload_level(&self, new_value: Level) -> color_eyre::Result<(), Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .sender + .send(Message::ReloadLevel { + value: new_value, + respond_to: tx, + }) + .await; + Ok(rx.await??) + } + + /// Subscribe to the telemetry events broadcast. + /// + /// # Errors + /// If actor communication fails + pub async fn subscribe_on_telemetry( + &self, + channel: telemetry::Channel, + ) -> color_eyre::Result<broadcast::Receiver<telemetry::Event>, Error> { + let (tx, rx) = oneshot::channel(); + let _ = self + .sender + .send(Message::SubscribeOnTelemetry { + channel, + respond_to: tx, + }) + .await; + Ok(rx.await?) + } +} + +enum Message { + ReloadLevel { + value: Level, + respond_to: oneshot::Sender<Result<(), ReloadError>>, + }, + SubscribeOnTelemetry { + channel: telemetry::Channel, + respond_to: oneshot::Sender<broadcast::Receiver<telemetry::Event>>, + }, +} + +/// Possible errors that might occur while interacting with the actor. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// If dynamic log level reloading failed + #[error("cannot dynamically reload the log level")] + LevelReload(#[from] ReloadError), + /// If actor communication is broken + #[error("failed to communicate with the actor")] + Communication(#[from] oneshot::error::RecvError), +} + +struct LoggerActor { + message_receiver: mpsc::Receiver<Message>, + telemetry_receiver: mpsc::Receiver<telemetry::ChannelEvent>, + telemetry_forwarder_regular: broadcast::Sender<telemetry::Event>, + telemetry_forwarder_future: broadcast::Sender<telemetry::Event>, + level_handle: reload::Handle, +} + +impl LoggerActor { + async fn run(&mut self) { + loop { + tokio::select!
{ + Some(msg) = self.message_receiver.recv() => { + self.handle_message(msg); + }, + Some(telemetry::ChannelEvent(channel, event)) = self.telemetry_receiver.recv() => { + let forward_to = match channel { + telemetry::Channel::Regular => &self.telemetry_forwarder_regular, + telemetry::Channel::Future => &self.telemetry_forwarder_future, + }; + + let _ = forward_to.send(event); + }, + else => break + } + tokio::task::yield_now().await; + } + } + + fn handle_message(&mut self, msg: Message) { + match msg { + Message::ReloadLevel { value, respond_to } => { + let level = into_tracing_level(value); + let filter = tracing_subscriber::filter::LevelFilter::from_level(level); + let result = self.level_handle.reload(filter); + let _ = respond_to.send(result); + } + Message::SubscribeOnTelemetry { + channel: kind, + respond_to, + } => { + let receiver = match kind { + telemetry::Channel::Regular => self.telemetry_forwarder_regular.subscribe(), + telemetry::Channel::Future => self.telemetry_forwarder_future.subscribe(), + }; + let _ = respond_to.send(receiver); + } + } + } +} diff --git a/logger/src/layer.rs b/logger/src/layer.rs index 5b17d93cfe4..6da758b35e9 100644 --- a/logger/src/layer.rs +++ b/logger/src/layer.rs @@ -111,7 +111,7 @@ pub struct LevelFilter { static CURRENT_LEVEL: AtomicU8 = AtomicU8::new(0); /// Return max log level -pub fn max_log_level() -> u8 { +pub fn current_level() -> u8 { CURRENT_LEVEL.load(Ordering::Relaxed) } @@ -129,12 +129,12 @@ impl LevelFilter { /// Constructor of level filter #[allow(clippy::new_ret_no_self)] pub fn new(level: Level, subscriber: S) -> impl Subscriber { - Self::update_max_log_level(level); + Self::update_log_level(level); EventSubscriber(Self { subscriber }) } /// Updater of max level - fn update_max_log_level(level: Level) { + fn update_log_level(level: Level) { CURRENT_LEVEL.store(Self::level_as_u8(level), Ordering::SeqCst); } } @@ -148,7 +148,7 @@ impl EventInspectorTrait for LevelFilter { fn event(&self, event: &Event<'_>) { let level = Self::level_as_u8(*event.metadata().level()); - if level >= max_log_level() { + if level >= current_level() { self.subscriber.event(event) } } diff --git a/logger/src/lib.rs b/logger/src/lib.rs index fceecc1d26c..f84ddc6a7d8 100644 --- a/logger/src/lib.rs +++ b/logger/src/lib.rs @@ -1,159 +1,139 @@ //! Iroha's logging utilities. 
+pub mod actor; pub mod layer; pub mod telemetry; use std::{ fmt::Debug, - fs::OpenOptions, - path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + OnceLock, }, }; -use color_eyre::{eyre::WrapErr, Report, Result}; -use iroha_config::logger::into_tracing_level; -pub use iroha_config::logger::{Configuration, ConfigurationProxy}; -pub use telemetry::{Telemetry, TelemetryFields, TelemetryLayer}; -use tokio::sync::mpsc::Receiver; +use actor::LoggerHandle; +use color_eyre::{eyre::eyre, Report, Result}; +pub use iroha_config::logger::{Configuration, ConfigurationProxy, Format, Level}; +use iroha_config::{base::proxy::Builder, logger::into_tracing_level}; +use tracing::subscriber::set_global_default; pub use tracing::{ debug, debug_span, error, error_span, info, info_span, instrument as log, trace, trace_span, warn, warn_span, Instrument, }; -use tracing::{subscriber::set_global_default, Subscriber}; -use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; pub use tracing_futures::Instrument as InstrumentFutures; +pub use tracing_subscriber::reload::Error as ReloadError; use tracing_subscriber::{layer::SubscriberExt, registry::Registry, reload}; -/// Substrate telemetry -pub type SubstrateTelemetry = Receiver<Telemetry>; - -/// Future telemetry -pub type FutureTelemetry = Receiver<Telemetry>; - -/// Convenience wrapper for Telemetry types. -pub type Telemetries = (SubstrateTelemetry, FutureTelemetry); +const TELEMETRY_CAPACITY: usize = 1000; static LOGGER_SET: AtomicBool = AtomicBool::new(false); -/// Initializes `Logger` with given [`Configuration`]. -/// After the initialization `log` macros will print with the use of this `Logger`. -/// Returns the receiving side of telemetry channels (regular telemetry, future telemetry) -/// -/// # Errors -/// If the logger is already set, raises a generic error. -pub fn init(configuration: &Configuration) -> Result<Option<Telemetries>> { +fn try_set_logger() -> Result<()> { if LOGGER_SET .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { - return Ok(None); + return Err(eyre!("Logger is already set.")); } - Ok(Some(setup_logger(configuration)?)) + Ok(()) }
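For orientation, a minimal sketch of the one-shot initialisation flow provided by `init_global` (defined just below), together with a run-time level reload via the returned handle; the expect-messages are illustrative:

```rust
use iroha_config::base::proxy::Builder;
use iroha_logger::{ConfigurationProxy, Level};

#[tokio::main]
async fn main() {
    let config = ConfigurationProxy::default()
        .build()
        .expect("Default logger config always builds");

    // Succeeds at most once per process; later calls return an error.
    let handle = iroha_logger::init_global(&config, /* terminal_colors */ true)
        .expect("logger should not be initialised yet");

    iroha_logger::info!("logger is up");

    // The returned handle allows run-time adjustments, e.g. raising verbosity.
    handle
        .reload_level(Level::TRACE)
        .await
        .expect("level reload should succeed");
}
```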
-/// Disables the logger by setting `LOGGER_SET` to true. Will fail -/// if the logger has already been initialized. This function is -/// required in order to generate flamegraphs and flamecharts. +/// Initializes the logger globally with the given [`Configuration`]. /// -/// Returns true on success. -pub fn disable_logger() -> bool { - LOGGER_SET - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() -} +/// Returns a [`LoggerHandle`] to interact with the logger instance. +/// +/// Works only once per process; all subsequent invocations will fail. +/// +/// For usage in tests, consider [`test_logger`]. +/// +/// # Errors +/// If the logger is already set, raises a generic error. +// TODO: refactor configuration in a way that `terminal_colors` is part of it +// https://github.com/hyperledger/iroha/issues/3500 +pub fn init_global(configuration: &Configuration, terminal_colors: bool) -> Result<LoggerHandle> { + try_set_logger()?; -fn setup_logger(configuration: &Configuration) -> Result<Telemetries> { let layer = tracing_subscriber::fmt::layer() - .with_ansi(configuration.terminal_colors) + .with_ansi(terminal_colors) .with_test_writer(); - if configuration.compact_mode { - add_bunyan(configuration, layer.compact()) - } else { - add_bunyan(configuration, layer) + match configuration.format { + Format::Full => step2(configuration, layer), + Format::Compact => step2(configuration, layer.compact()), + Format::Pretty => step2(configuration, layer.pretty()), + Format::Json => step2(configuration, layer.json()), } } -fn bunyan_writer_create(destination: PathBuf) -> Result<Arc<std::fs::File>> { - OpenOptions::new() - .create(true) - .append(true) - .open(destination) - .wrap_err("Failed to create or open bunyan logs file") - .map(Arc::new) +/// Returns the lazily-initialised global logger for testing purposes. +/// +/// # Panics +/// If [`init_global`] or [`disable_global`] were called first. +#[allow(clippy::needless_update)] // `tokio-console` feature adds additional fields to Configuration +pub fn test_logger() -> LoggerHandle { + static LOGGER: OnceLock<LoggerHandle> = OnceLock::new(); + + LOGGER + .get_or_init(|| { + // NOTE: if this config needs to be changed for some specific tests, consider + // isolating those tests into a separate process and controlling the default logger config + // with ENV vars rather than by extending the `test_logger` signature. This will both keep + // `test_logger` simple and emphasise the isolation which is necessary anyway in + // case of singleton mocking (where the logger is the singleton). + let config = Configuration { + level: Level::DEBUG, + format: Format::Pretty, + ..ConfigurationProxy::default().build().unwrap() + }; + + init_global(&config, true).expect( + "`init_global()` or `disable_global()` should not be called before `test_logger()`", + ) + }) + .clone() +} + +/// Disables the logger globally, so that subsequent calls to [`init_global`] will fail. +/// +/// Disabling the logger is required in order to generate flamegraphs and flamecharts. +/// +/// # Errors +/// If the global logger was already initialised/disabled.
+pub fn disable_global() -> Result<()> { + try_set_logger() } -fn add_bunyan(configuration: &Configuration, layer: L) -> Result +fn step2(configuration: &Configuration, layer: L) -> Result where L: tracing_subscriber::Layer + Debug + Send + Sync + 'static, { - let level: tracing::Level = into_tracing_level(configuration.max_log_level.value()); + let level: tracing::Level = into_tracing_level(configuration.level); let level_filter = tracing_subscriber::filter::LevelFilter::from_level(level); - let (filter, handle) = reload::Layer::new(level_filter); - configuration - .max_log_level - .set_handle(iroha_config::logger::ReloadHandle(handle)); - let (bunyan_layer, storage_layer) = match configuration.log_file_path.clone() { - Some(path) => ( - Some(BunyanFormattingLayer::new( - "bunyan_layer".into(), - bunyan_writer_create(path)?, - )), - Some(JsonStorageLayer), - ), - None => (None, None), - }; + let (level_filter, level_filter_handle) = reload::Layer::new(level_filter); let subscriber = Registry::default() .with(layer) - .with(filter) - .with(storage_layer) - .with(tracing_error::ErrorLayer::default()) - .with(bunyan_layer); - - add_tokio_console_subscriber(configuration, subscriber) -} + .with(level_filter) + .with(tracing_error::ErrorLayer::default()); -fn add_tokio_console_subscriber< - S: Subscriber + Send + Sync + 'static + for<'a> tracing_subscriber::registry::LookupSpan<'a>, ->( - configuration: &Configuration, - subscriber: S, -) -> Result { #[cfg(all(feature = "tokio-console", not(feature = "no-tokio-console")))] - { + let subscriber = { let console_subscriber = console_subscriber::ConsoleLayer::builder() .server_addr( configuration .tokio_console_addr - .parse::() + .into() .expect("Invalid address for tokio console"), ) .spawn(); - add_telemetry_and_set_default(configuration, subscriber.with(console_subscriber)) - } - #[cfg(any(not(feature = "tokio-console"), feature = "no-tokio-console"))] - { - add_telemetry_and_set_default(configuration, subscriber) - } -} - -fn add_telemetry_and_set_default( - configuration: &Configuration, - subscriber: S, -) -> Result { - // static global_subscriber: dyn Subscriber = once_cell::new; - let (subscriber, receiver, receiver_future) = TelemetryLayer::from_capacity( - subscriber, - configuration - .telemetry_capacity - .try_into() - .expect("u32 should always fit in usize"), - ); + subscriber.with(console_subscriber) + }; + let (subscriber, receiver) = telemetry::Layer::with_capacity(subscriber, TELEMETRY_CAPACITY); set_global_default(subscriber)?; - Ok((receiver, receiver_future)) + + let handle = LoggerHandle::new(level_filter_handle, receiver); + + Ok(handle) } /// Macro for sending telemetry info diff --git a/logger/src/telemetry.rs b/logger/src/telemetry.rs index 526209daa60..3e65e5914bb 100644 --- a/logger/src/telemetry.rs +++ b/logger/src/telemetry.rs @@ -4,25 +4,25 @@ use std::{error::Error, fmt::Debug}; use derive_more::{Deref, DerefMut}; use serde_json::Value; -use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::mpsc; use tracing::{ field::{Field, Visit}, - Event, Subscriber, + Event as TracingEvent, Subscriber, }; use crate::layer::{EventInspectorTrait, EventSubscriber}; /// Target for telemetry in `tracing` -pub const TELEMETRY_TARGET_PREFIX: &str = "telemetry::"; +pub const TARGET_PREFIX: &str = "telemetry::"; /// Target for telemetry future in `tracing` -pub const TELEMETRY_FUTURE_TARGET_PREFIX: &str = "telemetry_future::"; +pub const FUTURE_TARGET_PREFIX: &str = "telemetry_future::"; /// Fields for telemetry (type for 
efficient saving) #[derive(Clone, Debug, PartialEq, Eq, Default, Deref, DerefMut)] -pub struct TelemetryFields(pub Vec<(&'static str, Value)>); +pub struct Fields(pub Vec<(&'static str, Value)>); -impl From<TelemetryFields> for Value { - fn from(TelemetryFields(fields): TelemetryFields) -> Self { +impl From<Fields> for Value { + fn from(Fields(fields): Fields) -> Self { fields .into_iter() .map(|(key, value)| (key.to_owned(), value)) @@ -32,14 +32,14 @@ impl From for Value { /// Telemetry which can be received from telemetry layer #[derive(Clone, Debug, PartialEq, Eq)] -pub struct Telemetry { +pub struct Event { /// Subsystem from which telemetry was received pub target: &'static str, /// Fields which were recorded - pub fields: TelemetryFields, + pub fields: Fields, } -impl Visit for Telemetry { +impl Visit for Event { fn record_debug(&mut self, field: &Field, value: &dyn Debug) { self.fields .push((field.name(), format!("{:?}", &value).into())) @@ -71,9 +71,9 @@ impl Visit for Telemetry { } } -impl Telemetry { - fn from_event(target: &'static str, event: &Event<'_>) -> Self { - let fields = TelemetryFields::default(); +impl Event { + fn from_event(target: &'static str, event: &TracingEvent<'_>) -> Self { + let fields = Fields::default(); let mut telemetry = Self { target, fields }; event.record(&mut telemetry); telemetry @@ -82,70 +82,58 @@ impl Telemetry { /// Telemetry layer #[derive(Debug, Clone)] -pub struct TelemetryLayer<S: Subscriber> { - telemetry_sender: Sender<Telemetry>, - telemetry_future_sender: Sender<Telemetry>, +pub struct Layer<S: Subscriber> { + sender: mpsc::Sender<ChannelEvent>, subscriber: S, } -impl<S: Subscriber> TelemetryLayer<S> { - /// Create telemetry from channel sender - pub fn from_senders( - subscriber: S, - telemetry_sender: Sender<Telemetry>, - telemetry_future_sender: Sender<Telemetry>, - ) -> impl Subscriber { - EventSubscriber(Self { - telemetry_sender, - telemetry_future_sender, - subscriber, - }) - } - - /// Create new telemetry layer with specific channel size (via const generic) - #[allow(clippy::new_ret_no_self)] - pub fn new<const CHANNEL_SIZE: usize>( - subscriber: S, - ) -> (impl Subscriber, Receiver<Telemetry>, Receiver<Telemetry>) { - let (sender, receiver) = mpsc::channel(CHANNEL_SIZE); - let (sender_future, receiver_future) = mpsc::channel(CHANNEL_SIZE); - let telemetry = Self::from_senders(subscriber, sender, sender_future); - (telemetry, receiver, receiver_future) - } - +impl<S: Subscriber> Layer<S> { /// Create new telemetry layer with specific channel size #[allow(clippy::new_ret_no_self)] - pub fn from_capacity( + pub fn with_capacity( subscriber: S, channel_size: usize, - ) -> (impl Subscriber, Receiver<Telemetry>, Receiver<Telemetry>) { + ) -> (impl Subscriber, mpsc::Receiver<ChannelEvent>) { let (sender, receiver) = mpsc::channel(channel_size); - let (sender_future, receiver_future) = mpsc::channel(channel_size); - let telemetry = Self::from_senders(subscriber, sender, sender_future); - (telemetry, receiver, receiver_future) + let telemetry = EventSubscriber(Self { sender, subscriber }); + (telemetry, receiver) + } + + fn send_event(&self, channel: Channel, target: &'static str, event: &TracingEvent<'_>) { + let _ = self + .sender + .try_send(ChannelEvent(channel, Event::from_event(target, event))); + } } -impl<S: Subscriber> EventInspectorTrait for TelemetryLayer<S> { +impl<S: Subscriber> EventInspectorTrait for Layer<S> { type Subscriber = S; fn inner_subscriber(&self) -> &Self::Subscriber { &self.subscriber } - fn event(&self, event: &Event<'_>) { + fn event(&self, event: &TracingEvent<'_>) { let target = event.metadata().target(); #[allow(clippy::option_if_let_else)] // This is actually more readable.
- if let Some(telemetry_target) = target.strip_prefix(TELEMETRY_TARGET_PREFIX) { - let _result = self - .telemetry_sender - .try_send(Telemetry::from_event(telemetry_target, event)); - } else if let Some(future_target) = target.strip_prefix(TELEMETRY_FUTURE_TARGET_PREFIX) { - let _result = self - .telemetry_future_sender - .try_send(Telemetry::from_event(future_target, event)); + if let Some(target) = target.strip_prefix(TARGET_PREFIX) { + self.send_event(Channel::Regular, target, event); + } else if let Some(target) = target.strip_prefix(FUTURE_TARGET_PREFIX) { + self.send_event(Channel::Future, target, event); } else { self.subscriber.event(event) } } } + +/// A pair of [`Channel`] associated with [`Event`] +pub struct ChannelEvent(pub Channel, pub Event); + +/// Supported telemetry channels +#[derive(Copy, Clone)] +pub enum Channel { + /// Regular telemetry + Regular, + /// Telemetry collected from futures instrumented with `iroha_futures::TelemetryFuture`. + Future, +} diff --git a/logger/tests/configuration.rs b/logger/tests/configuration.rs index 661443ed256..d63d837200c 100644 --- a/logger/tests/configuration.rs +++ b/logger/tests/configuration.rs @@ -1,26 +1,23 @@ use std::time::Duration; -use iroha_data_model::Level; -use iroha_logger::{info, init, Configuration, Telemetry, TelemetryFields}; +use iroha_logger::{ + info, + telemetry::{Channel, Event, Fields}, + test_logger, +}; use tokio::time; #[tokio::test] async fn telemetry_separation_custom() { - let config = Configuration { - max_log_level: Level::TRACE.into(), - telemetry_capacity: 100, - compact_mode: true, - log_file_path: Some("/dev/stdout".into()), - terminal_colors: true, - #[cfg(feature = "tokio-console")] - tokio_console_addr: "127.0.0.1:5555".into(), - }; - let (mut receiver, _) = init(&config).unwrap().unwrap(); + let mut receiver = test_logger() + .subscribe_on_telemetry(Channel::Regular) + .await + .unwrap(); info!(target: "telemetry::test", a = 2, c = true, d = "this won't be logged"); info!("This will be logged in bunyan-readable format"); - let telemetry = Telemetry { + let telemetry = Event { target: "test", - fields: TelemetryFields(vec![ + fields: Fields(vec![ ("a", serde_json::json!(2)), ("c", serde_json::json!(true)), ("d", serde_json::json!("this won't be logged")), diff --git a/logger/tests/setting_logger.rs b/logger/tests/setting_logger.rs index 6d204f7abca..209a6b45928 100644 --- a/logger/tests/setting_logger.rs +++ b/logger/tests/setting_logger.rs @@ -1,21 +1,17 @@ use iroha_config::base::proxy::Builder; -use iroha_logger::{init, ConfigurationProxy}; +use iroha_logger::{init_global, ConfigurationProxy}; #[tokio::test] async fn setting_logger_twice_fails() { - assert!(init( - &ConfigurationProxy::default() - .build() - .expect("Default logger config always builds") - ) - .is_ok()); - let second_init = init( - &ConfigurationProxy::default() - .build() - .expect("Default logger config always builds"), - ); - assert!(second_init.is_ok()); - assert!(second_init.unwrap().is_none()); + let cfg = ConfigurationProxy::default() + .build() + .expect("Default logger config always builds"); + + let first = init_global(&cfg, false); + assert!(first.is_ok()); + + let second = init_global(&cfg, false); + assert!(second.is_err()); } #[test] diff --git a/logger/tests/telemetry.rs b/logger/tests/telemetry.rs index bfab41332eb..64b8985ca0d 100644 --- a/logger/tests/telemetry.rs +++ b/logger/tests/telemetry.rs @@ -1,19 +1,23 @@ use std::time::Duration; -use iroha_config::base::proxy::Builder; -use iroha_logger::{info, 
init, ConfigurationProxy, Telemetry, TelemetryFields}; +use iroha_logger::{ + info, + telemetry::{Channel, Event, Fields}, + test_logger, +}; use tokio::time; #[tokio::test] async fn telemetry_separation_default() { - let (mut receiver, _) = init(&ConfigurationProxy::default().build().unwrap()) - .unwrap() + let mut receiver = test_logger() + .subscribe_on_telemetry(Channel::Regular) + .await .unwrap(); info!(target: "telemetry::test", a = 2, c = true, d = "this won't be logged"); info!("This will be logged"); - let telemetry = Telemetry { + let telemetry = Event { target: "test", - fields: TelemetryFields(vec![ + fields: Fields(vec![ ("a", serde_json::json!(2)), ("c", serde_json::json!(true)), ("d", serde_json::json!("this won't be logged")), diff --git a/p2p/tests/integration/p2p.rs b/p2p/tests/integration/p2p.rs index 93f9a391765..b23faff5036 100644 --- a/p2p/tests/integration/p2p.rs +++ b/p2p/tests/integration/p2p.rs @@ -10,8 +10,8 @@ use std::{ use futures::{prelude::*, stream::FuturesUnordered, task::AtomicWaker}; use iroha_config_base::proxy::Builder; use iroha_crypto::KeyPair; -use iroha_data_model::{prelude::PeerId, Level}; -use iroha_logger::{prelude::*, Configuration, ConfigurationProxy}; +use iroha_data_model::prelude::PeerId; +use iroha_logger::{prelude::*, ConfigurationProxy}; use iroha_p2p::{network::message::*, NetworkHandle}; use iroha_primitives::addr::socket_addr; use parity_scale_codec::{Decode, Encode}; @@ -23,18 +23,16 @@ use tokio::{ #[derive(Clone, Debug, Decode, Encode)] struct TestMessage(String); -static INIT: Once = Once::new(); - fn setup_logger() { + static INIT: Once = Once::new(); + INIT.call_once(|| { - let log_config = Configuration { - max_log_level: Level::TRACE.into(), - compact_mode: false, - ..ConfigurationProxy::default() - .build() - .expect("Default logger config failed to build. This is a programmer error") - }; - iroha_logger::init(&log_config).expect("Failed to start logger"); + let mut config = ConfigurationProxy::default() + .build() + .expect("Default logger config failed to build. This is a programmer error"); + config.level = iroha_logger::Level::TRACE; + config.format = iroha_logger::Format::Pretty; + iroha_logger::init_global(&config, true).unwrap(); }) } @@ -230,17 +228,7 @@ async fn two_networks() { #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn multiple_networks() { - let log_config = Configuration { - max_log_level: Level::TRACE.into(), - compact_mode: false, - ..ConfigurationProxy::default() - .build() - .expect("Default logger config should always build") - }; - // Can't use logger because it's failed to initialize. 
- if let Err(err) = iroha_logger::init(&log_config) { - eprintln!("Failed to initialize logger: {err}"); - } + setup_logger(); info!("Starting..."); let mut peers = Vec::new(); diff --git a/scripts/test_env.py b/scripts/test_env.py index f6463040a1f..b24b7eb0911 100755 --- a/scripts/test_env.py +++ b/scripts/test_env.py @@ -118,14 +118,14 @@ def run(self, is_genesis: bool = False): os.environ["KURA_BLOCK_STORE_PATH"] = str(peer_dir.joinpath("storage")) os.environ["SNAPSHOT_DIR_PATH"] = str(peer_dir.joinpath("storage")) - os.environ["LOG_FILE_PATH"] = str(peer_dir.joinpath("log.json")) - os.environ["MAX_LOG_LEVEL"] = "TRACE" + os.environ["LOG_LEVEL"] = "TRACE" + os.environ["LOG_FORMAT"] = "\"pretty\"" + os.environ["LOG_TOKIO_CONSOLE_ADDR"] = f"{self.host_ip}:{self.tokio_console_port}" os.environ["IROHA_PUBLIC_KEY"] = self.public_key os.environ["IROHA_PRIVATE_KEY"] = self.private_key os.environ["SUMERAGI_DEBUG_FORCE_SOFT_FORK"] = "false" os.environ["TORII_P2P_ADDR"] = f"{self.host_ip}:{self.p2p_port}" os.environ["TORII_API_URL"] = f"{self.host_ip}:{self.api_port}" - os.environ["TOKIO_CONSOLE_ADDR"] = f"{self.host_ip}:{self.tokio_console_port}" genesis_arg = "--submit-genesis" if is_genesis else "" # FD never gets closed diff --git a/telemetry/Cargo.toml b/telemetry/Cargo.toml index 4289c80e87d..e1fb573541b 100644 --- a/telemetry/Cargo.toml +++ b/telemetry/Cargo.toml @@ -31,7 +31,7 @@ serde_json = { workspace = true } streaming-stats = "0.2.3" serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } -tokio-stream = { workspace = true, features = ["fs"] } +tokio-stream = { workspace = true, features = ["fs", "sync"] } tokio-tungstenite = { workspace = true } url = { workspace = true, features = ["serde"] } prometheus = { workspace = true } diff --git a/telemetry/src/dev.rs b/telemetry/src/dev.rs index d4970f7c653..674b7bca748 100644 --- a/telemetry/src/dev.rs +++ b/telemetry/src/dev.rs @@ -1,31 +1,26 @@ //! Module with development telemetry use eyre::{Result, WrapErr}; -use iroha_logger::telemetry::Telemetry; +use iroha_config::telemetry::DevTelemetryConfig; +use iroha_logger::telemetry::Event as Telemetry; use tokio::{ fs::OpenOptions, io::AsyncWriteExt, - sync::mpsc::Receiver, + sync::broadcast::Receiver, task::{self, JoinHandle}, }; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; - -use crate::Configuration; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; /// Starts telemetry writing to a file /// # Errors /// Fails if unable to open the file pub async fn start( - config: &Configuration, + DevTelemetryConfig { + file: telemetry_file, + }: DevTelemetryConfig, telemetry: Receiver, ) -> Result> { - let mut telemetry = crate::futures::get_stream(ReceiverStream::new(telemetry)); - - let Some(telemetry_file) = &config.file else { - return Ok(task::spawn(async move { - while telemetry.next().await.is_some() {} - })); - }; + let mut stream = crate::futures::get_stream(BroadcastStream::new(telemetry).fuse()); let mut file = OpenOptions::new() .write(true) @@ -40,11 +35,11 @@ pub async fn start( .wrap_err("Failed to create and open file for telemetry")?; // Serde doesn't support async Read Write traits. - // So let synchonous synchronous code be here. + // So let synchronous code be here. 
// // TODO: After migration to tokio move to https://docs.rs/tokio-serde let join_handle = task::spawn(async move { - while let Some(item) = telemetry.next().await { + while let Some(item) = stream.next().await { let telemetry_json = match serde_json::to_string(&item) { Ok(json) => json, Err(error) => { diff --git a/telemetry/src/futures.rs b/telemetry/src/futures.rs index df723f38406..5f4cb834d22 100644 --- a/telemetry/src/futures.rs +++ b/telemetry/src/futures.rs @@ -2,10 +2,10 @@ use std::{collections::HashMap, marker::Unpin, time::Duration}; use iroha_futures::FuturePollTelemetry; -use iroha_logger::telemetry::Telemetry; +use iroha_logger::telemetry::Event as Telemetry; use serde::{Deserialize, Serialize}; use tokio::time; -use tokio_stream::{Stream, StreamExt}; +use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, Stream, StreamExt}; pub mod post_process { //! Module with telemetry post processing @@ -80,9 +80,10 @@ pub mod post_process { /// Gets stream of future poll telemetry out of general telemetry stream pub fn get_stream( - receiver: impl Stream + Unpin, + receiver: impl Stream> + Unpin, ) -> impl Stream + Unpin { receiver + .filter_map(Result::ok) .map(FuturePollTelemetry::try_from) .filter_map(Result::ok) .map( diff --git a/telemetry/src/ws.rs b/telemetry/src/ws.rs index 700860f2de8..c8f1486e76c 100644 --- a/telemetry/src/ws.rs +++ b/telemetry/src/ws.rs @@ -4,13 +4,15 @@ use std::time::Duration; use chrono::Local; use eyre::{eyre, Result}; use futures::{stream::SplitSink, Sink, SinkExt, StreamExt}; -use iroha_logger::Telemetry; +use iroha_config::telemetry::RegularTelemetryConfig; +use iroha_logger::telemetry::Event as Telemetry; use serde_json::Map; use tokio::{ net::TcpStream, - sync::mpsc::{self, Receiver, Sender}, + sync::{broadcast, mpsc}, + task::JoinHandle, }; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_tungstenite::{ tungstenite::{Error, Message}, MaybeTlsStream, WebSocketStream, @@ -21,36 +23,43 @@ use crate::retry_period::RetryPeriod; type WebSocketSplitSink = SplitSink>, Message>; +const INTERNAL_CHANNEL_CAPACITY: usize = 10; + /// Starts telemetry sending data to a server /// # Errors /// Fails if unable to connect to the server -pub async fn start(config: &crate::Configuration, telemetry: Receiver) -> Result { - if let (Some(name), Some(url)) = (&config.name, &config.url) { - iroha_logger::info!(%url, "Starting telemetry"); - let (ws, _) = tokio_tungstenite::connect_async(url).await?; - let (write, _read) = ws.split(); - let (internal_sender, internal_receiver) = mpsc::channel(10); - let client = Client::new( - name.clone(), - write, - WebsocketSinkFactory::new(url.clone()), - RetryPeriod::new(config.min_retry_period, config.max_retry_delay_exponent), - internal_sender, - ); - tokio::task::spawn(async move { - client.run(telemetry, internal_receiver).await; - }); - Ok(true) - } else { - Ok(false) - } +pub async fn start( + RegularTelemetryConfig { + name, + url, + max_retry_delay_exponent, + min_retry_period, + }: RegularTelemetryConfig, + telemetry: broadcast::Receiver, +) -> Result> { + iroha_logger::info!(%url, "Starting telemetry"); + let (ws, _) = tokio_tungstenite::connect_async(&url).await?; + let (write, _read) = ws.split(); + let (internal_sender, internal_receiver) = mpsc::channel(INTERNAL_CHANNEL_CAPACITY); + let client = Client::new( + name, + write, + WebsocketSinkFactory::new(url), + RetryPeriod::new(min_retry_period, max_retry_delay_exponent), + internal_sender, + 
); + let handle = tokio::task::spawn(async move { + client.run(telemetry, internal_receiver).await; + }); + + Ok(handle) } struct Client { name: String, sink_factory: F, retry_period: RetryPeriod, - internal_sender: Sender, + internal_sender: mpsc::Sender, sink: Option, init_msg: Option, } @@ -65,7 +74,7 @@ where sink: S, sink_factory: F, retry_period: RetryPeriod, - internal_sender: Sender, + internal_sender: mpsc::Sender, ) -> Self { Self { name, @@ -79,15 +88,15 @@ where pub async fn run( mut self, - receiver: Receiver, - internal_receiver: Receiver, + receiver: broadcast::Receiver, + internal_receiver: mpsc::Receiver, ) { - let mut stream = ReceiverStream::new(receiver).fuse(); + let mut stream = BroadcastStream::new(receiver).fuse(); let mut internal_stream = ReceiverStream::new(internal_receiver).fuse(); loop { tokio::select! { msg = stream.next() => { - if let Some(msg) = msg { + if let Some(Ok(msg)) = msg { self.on_telemetry(msg).await; } else { break; @@ -272,8 +281,7 @@ mod tests { use eyre::{eyre, Result}; use futures::{Sink, StreamExt}; - use iroha_config::base::proxy::Builder; - use iroha_logger::telemetry::{Telemetry, TelemetryFields}; + use iroha_logger::telemetry::{Event, Fields}; use serde_json::{Map, Value}; use tokio::task::JoinHandle; use tokio_tungstenite::tungstenite::{Error, Message}; @@ -356,13 +364,13 @@ mod tests { struct Suite { fail_send: Arc, fail_factory_create: Arc, - telemetry_sender: tokio::sync::mpsc::Sender, + telemetry_sender: tokio::sync::broadcast::Sender, message_receiver: futures::channel::mpsc::Receiver, } impl Suite { pub fn new() -> (Self, JoinHandle<()>) { - let (telemetry_sender, telemetry_receiver) = tokio::sync::mpsc::channel(100); + let (telemetry_sender, telemetry_receiver) = tokio::sync::broadcast::channel(100); let (message_sender, message_receiver) = futures::channel::mpsc::channel(100); let fail_send = Arc::new(AtomicBool::new(false)); let message_sender = { @@ -402,10 +410,10 @@ mod tests { } } - fn system_connected_telemetry() -> Telemetry { - Telemetry { + fn system_connected_telemetry() -> Event { + Event { target: "telemetry::test", - fields: TelemetryFields(vec![ + fields: Fields(vec![ ("msg", Value::String("system.connected".to_owned())), ( "genesis_hash", @@ -415,10 +423,10 @@ mod tests { } } - fn system_interval_telemetry(peers: u64) -> Telemetry { - Telemetry { + fn system_interval_telemetry(peers: u64) -> Event { + Event { target: "telemetry::test", - fields: TelemetryFields(vec![ + fields: Fields(vec![ ("msg", Value::String("system.interval".to_owned())), ("peers", Value::Number(peers.into())), ]), @@ -433,10 +441,7 @@ mod tests { } = suite; // The first message is `initialization` - telemetry_sender - .send(system_connected_telemetry()) - .await - .unwrap(); + telemetry_sender.send(system_connected_telemetry()).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; { let msg = message_receiver.next().await.unwrap(); @@ -467,10 +472,7 @@ mod tests { } // The second message is `update` - telemetry_sender - .send(system_interval_telemetry(2)) - .await - .unwrap(); + telemetry_sender.send(system_interval_telemetry(2)).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; { let msg = message_receiver.next().await.unwrap(); @@ -500,19 +502,13 @@ mod tests { // Fail sending the first message fail_send.store(true, Ordering::SeqCst); - telemetry_sender - .send(system_connected_telemetry()) - .await - .unwrap(); + telemetry_sender.send(system_connected_telemetry()).unwrap(); message_receiver.try_next().unwrap_err(); 
tokio::time::sleep(Duration::from_millis(100)).await; // The second message is not sent because the sink is reset fail_send.store(false, Ordering::SeqCst); - telemetry_sender - .send(system_interval_telemetry(1)) - .await - .unwrap(); + telemetry_sender.send(system_interval_telemetry(1)).unwrap(); message_receiver.try_next().unwrap_err(); tokio::time::sleep(Duration::from_millis(100)).await; @@ -521,10 +517,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; // The third message is not sent because the sink is not created yet - telemetry_sender - .send(system_interval_telemetry(1)) - .await - .unwrap(); + telemetry_sender.send(system_interval_telemetry(1)).unwrap(); message_receiver.try_next().unwrap_err(); } @@ -538,19 +531,13 @@ mod tests { // Fail sending the first message fail_send.store(true, Ordering::SeqCst); - telemetry_sender - .send(system_connected_telemetry()) - .await - .unwrap(); + telemetry_sender.send(system_connected_telemetry()).unwrap(); message_receiver.try_next().unwrap_err(); tokio::time::sleep(Duration::from_millis(100)).await; // The second message is not sent because the sink is reset fail_send.store(false, Ordering::SeqCst); - telemetry_sender - .send(system_interval_telemetry(1)) - .await - .unwrap(); + telemetry_sender.send(system_interval_telemetry(1)).unwrap(); message_receiver.try_next().unwrap_err(); tokio::time::sleep(Duration::from_millis(100)).await; @@ -569,12 +556,6 @@ mod tests { ($ident:ident, $future:ident) => { #[tokio::test] async fn $ident() { - iroha_logger::init( - &iroha_logger::ConfigurationProxy::default() - .build() - .expect("Default logger config should always build"), - ) - .unwrap(); let (suite, run_handle) = Suite::new(); $future(suite).await; run_handle.await.unwrap();
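Closing note: the test fixtures above construct telemetry events by hand, which works because `Event` and `Fields` are public and `logger/src/telemetry.rs` now provides `From<Fields> for Value`, rendering the recorded pairs as a JSON map. The same pattern in isolation (fixture values are illustrative):

```rust
use iroha_logger::telemetry::{Event, Fields};
use serde_json::{json, Value};

fn main() {
    // A hand-rolled event, mirroring `system_interval_telemetry` above.
    let event = Event {
        target: "telemetry::test",
        fields: Fields(vec![
            ("msg", json!("system.interval")),
            ("peers", json!(2_u64)),
        ]),
    };

    // `Fields` converts into a JSON object keyed by field name.
    let as_json: Value = event.fields.into();
    assert_eq!(as_json["msg"], json!("system.interval"));
}
```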