From 6794f0c021dbbfcd598b785bad80ca81b34bfec7 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Mon, 25 Nov 2024 16:13:33 +0100 Subject: [PATCH 1/9] feat: improve logs for relay creation --- .../ockam_api/src/nodes/service/relay.rs | 28 +++++++++++-------- .../ockam/ockam_command/src/relay/create.rs | 14 ++++++---- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index af7b3429da1..58713f74fc6 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -130,12 +130,13 @@ impl NodeManager { pub async fn create_relay( self: &Arc, ctx: &Context, - addr: &MultiAddr, + address: &MultiAddr, alias: String, authorized: Option, relay_address: Option, return_timing: ReturnTiming, ) -> Result { + debug!(%alias, %address, ?authorized, ?relay_address, "creating relay"); if self.registry.relays.contains_key(&alias).await { let message = format!("A relay with the name '{alias}' already exists"); return Err(ockam_core::Error::new( @@ -148,11 +149,11 @@ impl NodeManager { let replacer = RelaySessionReplacer { node_manager: Arc::downgrade(self), context: ctx.async_try_clone().await?, - addr: addr.clone(), - relay_address, + addr: address.clone(), + relay_address: relay_address.clone(), connection: None, relay_worker_address: None, - authorized, + authorized: authorized.clone(), }; let mut session = Session::create(ctx, Arc::new(Mutex::new(replacer)), None).await?; @@ -182,29 +183,32 @@ impl NodeManager { session.start_monitoring().await?; - let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status()); + let relay_info = + RelayInfo::new(address.clone(), alias.clone(), session.connection_status()); let relay_info = if let Some(remote_relay_info) = remote_relay_info { - debug!( - forwarding_route = %remote_relay_info.forwarding_route(), - remote_address = %remote_relay_info.remote_address(), - "CreateRelay request processed, sending back response" - ); relay_info.with(remote_relay_info) } else { relay_info }; let registry_relay_info = RegistryRelayInfo { - destination_address: addr.clone(), + destination_address: address.clone(), alias: alias.clone(), session: Arc::new(Mutex::new(session)), }; self.registry .relays - .insert(alias, registry_relay_info.clone()) + .insert(alias.clone(), registry_relay_info.clone()) .await; + info!( + %alias, %address, ?authorized, ?relay_address, + forwarding_route = ?relay_info.forwarding_route(), + remote_address = ?relay_info.remote_address(), + "relay created" + ); + Ok(relay_info) } diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index aad706aa7d7..479110f6693 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use clap::Args; use colorful::Colorful; use miette::{miette, IntoDiagnostic}; -use tracing::info; +use tracing::debug; use ockam::identity::Identifier; use ockam::Context; @@ -91,6 +91,8 @@ impl Command for CreateCommand { let alias = cmd.relay_name(); let return_timing = cmd.return_timing(); + // let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); + let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.to).await?; let relay_info = { if at.starts_with(Project::CODE) && cmd.authorized.is_some() { @@ -98,7 +100,11 @@ impl Command for CreateCommand { "--authorized can not be used with project addresses" ))?; }; - info!("creating a relay at {} to {}", at, node.node_name()); + debug!( + "sending request to {} to create a relay at {}", + node.node_name(), + at + ); let pb = opts.terminal.spinner(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( @@ -123,10 +129,8 @@ impl Command for CreateCommand { let plain = { let from = color_primary(&at); let to = color_primary(format!("/node/{}", &node.node_name())); - fmt_ok!("Relay will be created automatically from {from} → {to} as soon as a connection can be established.") }; - opts.terminal .stdout() .plain(plain) @@ -166,11 +170,9 @@ impl Command for CreateCommand { let plain = { let from = color_primary(&at); let to = color_primary(format!("/node/{}", &node.node_name())); - fmt_warn!("A relay was created at {to} but failed to connect to {from}\n") + &fmt_info!("It will retry to connect automatically") }; - opts.terminal .stdout() .plain(plain) From 41e36decc9e6844ab7c1bc226012104d587064ac Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Mon, 25 Nov 2024 16:14:07 +0100 Subject: [PATCH 2/9] feat: improve stdout output for `node create` --- .../rust/ockam/ockam_command/src/util/foreground_args.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs index 3ffe1182fd9..e4a58d3749b 100644 --- a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs +++ b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs @@ -1,6 +1,7 @@ use crate::CommandGlobalOpts; use clap::Args; use colorful::Colorful; +use ockam_abac::expr::and; use ockam_api::{fmt_log, fmt_warn}; use std::io; use std::io::Read; @@ -43,7 +44,8 @@ pub async fn wait_for_exit_signal( let _ = tx.blocking_send(()); info!("Exit signal received"); if !is_child_process { - let _ = terminal.write_line(fmt_warn!("Exit signal received")); + let _ = + terminal.write_line("\n".to_string() + &fmt_warn!("Exit signal received")); } processed = true } @@ -70,9 +72,9 @@ pub async fn wait_for_exit_signal( debug!("waiting for exit signal"); - if !args.child_process { + if !args.child_process && opts.terminal.is_tty() { opts.terminal.write_line("")?; - opts.terminal.write_line(fmt_log!("{}", msg))?; + opts.terminal.write(fmt_log!("{}", msg))?; } // Wait for signal SIGINT, SIGTERM, SIGHUP or EOF; or for the tx to be closed. From 60b9140101403ff109aa68138a8a5501637e6254 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Mon, 25 Nov 2024 17:13:39 +0100 Subject: [PATCH 3/9] feat: improve logs for tcp portals creation --- .../src/tcp_interceptor/workers/processor.rs | 4 +- .../ockam_api/src/nodes/connection/project.rs | 3 +- .../ockam_api/src/nodes/service/manager.rs | 15 ++-- .../src/nodes/service/secure_channel.rs | 8 +- .../nodes/service/tcp_inlets/node_manager.rs | 30 +++++--- .../service/tcp_inlets/session_replacer.rs | 5 +- .../src/nodes/service/tcp_outlets.rs | 14 ++-- .../src/node/create/foreground.rs | 19 ++--- .../rust/ockam/ockam_command/src/node/show.rs | 11 ++- .../ockam_command/src/util/foreground_args.rs | 1 - .../credentials/credentials_verification.rs | 33 ++++++-- .../remote_retriever/remote_retriever.rs | 73 +++++++++--------- .../src/models/credential_and_purpose_key.rs | 1 - .../ockam_identity/src/models/timestamp.rs | 9 +++ .../src/models/utils/credentials.rs | 18 +++++ .../handshake/handshake_worker.rs | 58 ++++++-------- .../src/context/context_lifecycle.rs | 7 +- .../ockam/ockam_node/src/router/shutdown.rs | 16 ++-- .../rust/ockam/ockam_transport_tcp/src/lib.rs | 5 +- .../src/portal/addresses.rs | 7 ++ .../src/portal/portal_worker.rs | 76 +++++++------------ .../src/transport/portals.rs | 4 +- .../src/workers/receiver.rs | 4 +- .../ockam_transport_tcp/src/workers/sender.rs | 4 +- .../src/workers/receiver.rs | 4 +- .../ockam_transport_uds/src/workers/sender.rs | 2 +- .../src/workers/receiver.rs | 2 +- 27 files changed, 235 insertions(+), 198 deletions(-) diff --git a/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs b/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs index a558b294098..eca9639e430 100644 --- a/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs +++ b/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs @@ -7,7 +7,7 @@ use ockam_node::Context; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf}; -use tracing::{debug, info}; +use tracing::debug; pub(crate) struct TcpMitmProcessor { address_of_other_processor: Address, @@ -83,7 +83,7 @@ impl Processor for TcpMitmProcessor { let len = match self.read_half.read(&mut buf).await { Ok(l) if l != 0 => l, _ => { - info!("Connection was closed; dropping stream {}", ctx.address()); + debug!("Connection was closed; dropping stream {}", ctx.address()); let _ = ctx.stop_processor(self.address_of_other_processor.clone()).await; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs index 375bea40dbc..5571cf0359f 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs @@ -53,7 +53,7 @@ impl Instantiator for ProjectInstantiator { let (project_multiaddr, project_identifier) = node_manager.resolve_project(&project).await?; - debug!(addr = %project_multiaddr, "creating secure channel"); + debug!(to = %project_multiaddr, identifier = %project_identifier, "creating secure channel"); let transport_res = RemoteMultiaddrResolver::new( Some(node_manager.tcp_transport.clone()), None, // We can't connect to the project node via UDP atm @@ -67,7 +67,6 @@ impl Instantiator for ProjectInstantiator { )) })?; - debug!("create a secure channel to the project {project_identifier}"); let sc = node_manager .create_secure_channel_internal( ctx, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs index b7e7b4ebef2..cadddc423be 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -298,13 +298,13 @@ impl NodeManager { pub async fn make_connection( &self, ctx: &Context, - addr: &MultiAddr, + address: &MultiAddr, identifier: Identifier, authorized: Option, timeout: Option, ) -> ockam_core::Result { let authorized = authorized.map(|authorized| vec![authorized]); - self.connect(ctx, addr, identifier, authorized, timeout) + self.connect(ctx, address, identifier, authorized, timeout) .await } @@ -313,13 +313,13 @@ impl NodeManager { async fn connect( &self, ctx: &Context, - addr: &MultiAddr, + address: &MultiAddr, identifier: Identifier, authorized: Option>, timeout: Option, ) -> ockam_core::Result { - debug!(?timeout, "connecting to {}", &addr); - let connection = ConnectionBuilder::new(addr.clone()) + debug!(%address, ?timeout, "connecting"); + let connection = ConnectionBuilder::new(address.clone()) .instantiate( ctx, self, @@ -333,13 +333,12 @@ impl NodeManager { .instantiate( ctx, self, - SecureChannelInstantiator::new(&identifier, timeout, authorized), + SecureChannelInstantiator::new(&identifier, timeout, authorized.clone()), ) .await? .build(); connection.add_default_consumers(ctx); - - debug!("connected to {connection:?}"); + info!(%address, %identifier, ?authorized, "connection established"); Ok(connection) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index bf14ddc3b62..697c01980a1 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -210,7 +210,7 @@ impl NodeManager { timeout: Option, secure_channel_type: SecureChannelType, ) -> Result { - debug!(%sc_route, "Creating secure channel"); + debug!(route = %sc_route, %identifier, "initiating secure channel"); let options = SecureChannelOptions::new(); let options = if let Some(timeout) = timeout { @@ -224,8 +224,8 @@ impl NodeManager { None => options, }; - let options = if let Some(credential) = credential { - options.with_credential(credential)? + let options = if let Some(credential) = credential.as_ref() { + options.with_credential(credential.clone())? } else { match self.credential_retriever_creators.project_member.as_ref() { None => options, @@ -251,7 +251,7 @@ impl NodeManager { .create_secure_channel(ctx, identifier, sc_route.clone(), options) .await?; - debug!(%sc_route, %sc, "Created secure channel"); + info!(route = %sc_route, %identifier, "secure channel initiated"); self.registry .secure_channels diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs index e801b3079e4..37667189307 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs @@ -26,10 +26,10 @@ impl NodeManager { pub async fn create_inlet( self: &Arc, ctx: &Context, - listen_addr: HostnamePort, + listen_address: HostnamePort, prefix_route: Route, suffix_route: Route, - outlet_addr: MultiAddr, + outlet_address: MultiAddr, alias: String, policy_expression: Option, wait_for_outlet_duration: Option, @@ -42,16 +42,15 @@ impl NodeManager { privileged: bool, tls_certificate_provider: Option, ) -> Result { - info!("Handling request to create inlet portal"); debug! { - listen_addr = %listen_addr, + %listen_address, prefix = %prefix_route, suffix = %suffix_route, - outlet_addr = %outlet_addr, + %outlet_address, %alias, %enable_udp_puncture, %disable_tcp_fallback, - "Creating inlet portal" + "creating inlet" } let udp_transport = if enable_udp_puncture { @@ -69,8 +68,8 @@ impl NodeManager { // the port could be zero, to simplify the following code we // resolve the address to a full socket address let socket_addr = - ockam_node::compat::asynchronous::resolve_peer(listen_addr.to_string()).await?; - let listen_addr = if listen_addr.port() == 0 { + ockam_node::compat::asynchronous::resolve_peer(listen_address.to_string()).await?; + let listen_addr = if listen_address.port() == 0 { get_free_address_for(&socket_addr.ip().to_string()) .map_err(|err| ockam_core::Error::new(Origin::Transport, Kind::Invalid, err))? } else { @@ -113,7 +112,7 @@ impl NodeManager { udp_transport, context: ctx.async_try_clone().await?, listen_addr: listen_addr.to_string(), - outlet_addr: outlet_addr.clone(), + outlet_addr: outlet_address.clone(), prefix_route, suffix_route, authorized, @@ -141,7 +140,7 @@ impl NodeManager { .create_tcp_inlet( &self.node_name, &listen_addr, - &outlet_addr, + &outlet_address, &alias, privileged, ) @@ -190,7 +189,7 @@ impl NodeManager { alias.clone(), InletInfo::new( &listen_addr.to_string(), - outlet_addr.clone(), + outlet_address.clone(), session, privileged, ), @@ -206,10 +205,17 @@ impl NodeManager { None, outcome.clone().map(|s| s.route.to_string()), connection_status, - outlet_addr.to_string(), + outlet_address.to_string(), privileged, ); + info! { + %listen_address, + %outlet_address, + %alias, + "inlet created" + } + Ok(tcp_inlet_status) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index 9fb91fe0052..a3e87076109 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -203,6 +203,9 @@ impl InletSessionReplacer { }; self.main_route = Some(normalized_stripped_route); + info!(address = ?inlet_address, + route = %self.main_route.as_ref().map(|r| r.to_string()).unwrap_or("None".to_string()), + "tcp inlet restored"); Ok(ReplacerOutcome { ping_route: transport_route, @@ -273,7 +276,7 @@ impl SessionReplacer for InletSessionReplacer { Err(ApiError::core("timeout")) } Ok(Err(e)) => { - warn!(%self.outlet_addr, err = %e, "error creating new tcp inlet"); + warn!(%self.outlet_addr, err = %e, "failed to create tcp inlet"); Err(e) } Ok(Ok(route)) => Ok(route), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs index 822ae25a424..32d69f69e6c 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs @@ -88,7 +88,6 @@ impl NodeManagerWorker { } impl NodeManager { - #[instrument(skip(self, ctx))] #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] pub async fn create_outlet( @@ -107,10 +106,7 @@ impl NodeManager { .generate_worker_addr(worker_addr) .await; - info!( - "Handling request to create outlet portal to {to} with worker {:?}", - worker_addr - ); + debug!(%to, address = %worker_addr, "creating outlet"); // Check registry for a duplicated key if self.registry.outlets.contains_key(&worker_addr).await { @@ -192,10 +188,12 @@ impl NodeManager { OutletInfo::new(to.clone(), Some(&worker_addr), privileged), ) .await; - - self.cli_state + let outlet = self + .cli_state .create_tcp_outlet(&self.node_name, &to, &worker_addr, &None, privileged) - .await? + .await?; + info!(%to, address = %worker_addr, "outlet created"); + outlet } Err(e) => { warn!(at = %to, err = %e, "Failed to create TCP outlet"); diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 1151d88f5b1..7f96039a573 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -144,17 +144,18 @@ impl CreateCommand { } async fn start_services(&self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> { - // Wait until the node is fully started - let mut node = - BackgroundNodeClient::create(ctx, &opts.state, &Some(self.name.clone())).await?; - if !is_node_up(ctx, &mut node, true).await? { - return Err(miette!( - "Couldn't start services because the node is not up" - )); - } - if let Some(config) = &self.launch_configuration { if let Some(startup_services) = &config.startup_services { + // Wait until the node is fully started + let mut node = + BackgroundNodeClient::create(ctx, &opts.state, &Some(self.name.clone())) + .await?; + if !is_node_up(ctx, &mut node, true).await? { + return Err(miette!( + "Couldn't start services because the node is not up" + )); + } + if let Some(cfg) = startup_services.secure_channel_listener.clone() { if !cfg.disabled { opts.terminal diff --git a/implementations/rust/ockam/ockam_command/src/node/show.rs b/implementations/rust/ockam/ockam_command/src/node/show.rs index 518d2400454..52c9938ae01 100644 --- a/implementations/rust/ockam/ockam_command/src/node/show.rs +++ b/implementations/rust/ockam/ockam_command/src/node/show.rs @@ -166,6 +166,15 @@ pub async fn is_node_up( ) -> Result { debug!("waiting for node to be up"); let node_name = node.node_name(); + // Check if node is already up and running to skip the accessible/ready checks + if let Ok(status) = node + .ask_with_timeout::<(), NodeStatus>(ctx, api::query_status(), Duration::from_secs(1)) + .await + { + if status.status.is_running() { + return Ok(true); + } + } if !is_node_accessible(ctx, node, wait_until_ready).await? { warn!(%node_name, "the node was not accessible in time"); return Ok(false); @@ -233,7 +242,7 @@ async fn is_node_ready( if let Ok(node_status) = result { if node_status.process_status.is_running() { let elapsed = now.elapsed(); - info!(%node_name, ?elapsed, "node is ready {:?}", node_status); + info!(%node_name, ?elapsed, "node is ready"); return Ok(true); } else { trace!(%node_name, "node is initializing"); diff --git a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs index e4a58d3749b..58e1f723c9d 100644 --- a/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs +++ b/implementations/rust/ockam/ockam_command/src/util/foreground_args.rs @@ -1,7 +1,6 @@ use crate::CommandGlobalOpts; use clap::Args; use colorful::Colorful; -use ockam_abac::expr::and; use ockam_api::{fmt_log, fmt_warn}; use std::io; use std::io::Read; diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/credentials_verification.rs b/implementations/rust/ockam/ockam_identity/src/credentials/credentials_verification.rs index e530cb14433..919c8355922 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/credentials_verification.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/credentials_verification.rs @@ -1,4 +1,4 @@ -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use ockam_core::compat::collections::BTreeMap; use ockam_core::compat::sync::Arc; @@ -192,28 +192,47 @@ impl CredentialsVerification { authorities: &[Identifier], credential_and_purpose_key_attestation: &CredentialAndPurposeKey, ) -> Result<()> { - let credential_data = self + let credential = self .verify_credential( Some(subject), authorities, credential_and_purpose_key_attestation, ) .await?; + let credential_data = credential.credential_data; + let purpose_key_data = credential.purpose_key_data; - let map = credential_data.credential_data.subject_attributes.map; - let map: BTreeMap<_, _> = map + let attributes_display = credential_data.get_attributes_display(); + let attributes: BTreeMap<_, _> = credential_data + .subject_attributes + .map .into_iter() .map(|(k, v)| (Vec::::from(k), Vec::::from(v))) .collect(); + info! { + %subject, + attributes = attributes_display, + schema = %credential_data.subject_attributes.schema.0, + created_at = %credential_data.created_at, + expires_at = %credential_data.expires_at, + "presented credential" + } + debug! { + subject = %purpose_key_data.subject, + created_at = %purpose_key_data.created_at, + expires_at = %purpose_key_data.expires_at, + "presented credential - purpose key attestation" + } + self.identities_attributes_repository .put_attributes( subject, AttributesEntry::new( - map, + attributes, now()?, - Some(credential_data.credential_data.expires_at), - Some(credential_data.purpose_key_data.subject), + Some(credential_data.expires_at), + Some(purpose_key_data.subject), ), ) .await?; diff --git a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs index 0b438217bea..747cec80fe0 100644 --- a/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs +++ b/implementations/rust/ockam/ockam_identity/src/credentials/retriever/remote_retriever/remote_retriever.rs @@ -256,20 +256,17 @@ impl RemoteCredentialRetriever { /// into EncryptorWorker's own internal mailbox which it will use as a trigger to get a new /// credential and present it to the other side. fn schedule_credentials_refresh_impl(&self, refresh_in: Duration, is_retry: bool) { - let is_retry_str = if is_retry { " retry " } else { " " }; - info!( - "Scheduling background credentials refresh{}from {} in {} seconds", - is_retry_str, - self.issuer_info.issuer, - refresh_in.as_secs() - ); - + debug!(issuer=%self.issuer_info.issuer, is_retry, + "Scheduling background credentials refresh in {} seconds", + refresh_in.as_secs()); self.request_new_credential_in_background(refresh_in, is_retry); } } impl RemoteCredentialRetriever { async fn get_new_credential(&self) -> Result<()> { + debug!(subject=%self.subject, issuer=%self.issuer_info.issuer, + "retrieving a new credential"); let cache = self .secure_channels .identities @@ -286,7 +283,7 @@ impl RemoteCredentialRetriever { self.timing_options.request_timeout, ); - let credential = client + let credential: CredentialAndPurposeKey = client .ask( &self.ctx, &self.issuer_info.service_address, @@ -298,10 +295,23 @@ impl RemoteCredentialRetriever { .await? .success()?; - info!( - "Retrieved a new credential for {} from {}", - self.subject, &self.issuer_info.route - ); + let credential_data = credential.get_credential_data()?; + let attributes = credential_data.get_attributes_display(); + let purpose_key_data = credential.purpose_key_attestation.get_attestation_data()?; + info! { + subject = %self.subject, + %attributes, + schema = %credential_data.subject_attributes.schema.0, + created_at = %credential_data.created_at, + expires_at = %credential_data.expires_at, + "retrieved credential" + } + debug! { + subject = %purpose_key_data.subject, + created_at = %purpose_key_data.created_at, + expires_at = %purpose_key_data.expires_at, + "retrieved credential - purpose key attestation" + } let credential_and_purpose_key_data = self .secure_channels @@ -316,7 +326,7 @@ impl RemoteCredentialRetriever { .await?; let expires_at = credential_and_purpose_key_data.credential_data.expires_at; - trace!("The retrieved credential is valid"); + trace!("the retrieved credential is valid"); *self.last_presented_credential.write().unwrap() = Some(LastPresentedCredential { credential: credential.clone(), @@ -334,10 +344,8 @@ impl RemoteCredentialRetriever { .await; if let Some(err) = caching_res.err() { - error!( - "Error caching credential for {} from {}. Err={}", - self.subject, &self.issuer_info.issuer, err - ); + error!(subject=%self.subject, issuer=%self.issuer_info.issuer, %err, + "error caching credential"); } self.notify_subscribers().await?; @@ -351,29 +359,20 @@ impl RemoteCredentialRetriever { fn request_new_credential_in_background(&self, wait: Duration, is_retry: bool) { let s = self.clone(); ockam_node::spawn(async move { - let is_retry_str = if is_retry { " retry " } else { " " }; - info!( - "Scheduled background credentials refresh{}from {} in {} seconds", - is_retry_str, - s.issuer_info.issuer, - wait.as_secs() - ); + info!(issuer=%s.issuer_info.issuer, is_retry, + "scheduled background credentials refresh in {} seconds", + wait.as_secs()); s.ctx .sleep_long_until(*now().unwrap() + wait.as_secs()) .await; - info!( - "Executing background credentials refresh{}from {}", - is_retry_str, s.issuer_info.issuer, - ); - let res = s.get_new_credential().await; - - if let Some(err) = res.err() { - error!( - "Error refreshing credential for {} in the background: {}", - s.subject, err - ); - + debug!(issuer=%s.issuer_info.issuer, is_retry, + "executing background credentials refresh"); + if let Some(err) = s.get_new_credential().await.err() { + error!(subject=%s.subject, is_retry, %err, + "error refreshing credential in the background"); s.schedule_credentials_refresh(now().unwrap(), true); + } else { + debug!(issuer=%s.issuer_info.issuer, is_retry, "credentials refreshed"); } }); } diff --git a/implementations/rust/ockam/ockam_identity/src/models/credential_and_purpose_key.rs b/implementations/rust/ockam/ockam_identity/src/models/credential_and_purpose_key.rs index 99767c5c686..e274f6faae4 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/credential_and_purpose_key.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/credential_and_purpose_key.rs @@ -1,5 +1,4 @@ use minicbor::{CborLen, Decode, Encode}; - use ockam_core::compat::string::String; use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; diff --git a/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs b/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs index 56bb45d805b..63c8a676a91 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/timestamp.rs @@ -1,3 +1,4 @@ +use core::fmt::Display; use minicbor::{CborLen, Decode, Encode}; use serde::{Deserialize, Serialize}; @@ -7,3 +8,11 @@ use serde::{Deserialize, Serialize}; #[cbor(transparent)] #[serde(transparent)] pub struct TimestampInSeconds(#[n(0)] pub u64); + +impl Display for TimestampInSeconds { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + // Convert to human-readable time + let date = chrono::DateTime::from_timestamp(self.0 as i64, 0).ok_or(core::fmt::Error)?; + write!(f, "{}", date) + } +} diff --git a/implementations/rust/ockam/ockam_identity/src/models/utils/credentials.rs b/implementations/rust/ockam/ockam_identity/src/models/utils/credentials.rs index 8d33d0eedeb..216a23eb390 100644 --- a/implementations/rust/ockam/ockam_identity/src/models/utils/credentials.rs +++ b/implementations/rust/ockam/ockam_identity/src/models/utils/credentials.rs @@ -1,6 +1,8 @@ use crate::models::{CredentialData, CredentialSignature, VersionedData, CREDENTIAL_DATA_TYPE}; use crate::{Credential, IdentityError}; +use ockam_core::compat::str; +use ockam_core::compat::string::String; use ockam_core::compat::vec::Vec; use ockam_core::Result; use ockam_vault::Signature; @@ -34,6 +36,22 @@ impl CredentialData { Ok(minicbor::decode(&versioned_data.data)?) } + + /// Return the credential's attributes as a displayable string + pub fn get_attributes_display(&self) -> String { + self.subject_attributes + .map + .iter() + .map(|(k, v)| { + format!( + "{}={}", + str::from_utf8(k).unwrap_or("**binary**"), + str::from_utf8(v).unwrap_or("**binary**") + ) + }) + .collect::>() + .join(";") + } } impl From for Signature { diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index 40c8b3383a2..fee6d540456 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -74,11 +74,7 @@ impl Worker for HandshakeWorker { if let Some(state_machine) = self.state_machine.as_mut() { match state_machine.on_event(Initialize).await? { SendMessage(message) => { - debug!( - "remote route {:?}, decryptor remote {:?}", - self.remote_route.clone(), - self.addresses.decryptor_remote.clone() - ); + debug!(remote_route=?self.remote_route, decryptor_remote=%self.addresses.decryptor_remote, "sending message"); context .send_from_address( self.remote_route()?, @@ -216,10 +212,7 @@ impl HandshakeWorker { .start(context) .await?; - debug!( - "Starting SecureChannel {} at remote: {}, local: {}", - role, addresses.decryptor_remote, addresses.encryptor - ); + debug!(decryptor=%my_identifier, encryptor=%addresses.encryptor, "starting SecureChannel {role}"); // before sending messages make sure that the handshake is finished and // the encryptor worker is ready @@ -232,11 +225,18 @@ impl HandshakeWorker { match res { Ok(their_identifier) => Some(their_identifier), Err(err) => { - error!( - "Timeout {:?} or error reached when creating secure channel for: {}. Encryptor: {}. Error: {err:?}", - timeout, my_identifier, addresses.encryptor - ); - + match err.code().kind { + Kind::Timeout => { + warn!(?timeout, identifier=%my_identifier, encryptor=%addresses.encryptor, + "timeout reached when creating secure channel", + ); + } + _ => { + error!(identifier=%my_identifier, encryptor=%addresses.encryptor, ?err, + "failed to create secure channel", + ); + } + } return Err(err); } } @@ -460,10 +460,9 @@ impl HandshakeWorker { .await; info!( - "Initialized SecureChannel {} at local: {}, remote: {}", + local = %self.addresses.encryptor, remote = %self.addresses.decryptor_remote, + "initialized SecureChannel {}", self.role.str(), - &self.addresses.encryptor, - &self.addresses.decryptor_remote ); let their_decryptor_address = self @@ -493,10 +492,8 @@ impl HandshakeWorker { async fn persist(&self, their_identifier: Identifier, decryption_key: &AeadSecretKeyHandle) { let Some(repository) = &self.secure_channel_repository else { - info!( - "Skipping persistence. Local: {}, Remote: {}", - self.addresses.encryptor, &self.addresses.decryptor_remote - ); + debug!(local = %self.addresses.encryptor, remote = %self.addresses.decryptor_remote, + "Skipping persistence. No repository provided"); return; }; @@ -510,17 +507,12 @@ impl HandshakeWorker { ); match repository.put(sc).await { Ok(_) => { - info!( - "Successfully persisted secure channel. Local: {}, Remote: {}", - self.addresses.encryptor, &self.addresses.decryptor_remote, - ); + info!(local = %self.addresses.encryptor, remote = %self.addresses.decryptor_remote, + "Successfully persisted secure channel"); } Err(err) => { - warn!( - "Error while persisting secure channel: {err}. Local: {}, Remote: {}", - self.addresses.encryptor, &self.addresses.decryptor_remote - ); - + warn!(local = %self.addresses.encryptor, remote = %self.addresses.decryptor_remote, %err, + "Error while persisting secure channel"); return; } } @@ -533,10 +525,8 @@ impl HandshakeWorker { .persist_aead_key(decryption_key) .await { - warn!( - "Error persisting secure channel key: {err}. Local: {}, Remote: {}", - self.addresses.encryptor, &self.addresses.decryptor_remote - ); + warn!(local = %self.addresses.encryptor, remote = %self.addresses.decryptor_remote, %err, + "Error persisting secure channel key"); }; } diff --git a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs index 148bf3b091d..09ad207a75a 100644 --- a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs +++ b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs @@ -32,9 +32,10 @@ pub type AsyncDropSender = tokio::sync::oneshot::Sender
; impl Drop for Context { fn drop(&mut self) { if let Some(sender) = self.async_drop_sender.take() { - trace!("De-allocated detached context {}", self.address()); - if let Err(e) = sender.send(self.address()) { - warn!("Encountered error while dropping detached context: {}", e); + trace!(address=%self.address(), "de-allocated detached context"); + if let Err(err) = sender.send(self.address()) { + warn!(address=%self.address(), %err, + "couldn't drop detached context"); } } } diff --git a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs index 5d426d70464..599cd259388 100644 --- a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs +++ b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs @@ -63,11 +63,11 @@ impl Router { #[cfg_attr(not(feature = "std"), allow(unused_variables))] pub(super) async fn graceful( router: &mut Router, - seconds: u8, + timeout: u8, reply: SmallSender, ) -> Result { // Mark the router as shutting down to prevent spawning - info!("Initiate graceful node shutdown"); + debug!("initiating graceful node shutdown"); // This changes the router state to `Stopping` router.state.shutdown(reply); @@ -75,11 +75,11 @@ pub(super) async fn graceful( let mut cluster = vec![]; for rec in router.map.non_cluster_workers().iter_mut() { if let Some(first_address) = rec.address_set().first().cloned() { - debug!("Stopping address {}", first_address); + debug!("stopping address {}", first_address); rec.stop().await?; cluster.push(first_address); } else { - error!("Empty Address Set during graceful shutdown"); + error!("empty address set during graceful shutdown"); } } @@ -101,17 +101,19 @@ pub(super) async fn graceful( use tokio::{task, time}; let sender = router.sender(); - let dur = Duration::from_secs(seconds as u64); + let dur = Duration::from_secs(timeout as u64); task::spawn(async move { time::sleep(dur).await; - warn!("Shutdown timeout reached; aborting node!"); + warn!(%timeout, "shutdown timeout reached; aborting node!"); // This works only because the state of the router is `Stopping` if sender.send(NodeMessage::AbortNode).await.is_err() { - error!("Failed to send node abort signal to router"); + warn!("failed to send node abort signal to router"); } }); } + info!("node was shutdown gracefully"); + // Return but DO NOT stop the router Ok(false) } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs index dc5893bc184..72d4507a7d4 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs @@ -11,11 +11,10 @@ )] #![cfg_attr(not(feature = "std"), no_std)] -#[cfg(feature = "std")] -extern crate core; - #[cfg(feature = "alloc")] extern crate alloc; +#[cfg(feature = "std")] +extern crate core; mod options; mod portal; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs index cbb8c9aa637..eb1e4fbfe86 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs @@ -1,3 +1,4 @@ +use core::fmt::Display; use ockam_core::Address; /// Enumerate all portal types @@ -27,6 +28,12 @@ impl PortalType { } } +impl Display for PortalType { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{}", self.str()) + } +} + #[derive(Clone, Debug)] pub(crate) struct Addresses { /// Used to receive messages from the corresponding receiver `receiver_internal` Address diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index 15519b8d47b..b06e24bfe97 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -149,11 +149,7 @@ impl TcpPortalWorker { } else { PortalType::Outlet }; - info!( - "Creating new {:?} at sender remote: {}", - portal_type.str(), - addresses.sender_remote - ); + debug!(%portal_type, sender_remote=%addresses.sender_remote, %is_tls, "creating portal worker"); let (rx, tx) = match streams { // A TcpStream is provided in case of an inlet @@ -163,7 +159,6 @@ impl TcpPortalWorker { } None => (None, None), }; - debug!("The {} supports TLS: {}", portal_type.str(), is_tls); let worker = Self { registry, @@ -276,9 +271,8 @@ impl TcpPortalWorker { .await?; debug!( - "Notified the other side from {:?} at: {} about connection drop", - self.portal_type.str(), - self.addresses.sender_internal + portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "notified the other side of portal that the connection is dropped", ); } @@ -292,11 +286,8 @@ impl TcpPortalWorker { .await .is_ok() { - debug!( - "{:?} at: {} stopped receiver due to connection drop", - self.portal_type.str(), - self.addresses.sender_internal - ); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "stopped receiver due to connection drop"); } Ok(()) @@ -355,11 +346,8 @@ impl TcpPortalWorker { } } - info!( - "{:?} at: {} stopped due to connection drop", - self.portal_type.str(), - self.addresses.sender_internal - ); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "stopped due to connection drop"); Ok(()) } @@ -374,7 +362,7 @@ impl TcpPortalWorker { ) .await?; - debug!("Inlet at: {} sent ping", self.addresses.sender_internal); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "sent ping"); Ok(State::ReceivePong) } @@ -386,12 +374,12 @@ impl TcpPortalWorker { return Err(TransportError::PortalInvalidState)?; } if self.is_tls { - debug!("Connect to {} via TLS", &self.hostname_port); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "connect to {} via TLS", &self.hostname_port); let (rx, tx) = connect_tls(&self.hostname_port).await?; self.write_half = Some(WriteHalfWithTls(tx)); self.read_half = Some(ReadHalfWithTls(rx)); } else { - debug!("Connect to {}", self.hostname_port); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "connect to {}", self.hostname_port); let (rx, tx) = connect(&self.hostname_port).await?; self.write_half = Some(WriteHalfNoTls(tx)); self.read_half = Some(ReadHalfNoTls(rx)); @@ -409,12 +397,7 @@ impl TcpPortalWorker { self.start_receiver(ctx, pong_route.clone()).await?; - debug!( - "Outlet at: {} successfully connected", - self.addresses.sender_internal - ); - - debug!("Outlet at: {} sent pong", self.addresses.sender_internal); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "sent pong"); self.remote_route = Some(pong_route); Ok(State::Initialized) @@ -445,6 +428,10 @@ impl Worker for TcpPortalWorker { self.registry .add_portal_worker(&self.addresses.sender_remote); + info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "tcp portal worker initialized" + ); + Ok(()) } @@ -483,7 +470,7 @@ impl Worker for TcpPortalWorker { if their_identifier != self.their_identifier { debug!( - "Identifier changed from {:?} to {:?}", + "identifier changed from {:?} to {:?}", self.their_identifier.as_ref().map(|i| i.to_string()), their_identifier.as_ref().map(|i| i.to_string()), ); @@ -505,11 +492,9 @@ impl Worker for TcpPortalWorker { self.handle_receive_pong(ctx, return_route).await } State::Initialized => { - trace!( - "{:?} at: {} received {} tcp packet", - self.portal_type.str(), - self.addresses.sender_internal, - if remote_packet { "remote" } else { "internal " } + trace!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "received {} tcp packet", + if remote_packet { "remote" } else { "internal " }, ); if remote_packet { @@ -523,9 +508,7 @@ impl Worker for TcpPortalWorker { self.start_disconnection(ctx, DisconnectionReason::Remote) .await } - PortalMessage::Ping | PortalMessage::Pong => { - return Err(TransportError::Protocol)?; - } + PortalMessage::Ping | PortalMessage::Pong => Err(TransportError::Protocol)?, } } else { let msg = PortalInternalMessage::decode(&payload)?; @@ -536,7 +519,7 @@ impl Worker for TcpPortalWorker { } } State::SendPing { .. } | State::SendPong { .. } => { - return Err(TransportError::PortalInvalidState)?; + Err(TransportError::PortalInvalidState)? } } } @@ -546,7 +529,7 @@ impl TcpPortalWorker { #[instrument(skip_all)] async fn handle_receive_pong(&mut self, ctx: &Context, return_route: Route) -> Result<()> { self.start_receiver(ctx, return_route.clone()).await?; - debug!("Inlet at: {} received pong", self.addresses.sender_internal); + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "received pong"); self.remote_route = Some(return_route); self.state = State::Initialized; Ok(()) @@ -554,11 +537,8 @@ impl TcpPortalWorker { #[instrument(skip_all)] async fn handle_disconnect(&mut self, ctx: &Context) -> Result<()> { - info!( - "Tcp stream was dropped for {:?} at: {}", - self.portal_type.str(), - self.addresses.sender_internal - ); + info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "tcp stream was dropped"); self.start_disconnection(ctx, DisconnectionReason::FailedRx) .await } @@ -583,9 +563,9 @@ impl TcpPortalWorker { WriteHalfWithTls(tx) => tx.write_all(payload).await, }; if let Err(err) = result { - warn!( - "Failed to send message to peer {} with error: {}", - self.hostname_port, err + warn!(portal_type = %self.portal_type, %err, + "failed to send message to peer {} with error", + self.hostname_port ); self.start_disconnection(ctx, DisconnectionReason::FailedTx) .await?; @@ -608,7 +588,7 @@ impl TcpPortalWorker { }; if packet_counter != expected_counter { - warn!( + warn!(portal_type = %self.portal_type, "Received packet with counter {} while expecting {}, disconnecting", packet_counter, expected_counter ); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index 9909ff6549d..2da8a436fc4 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -9,7 +9,7 @@ use ockam_core::{route, Address, Result, Route}; use ockam_node::compat::asynchronous::RwLock; use ockam_node::Context; use ockam_transport_core::{parse_socket_addr, HostnamePort}; -use tracing::instrument; +use tracing::{debug, instrument}; impl TcpTransport { /// Create Tcp Inlet that listens on bind_addr, transforms Tcp stream into Ockam Routable @@ -237,8 +237,8 @@ impl TcpInlet { /// Pause TCP Inlet, all incoming TCP streams will be dropped. pub async fn pause(&self) { + debug!(address = %self.socket_address, "pausing inlet"); let mut inlet_shared_state = self.inlet_shared_state.write().await; - inlet_shared_state.set_is_paused(true); } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs index 13b8e969319..0f5f077d346 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs @@ -16,7 +16,7 @@ use ockam_core::{Processor, Result}; use ockam_node::{Context, ProcessorBuilder}; use ockam_transport_core::TransportError; use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf}; -use tracing::{info, instrument, trace}; +use tracing::{debug, instrument, trace}; /// A TCP receiving message processor /// @@ -99,7 +99,7 @@ impl TcpRecvProcessor { } async fn notify_sender_stream_dropped(&self, ctx: &Context, msg: impl Display) -> Result<()> { - info!( + debug!( "Connection to peer '{}' was closed; dropping stream. {}", self.socket_address, msg ); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs index 9f5cbb0e9a1..3c77e4a9e64 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs @@ -14,7 +14,7 @@ use ockam_transport_core::TransportError; use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, instrument, trace, warn}; #[derive(Serialize, Deserialize, Message, Clone)] pub(crate) enum TcpSendWorkerMsg { @@ -223,7 +223,7 @@ impl Worker for TcpSendWorker { match msg { TcpSendWorkerMsg::ConnectionClosed => { - info!( + debug!( "Stopping sender due to closed connection {}", self.socket_address ); diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs index bee1fd79e97..8d97ce15837 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs @@ -6,7 +6,7 @@ use ockam_core::{ use ockam_node::Context; use ockam_transport_core::TransportError; use tokio::{io::AsyncReadExt, net::unix::OwnedReadHalf}; -use tracing::{error, info, trace}; +use tracing::{debug, error, trace}; /// A UDS receiving message processor /// @@ -48,7 +48,7 @@ impl Processor for UdsRecvProcessor { let len = match self.rx.read_u16().await { Ok(len) => len, Err(_e) => { - info!( + debug!( "Connection to peer '{}' was closed; dropping stream", self.peer_addr ); diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs index 86ee9e74fa6..61858cbc30b 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs @@ -258,7 +258,7 @@ impl Worker for UdsSendWorker { match msg { UdsSendWorkerMsg::ConnectionClosed => { - warn!("Stopping sender due to closed connection"); + debug!("Stopping sender due to closed connection"); // No need to stop Receiver as it notified us about connection drop and will // stop itself self.rx_should_be_stopped = false; diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs index 5c7e1b06d6e..1b255bb0ade 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs @@ -58,7 +58,7 @@ where Some(res) => match res { Ok(ws_msg) => ws_msg, Err(_e) => { - info!( + debug!( "Connection to peer '{}' was closed; dropping stream", self.peer_addr ); From 2b11f4852ae67bfe8517ba5f43aac9c8bb107fa3 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Wed, 11 Dec 2024 08:06:36 +0100 Subject: [PATCH 4/9] feat(rust): add a custom log format to change the fields order --- Cargo.lock | 1 + .../rust/ockam/ockam_api/Cargo.toml | 1 + .../ockam_api/src/cli_state/cli_state.rs | 1 - .../ockam_api/src/logs/logging_options.rs | 157 +++++++++++++++++- .../rust/ockam/ockam_api/src/logs/setup.rs | 23 ++- .../ockam/ockam_api/tests/logging_tracing.rs | 7 +- .../ockam_command/src/command_global_opts.rs | 2 +- .../src/node/create/foreground.rs | 12 +- 8 files changed, 188 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53386d53223..3105c31220f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4753,6 +4753,7 @@ dependencies = [ "mockall", "multimap", "nix 0.29.0", + "nu-ansi-term 0.50.1", "ockam", "ockam_abac", "ockam_core", diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index 3d043f0299e..c29b06e3cfb 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -78,6 +78,7 @@ log = "0.4" miette = { version = "7.2.0", features = ["fancy-no-backtrace"] } minicbor = { version = "0.25.1", default-features = false, features = ["alloc", "derive"] } nix = { version = "0.29", features = ["signal"] } +nu-ansi-term = "0.50" once_cell = { version = "1", default-features = false } open = "5.3.0" opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index d4b4775b4f0..3301aaaa9b9 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -118,7 +118,6 @@ impl CliState { } fn notify(&self, notification: Notification) { - debug!("{:?}", notification.contents()); let _ = self.notifications.send(notification); } } diff --git a/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs b/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs index 0af21ff0143..0c3eaff181e 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs @@ -1,6 +1,11 @@ +use nu_ansi_term::{Color, Style}; use ockam_core::env::FromString; use ockam_core::errcode::{Kind, Origin}; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; +use tracing_core::{Event, Level, Subscriber}; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields}; +use tracing_subscriber::registry::LookupSpan; #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum LoggingEnabled { @@ -85,8 +90,8 @@ impl FromString for LogFormat { } } -impl std::fmt::Display for LogFormat { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +impl Display for LogFormat { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match self { LogFormat::Default => write!(f, "default"), LogFormat::Pretty => write!(f, "pretty"), @@ -94,3 +99,149 @@ impl std::fmt::Display for LogFormat { } } } + +#[derive(Default)] +pub struct OckamLogFormat {} + +impl OckamLogFormat { + pub fn new() -> Self { + Self {} + } + + fn format_timestamp(&self, writer: &mut Writer<'_>) -> std::fmt::Result { + let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"); + if writer.has_ansi_escapes() { + let style = Style::new().dimmed(); + write!(writer, "{}", style.prefix())?; + write!(writer, "{}", now)?; + write!(writer, "{} ", style.suffix())?; + } else { + write!(writer, "{}", now)?; + writer.write_char(' ')?; + } + Ok(()) + } +} + +impl FormatEvent for OckamLogFormat +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + let meta = event.metadata(); + let dimmed = if writer.has_ansi_escapes() { + Style::new().dimmed() + } else { + Style::new() + }; + let bold = if writer.has_ansi_escapes() { + Style::new().bold() + } else { + Style::new() + }; + + // Timestamp + self.format_timestamp(&mut writer)?; + + // Level + let fmt_level = FmtLevel::new(meta.level(), writer.has_ansi_escapes()); + write!(writer, "{} ", fmt_level)?; + + // Event + ctx.format_fields(writer.by_ref(), event)?; + writer.write_char(' ')?; + + // Scope + if let Some(scope) = ctx.event_scope() { + let mut seen = false; + + for span in scope.from_root() { + write!(writer, "{}", bold.paint(span.metadata().name()))?; + seen = true; + + let ext = span.extensions(); + if let Some(fields) = &ext.get::>() { + if !fields.is_empty() { + write!(writer, "{}{}{}", bold.paint("{"), fields, bold.paint("}"))?; + } + } + write!(writer, "{}", dimmed.paint(":"))?; + } + + if seen { + writer.write_char(' ')?; + } + }; + + // Target + write!(writer, "{} ", dimmed.paint(meta.target()))?; + + // File and line + let line_number = meta.line(); + if let Some(filename) = meta.file() { + write!( + writer, + "{}{}{}", + dimmed.paint(filename), + dimmed.paint(":"), + if line_number.is_some() { "" } else { " " } + )?; + } + if let Some(line_number) = line_number { + write!( + writer, + "{}{}{}", + dimmed.prefix(), + line_number, + dimmed.suffix() + )?; + } + + writeln!(writer) + } +} + +struct FmtLevel<'a> { + level: &'a Level, + ansi: bool, +} + +impl<'a> FmtLevel<'a> { + pub(crate) fn new(level: &'a Level, ansi: bool) -> Self { + Self { level, ansi } + } +} + +const TRACE_STR: &str = "TRACE"; +const DEBUG_STR: &str = "DEBUG"; +const INFO_STR: &str = " INFO"; +const WARN_STR: &str = " WARN"; +const ERROR_STR: &str = "ERROR"; + +impl Display for FmtLevel<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.ansi { + match *self.level { + Level::TRACE => write!(f, "{}", Color::Purple.paint(TRACE_STR)), + Level::DEBUG => write!(f, "{}", Color::Blue.paint(DEBUG_STR)), + Level::INFO => write!(f, "{}", Color::Green.paint(INFO_STR)), + Level::WARN => write!(f, "{}", Color::Yellow.paint(WARN_STR)), + Level::ERROR => write!(f, "{}", Color::Red.paint(ERROR_STR)), + } + } else { + match *self.level { + Level::TRACE => f.pad(TRACE_STR), + Level::DEBUG => f.pad(DEBUG_STR), + Level::INFO => f.pad(INFO_STR), + Level::WARN => f.pad(WARN_STR), + Level::ERROR => f.pad(ERROR_STR), + } + } + } +} diff --git a/implementations/rust/ockam/ockam_api/src/logs/setup.rs b/implementations/rust/ockam/ockam_api/src/logs/setup.rs index 8a3715bc98c..080b2463209 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/setup.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/setup.rs @@ -30,6 +30,7 @@ use ockam_node::Executor; use crate::logs::tracing_guard::TracingGuard; use crate::logs::{ ExportingConfiguration, GlobalErrorHandler, LoggingConfiguration, OckamLogExporter, + OckamLogFormat, }; use crate::logs::{LogFormat, OckamSpanExporter}; @@ -109,7 +110,9 @@ impl LoggingTracing { let result = match logging_configuration.format() { LogFormat::Pretty => layers.with(appender.pretty()).try_init(), LogFormat::Json => layers.with(appender.json()).try_init(), - LogFormat::Default => layers.with(appender).try_init(), + LogFormat::Default => layers + .with(appender.event_format(OckamLogFormat::new())) + .try_init(), }; result.expect("Failed to initialize tracing subscriber"); @@ -130,7 +133,9 @@ impl LoggingTracing { let result = match logging_configuration.format() { LogFormat::Pretty => layers.with(appender.pretty()).try_init(), LogFormat::Json => layers.with(appender.json()).try_init(), - LogFormat::Default => layers.with(appender).try_init(), + LogFormat::Default => layers + .with(appender.event_format(OckamLogFormat::new())) + .try_init(), }; result.expect("Failed to initialize tracing subscriber"); }; @@ -230,7 +235,7 @@ fn create_opentelemetry_tracing_layer< exporting_configuration: &ExportingConfiguration, span_exporter: S, ) -> ( - OpenTelemetryLayer, + OpenTelemetryLayer, opentelemetry_sdk::trace::TracerProvider, ) { let app = app_name.to_string(); @@ -243,7 +248,8 @@ fn create_opentelemetry_tracing_layer< let is_ockam_developer = exporting_configuration.is_ockam_developer(); let span_export_cutoff = exporting_configuration.span_export_cutoff(); Executor::execute_future(async move { - let trace_config = sdk::trace::Config::default().with_resource(make_resource(app)); + let trace_config = + opentelemetry_sdk::trace::Config::default().with_resource(make_resource(app)); let (tracer, tracer_provider) = create_tracer( trace_config, batch_config, @@ -410,11 +416,14 @@ fn set_global_error_handler(logging_configuration: &LoggingConfiguration) { /// Create a Tracer using the provided span exporter fn create_tracer( - trace_config: sdk::trace::Config, + trace_config: opentelemetry_sdk::trace::Config, batch_config: BatchConfig, exporter: S, -) -> (sdk::trace::Tracer, opentelemetry_sdk::trace::TracerProvider) { - let span_processor = BatchSpanProcessor::builder(exporter, sdk::runtime::Tokio) +) -> ( + opentelemetry_sdk::trace::Tracer, + opentelemetry_sdk::trace::TracerProvider, +) { + let span_processor = BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio) .with_batch_config(batch_config) .build(); let provider = opentelemetry_sdk::trace::TracerProvider::builder() diff --git a/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs b/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs index 85d94986c4e..12c010df860 100644 --- a/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs +++ b/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs @@ -10,8 +10,9 @@ use opentelemetry_sdk as sdk; use sdk::testing::logs::*; use sdk::testing::trace::*; +use opentelemetry_sdk::testing::logs::InMemoryLogsExporter; +use opentelemetry_sdk::testing::trace::InMemorySpanExporter; use std::fs; - use tempfile::NamedTempFile; use ockam_api::cli_state::{random_name, CliStateMode}; @@ -81,12 +82,12 @@ fn test_log_and_traces() { if file_path.to_string_lossy().contains("stdout") { let contents = fs::read_to_string(file_path).unwrap(); assert!( - contents.contains("INFO logging_tracing: inside span"), + contents.contains("INFO inside span logging_tracing"), "{:?}", contents ); assert!( - contents.contains("ERROR logging_tracing: something went wrong!"), + contents.contains("ERROR something went wrong! logging_tracing"), "{:?}", contents ); diff --git a/implementations/rust/ockam/ockam_command/src/command_global_opts.rs b/implementations/rust/ockam/ockam_command/src/command_global_opts.rs index 89a201d8427..a65f7d85c5d 100644 --- a/implementations/rust/ockam/ockam_command/src/command_global_opts.rs +++ b/implementations/rust/ockam/ockam_command/src/command_global_opts.rs @@ -185,7 +185,7 @@ impl CommandGlobalOpts { debug!("Arguments: {}", arguments.join(" ")); debug!("Global arguments: {:#?}", &global_args); debug!("Command: {:#?}", &cmd); - debug!("Version: {}", Version::new()); + debug!("Version: {}", Version::new().no_color()); info!("Tracing initialized"); debug!("{:#?}", logging_configuration); diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 7f96039a573..a6dbab4941a 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -53,7 +53,17 @@ impl CreateCommand { // Set node_name so that node can isolate its data in the storage from other nodes self.get_or_create_identity(&opts, &self.identity).await?; - let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = if self.foreground_args.child_process { + // If enabled, the user's terminal will receive notifications + // from the node after the command exited. + None + } else { + // Enable the notifications only on explicit foreground nodes. + Some(NotificationHandler::start( + &opts.state, + opts.terminal.clone(), + )) + }; let node_info = opts .state .start_node_with_optional_values( From 33a79697cce486f31da4ad0752580079b221999a Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 12 Dec 2024 11:21:30 +0100 Subject: [PATCH 5/9] chore(rust): add some more logs to the secure-channel --- .../purpose_keys/purpose_key_verification.rs | 7 ++- .../src/secure_channel/encryptor_worker.rs | 48 ++++++++++++++----- .../handshake/handshake_worker.rs | 18 ++++++- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/implementations/rust/ockam/ockam_identity/src/purpose_keys/purpose_key_verification.rs b/implementations/rust/ockam/ockam_identity/src/purpose_keys/purpose_key_verification.rs index b0dca3d4901..566656d70cc 100644 --- a/implementations/rust/ockam/ockam_identity/src/purpose_keys/purpose_key_verification.rs +++ b/implementations/rust/ockam/ockam_identity/src/purpose_keys/purpose_key_verification.rs @@ -1,13 +1,14 @@ use ockam_core::compat::sync::Arc; use ockam_core::Result; use ockam_vault::VaultForVerifyingSignatures; +use tracing::trace; use crate::models::{Identifier, PurposeKeyAttestation, PurposeKeyAttestationData, VersionedData}; use crate::utils::now; use crate::{ChangeHistoryRepository, IdentitiesVerification, IdentityError, TimestampInSeconds}; /// We allow purpose keys to be created in the future related to this machine's time due to -/// possible time dyssynchronization +/// possible time desynchronization const MAX_ALLOWED_TIME_DRIFT: TimestampInSeconds = TimestampInSeconds(60); /// This struct supports all the services related to identities @@ -45,6 +46,8 @@ impl PurposeKeyVerification { expected_subject: Option<&Identifier>, attestation: &PurposeKeyAttestation, ) -> Result { + trace!(?expected_subject, "verifying purpose key attestation"); + let versioned_data_hash = self.verifying_vault.sha256(&attestation.data).await?; let versioned_data: VersionedData = minicbor::decode(&attestation.data)?; @@ -118,6 +121,8 @@ impl PurposeKeyVerification { return Err(IdentityError::PurposeKeyAttestationVerificationFailed)?; } + trace!(?expected_subject, "verified purpose key attestation"); + Ok(purpose_key_data) } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index b958fdac970..7c7f45a4177 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -1,6 +1,6 @@ use core::sync::atomic::{AtomicBool, Ordering}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use tracing_attributes::instrument; use ockam_core::compat::boxed::Box; @@ -99,12 +99,23 @@ impl EncryptorWorker { ctx: &Context, msg: SecureChannelPaddedMessage<'static>, ) -> Result> { + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "encrypting message"); + let expected_len = minicbor::len(&msg); let mut destination = vec![0u8; NOISE_NONCE_LEN + expected_len + AES_GCM_TAGSIZE]; minicbor::encode(&msg, &mut destination[NOISE_NONCE_LEN..])?; match self.encryptor.encrypt(&mut destination).await { - Ok(()) => Ok(destination), + Ok(()) => { + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "message encrypted"); + Ok(destination) + } // If encryption failed, that means we have some internal error, // and we may be in an invalid state, it's better to stop the Worker Err(err) => { @@ -122,10 +133,10 @@ impl EncryptorWorker { ctx: &mut ::Context, msg: Routed<::Message>, ) -> Result<()> { - debug!( - "SecureChannel {} received Encrypt API {}", - self.role, &self.addresses.encryptor - ); + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "handling encrypt API message"); let msg = msg.into_local_message(); let return_route = msg.return_route; @@ -161,6 +172,11 @@ impl EncryptorWorker { ctx.send_from_address(return_route, response, self.addresses.encryptor_api.clone()) .await?; + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "sent encrypt API response"); + if should_stop { ctx.stop_worker(self.addresses.encryptor.clone()).await?; } @@ -174,10 +190,10 @@ impl EncryptorWorker { ctx: &mut ::Context, msg: Routed<::Message>, ) -> Result<()> { - debug!( - "SecureChannel {} received Encrypt {}", - self.role, &self.addresses.encryptor - ); + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "handling encrypt message"); let msg = msg.into_local_message(); let mut onward_route = msg.onward_route; @@ -208,6 +224,11 @@ impl EncryptorWorker { ctx.forward_from_address(msg, self.addresses.encryptor.clone()) .await?; + debug!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "forwarded message to decryptor"); + Ok(()) } @@ -215,7 +236,7 @@ impl EncryptorWorker { /// the latest change_history #[instrument(skip_all)] async fn handle_refresh_credentials(&mut self, ctx: &::Context) -> Result<()> { - debug!( + trace!( "Started credentials refresh for {}", self.addresses.encryptor ); @@ -283,6 +304,11 @@ impl EncryptorWorker { ) .await?; + trace!( + role=%self.role, + encryptor=%self.addresses.encryptor, + "credentials refresh sent"); + self.last_presented_credential = Some(credential); Ok(()) diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index fee6d540456..fa0665f212f 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -11,7 +11,7 @@ use ockam_core::{Result, Worker}; use ockam_node::callback::CallbackSender; use ockam_node::{Context, WorkerBuilder}; use ockam_vault::AeadSecretKeyHandle; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use tracing_attributes::instrument; use crate::models::Identifier; @@ -74,7 +74,11 @@ impl Worker for HandshakeWorker { if let Some(state_machine) = self.state_machine.as_mut() { match state_machine.on_event(Initialize).await? { SendMessage(message) => { - debug!(remote_route=?self.remote_route, decryptor_remote=%self.addresses.decryptor_remote, "sending message"); + trace!( + remote_route = ?self.remote_route, + decryptor_remote = %self.addresses.decryptor_remote, + "sending message", + ); context .send_from_address( self.remote_route()?, @@ -264,6 +268,11 @@ impl HandshakeWorker { context: &mut Context, message: Routed, ) -> Result<()> { + trace!( + remote_route = ?self.remote_route, + decryptor_remote = %self.addresses.decryptor_remote, + "handling handshake message", + ); let message = message.into_local_message(); let return_route = message.return_route; let payload = message.payload; @@ -315,6 +324,11 @@ impl HandshakeWorker { /// and for decryption. #[instrument(skip_all, name = "DecryptorWorker::handle_message")] async fn handle_decrypt(&mut self, context: &mut Context, message: Routed) -> Result<()> { + trace!( + remote_route = ?self.remote_route, + decryptor_remote = %self.addresses.decryptor_remote, + "handling decrypt message", + ); let decryptor_handler = self.decryptor_handler.as_mut().unwrap(); let msg_addr = message.msg_addr(); From 48ca66897d436c99c07f7f60253164ca00b781b3 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 12 Dec 2024 11:31:09 +0100 Subject: [PATCH 6/9] chore(rust): logs adjustments --- .../ockam_api/src/logs/logging_options.rs | 2 +- .../rust/ockam/ockam_api/src/logs/setup.rs | 1 - .../ockam_api/src/nodes/service/relay.rs | 1 - .../ockam/ockam_api/tests/logging_tracing.rs | 5 ---- .../src/node/create/foreground.rs | 2 +- .../rust/ockam/ockam_command/src/node/show.rs | 2 +- .../ockam/ockam_command/src/relay/create.rs | 2 -- .../src/secure_channel/handshake/handshake.rs | 26 +++++++++++++++++++ .../rust/ockam/ockam_node/src/api.rs | 12 ++++++++- .../ockam_node/src/context/receive_message.rs | 2 +- 10 files changed, 41 insertions(+), 14 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs b/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs index 0c3eaff181e..a4cbad14bfa 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/logging_options.rs @@ -109,7 +109,7 @@ impl OckamLogFormat { } fn format_timestamp(&self, writer: &mut Writer<'_>) -> std::fmt::Result { - let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"); + let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ"); if writer.has_ansi_escapes() { let style = Style::new().dimmed(); write!(writer, "{}", style.prefix())?; diff --git a/implementations/rust/ockam/ockam_api/src/logs/setup.rs b/implementations/rust/ockam/ockam_api/src/logs/setup.rs index 080b2463209..2d02ba0c43c 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/setup.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/setup.rs @@ -3,7 +3,6 @@ use opentelemetry::trace::TracerProvider; use opentelemetry::{global, KeyValue}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk as sdk; use opentelemetry_sdk::export::logs::LogExporter; use opentelemetry_sdk::export::trace::SpanExporter; use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider}; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index 58713f74fc6..f51b9fcbbc0 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -204,7 +204,6 @@ impl NodeManager { info!( %alias, %address, ?authorized, ?relay_address, - forwarding_route = ?relay_info.forwarding_route(), remote_address = ?relay_info.remote_address(), "relay created" ); diff --git a/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs b/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs index 12c010df860..a142e7126ac 100644 --- a/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs +++ b/implementations/rust/ockam/ockam_api/tests/logging_tracing.rs @@ -5,11 +5,6 @@ use ockam_api::logs::{ use opentelemetry::global; use opentelemetry::trace::Tracer; - -use opentelemetry_sdk as sdk; -use sdk::testing::logs::*; -use sdk::testing::trace::*; - use opentelemetry_sdk::testing::logs::InMemoryLogsExporter; use opentelemetry_sdk::testing::trace::InMemorySpanExporter; use std::fs; diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index a6dbab4941a..ea50a499708 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -54,7 +54,7 @@ impl CreateCommand { // Set node_name so that node can isolate its data in the storage from other nodes self.get_or_create_identity(&opts, &self.identity).await?; let _notification_handler = if self.foreground_args.child_process { - // If enabled, the user's terminal will receive notifications + // If enabled, the user's terminal would receive notifications // from the node after the command exited. None } else { diff --git a/implementations/rust/ockam/ockam_command/src/node/show.rs b/implementations/rust/ockam/ockam_command/src/node/show.rs index 52c9938ae01..2a0278fe924 100644 --- a/implementations/rust/ockam/ockam_command/src/node/show.rs +++ b/implementations/rust/ockam/ockam_command/src/node/show.rs @@ -171,7 +171,7 @@ pub async fn is_node_up( .ask_with_timeout::<(), NodeStatus>(ctx, api::query_status(), Duration::from_secs(1)) .await { - if status.status.is_running() { + if status.process_status.is_running() { return Ok(true); } } diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index 479110f6693..a27711c4774 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -91,8 +91,6 @@ impl Command for CreateCommand { let alias = cmd.relay_name(); let return_timing = cmd.return_timing(); - // let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); - let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.to).await?; let relay_info = { if at.starts_with(Project::CODE) && cmd.authorized.is_some() { diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake.rs index 1fd2a7757c9..f82e3d46912 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake.rs @@ -13,6 +13,8 @@ use ockam_vault::{ X25519PublicKey, X25519SecretKeyHandle, X25519_PUBLIC_KEY_LENGTH, }; use sha2::{Digest, Sha256}; +#[cfg(feature = "debugger")] +use tracing::debug; use Status::*; /// The number of bytes in a SHA256 digest @@ -50,6 +52,9 @@ impl Handshake { /// Encode the first message, sent from the initiator to the responder pub(super) async fn encode_message1(&mut self, payload: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Encoding message 1"); + let mut state = self.state.clone(); // output e.pubKey let e_pub_key = self.get_public_key(state.e()?).await?; @@ -70,6 +75,9 @@ impl Handshake { /// Decode the first message to get the ephemeral public key sent by the initiator pub(super) async fn decode_message1(&mut self, message1: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Decoding message 2"); + if message1.len() > NOISE_MAX_MESSAGE_SIZE { return Err(XXError::ExceededMaxMessageLen)?; } @@ -93,6 +101,9 @@ impl Handshake { /// That message contains: the responder ephemeral public key + a Diffie-Hellman key + /// an encrypted payload containing the responder identity / signature / credentials pub(super) async fn encode_message2(&mut self, payload: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Encoding message 2"); + let mut state = self.state.clone(); // output e.pubKey let e_pub_key = self.get_public_key(state.e()?).await?; @@ -126,6 +137,9 @@ impl Handshake { /// Decode the second message sent by the responder pub(super) async fn decode_message2(&mut self, message2: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Decoding message 2"); + if message2.len() > NOISE_MAX_MESSAGE_SIZE { return Err(XXError::ExceededMaxMessageLen)?; } @@ -166,6 +180,9 @@ impl Handshake { /// That message contains: the initiator static public key (encrypted) + a Diffie-Hellman key + /// an encrypted payload containing the initiator identity / signature / credentials pub(super) async fn encode_message3(&mut self, payload: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Encoding message 3"); + let mut state = self.state.clone(); // encrypt s.pubKey let s_pub_key = self.get_public_key(state.s()?).await?; @@ -190,6 +207,9 @@ impl Handshake { /// Decode the third message sent by the initiator pub(super) async fn decode_message3(&mut self, message3: &[u8]) -> Result> { + #[cfg(feature = "debugger")] + debug!("Decoding message 3"); + if message3.len() > NOISE_MAX_MESSAGE_SIZE { return Err(XXError::ExceededMaxMessageLen)?; } @@ -219,6 +239,9 @@ impl Handshake { /// Set the final state of the state machine by creating the encryption / decryption keys /// and return the other party identity pub(super) async fn set_final_state(&mut self, role: Role) -> Result<()> { + #[cfg(feature = "debugger")] + debug!("Setting final state"); + // k1, k2 = HKDF(ck, zerolen, 2) let mut state = self.state.clone(); let (k1, k2) = self.compute_final_keys(&mut state).await?; @@ -239,6 +262,9 @@ impl Handshake { /// Return the final results of the handshake if we reached the final state pub(super) fn get_handshake_keys(&self) -> Option { + #[cfg(feature = "debugger")] + debug!("Getting handshake keys"); + match &self.state.status { Ready(keys) => Some(keys.clone()), _ => None, diff --git a/implementations/rust/ockam/ockam_node/src/api.rs b/implementations/rust/ockam/ockam_node/src/api.rs index 466caada4ec..baf8bd62c4f 100644 --- a/implementations/rust/ockam/ockam_node/src/api.rs +++ b/implementations/rust/ockam/ockam_node/src/api.rs @@ -132,7 +132,8 @@ impl Client { method = ?req.header().method(), path = %req.header().path(), body = %req.header().has_body(), - }; + "sending request" + } let options = if let Some(t) = timeout { MessageSendReceiveOptions::new().with_timeout(t) } else { @@ -147,6 +148,15 @@ impl Client { let local_info = resp.local_message().local_info().to_vec(); let body = resp.into_body()?; + trace! { + target: "ockam_api", + id = %req.header().id(), + method = ?req.header().method(), + path = %req.header().path(), + body = %req.header().has_body(), + "received response" + } + Ok((body, local_info)) } } diff --git a/implementations/rust/ockam/ockam_node/src/context/receive_message.rs b/implementations/rust/ockam/ockam_node/src/context/receive_message.rs index 8334d0b32c9..327ff927ff3 100644 --- a/implementations/rust/ockam/ockam_node/src/context/receive_message.rs +++ b/implementations/rust/ockam/ockam_node/src/context/receive_message.rs @@ -61,7 +61,7 @@ impl Context { pub(crate) async fn receiver_next(&mut self) -> Result> { loop { let relay_msg = if let Some(msg) = self.receiver.recv().await.map(|msg| { - trace!("{}: received new message!", self.address()); + trace!(address=%self.address(), "received new message!"); // First we update the mailbox fill metrics self.mailbox_count.fetch_sub(1, Ordering::Acquire); From 82f2debd665def4612aadfbd9eb729c505b46f2d Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 2 Jan 2025 11:47:45 +0100 Subject: [PATCH 7/9] feat: session replacer sends notifications on session lost/replaced --- .../ockam_api/src/cli_state/cli_state.rs | 3 +- .../ockam_api/src/nodes/service/relay.rs | 26 +++++++++++++++-- .../service/tcp_inlets/session_replacer.rs | 28 +++++++++++++++++-- .../ockam/ockam_api/src/session/replacer.rs | 4 +++ .../ockam/ockam_api/src/session/session.rs | 15 +++++++--- .../ockam/ockam_api/src/ui/terminal/fmt.rs | 16 +++++------ .../ockam_api/src/ui/terminal/notification.rs | 17 +++++------ .../ockam/ockam_api/tests/common/session.rs | 8 ++++++ .../ockam/ockam_command/src/relay/create.rs | 6 ++-- .../src/run/parser/resource/traits.rs | 8 ++---- .../ockam_command/src/tcp/inlet/create.rs | 28 +++++++++---------- .../ockam/ockam_node/src/router/shutdown.rs | 2 +- 12 files changed, 111 insertions(+), 50 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index 3301aaaa9b9..18e04a50169 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -44,8 +44,7 @@ pub struct CliState { database: SqlxDatabase, application_database: SqlxDatabase, exporting_enabled: ExportingEnabled, - /// Broadcast channel to be notified of major events during a process supported by the - /// CliState API + /// Broadcast channel to be notified of major events during a process supported by the CliState API notifications: Sender, } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index f51b9fcbbc0..4037ed8dce7 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; +use colorful::Colorful; use miette::IntoDiagnostic; use ockam::identity::models::CredentialAndPurposeKey; @@ -14,6 +15,8 @@ use ockam_multiaddr::MultiAddr; use ockam_node::compat::asynchronous::Mutex; use ockam_node::Context; +use super::{NodeManager, NodeManagerWorker}; +use crate::colors::color_primary; use crate::nodes::connection::Connection; use crate::nodes::models::relay::{CreateRelay, RelayInfo, ReturnTiming}; use crate::nodes::models::secure_channel::{ @@ -25,8 +28,7 @@ use crate::nodes::service::secure_channel::SecureChannelType; use crate::nodes::BackgroundNodeClient; use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer}; use crate::session::session::Session; - -use super::{NodeManager, NodeManagerWorker}; +use crate::{fmt_info, fmt_ok, fmt_warn}; impl NodeManagerWorker { pub async fn create_relay( @@ -374,6 +376,26 @@ impl SessionReplacer for RelaySessionReplacer { } } } + + async fn on_session_down(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message( + fmt_warn!( + "The Node lost the connection to the Relay at {}\n", + color_primary(&self.addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); + } + } + + async fn on_session_replaced(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message(fmt_ok!( + "The Node has restored the connection to the Relay at {}\n", + color_primary(&self.addr) + )); + } + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index a3e87076109..294c9286b83 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -1,11 +1,10 @@ -use crate::nodes::service::certificate_provider::ProjectCertificateProvider; use ockam_transport_tcp::new_certificate_provider_cache; use std::sync::{Arc, Weak}; use std::time::Duration; +use colorful::Colorful; use tokio::time::timeout; -use crate::DefaultAddress; use ockam::identity::{Identifier, SecureChannel}; use ockam::tcp::TcpInletOptions; use ockam::udp::{UdpPuncture, UdpPunctureNegotiation, UdpTransport}; @@ -18,14 +17,17 @@ use ockam_multiaddr::MultiAddr; use ockam_node::Context; use ockam_transport_tcp::TcpInlet; +use crate::colors::color_primary; use crate::error::ApiError; use crate::nodes::connection::Connection; +use crate::nodes::service::certificate_provider::ProjectCertificateProvider; use crate::nodes::service::SecureChannelType; use crate::nodes::NodeManager; use crate::session::replacer::{ AdditionalSessionReplacer, CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind, SessionReplacer, MAX_RECOVERY_TIME, }; +use crate::{fmt_info, fmt_ok, fmt_warn, DefaultAddress}; pub(super) struct InletSessionReplacer { pub(super) node_manager: Weak, @@ -296,6 +298,28 @@ impl SessionReplacer for InletSessionReplacer { self.close_inlet().await; self.close_connection(&node_manager).await; } + + async fn on_session_down(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message( + fmt_warn!( + "The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); + } + } + + async fn on_session_replaced(&self) { + if let Some(node_manager) = self.node_manager.upgrade() { + node_manager.cli_state.notify_message(fmt_ok!( + "The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + )); + } + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/session/replacer.rs b/implementations/rust/ockam/ockam_api/src/session/replacer.rs index 9ce755f4b99..55fdccf643e 100644 --- a/implementations/rust/ockam/ockam_api/src/session/replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/session/replacer.rs @@ -14,6 +14,10 @@ pub trait SessionReplacer: Send + Sync + 'static { async fn create(&mut self) -> Result; async fn close(&mut self); + + async fn on_session_down(&self); + + async fn on_session_replaced(&self); } #[async_trait] diff --git a/implementations/rust/ockam/ockam_api/src/session/session.rs b/implementations/rust/ockam/ockam_api/src/session/session.rs index 0beebe7f158..9e8425a67a3 100644 --- a/implementations/rust/ockam/ockam_api/src/session/session.rs +++ b/implementations/rust/ockam/ockam_api/src/session/session.rs @@ -490,8 +490,10 @@ impl Session { sleep(ping_interval).await; } + // The session is down, or we reached the maximum number of failures _ => { - // We reached the maximum number of failures + let mut replacer = shared_state.replacer.lock().await; + if first_creation && !initial_connect_was_called { debug!(key = %key, "session is down. starting"); first_creation = false; @@ -499,6 +501,10 @@ impl Session { warn!(key = %key, "session unresponsive. replacing"); } + if !first_creation && pings.len() > 0 { + replacer.on_session_down().await; + } + shared_state.status.set_down(); *shared_state.last_outcome.lock().unwrap() = None; shared_state @@ -507,11 +513,12 @@ impl Session { pings.clear(); drop(pings); - let res = shared_state.replacer.lock().await.create().await; - - match res { + match replacer.create().await { Ok(replacer_outcome) => { info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up"); + if !first_creation { + replacer.on_session_replaced().await; + } shared_state.status.set_up(replacer_outcome.ping_route); shared_state diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs index 855e5515235..4760661f52b 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/fmt.rs @@ -38,7 +38,7 @@ macro_rules! fmt_log { ($input:expr $(, $args:expr)* $(,)?) => { format!("{}{}", $crate::terminal::PADDING, - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -58,7 +58,7 @@ macro_rules! fmt_ok { "✔" .color($crate::colors::OckamColor::FmtOKBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -78,7 +78,7 @@ macro_rules! fmt_para { "│" .color($crate::colors::OckamColor::FmtINFOBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -98,7 +98,7 @@ macro_rules! fmt_list { "│" .color($crate::colors::OckamColor::FmtLISTBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -114,7 +114,7 @@ macro_rules! fmt_heading { ($input:expr $(, $args:expr)* $(,)?) => { format!("\n{}{}\n{}{}", $crate::terminal::PADDING, - format!($input, $($args),+), + format!($input, $($args),*), $crate::terminal::PADDING, "─".repeat($crate::terminal::get_separator_width()).dim().light_gray()) }; @@ -150,7 +150,7 @@ macro_rules! fmt_info { ">" .color($crate::colors::OckamColor::FmtINFOBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -170,7 +170,7 @@ macro_rules! fmt_warn { "!" .color($crate::colors::OckamColor::FmtWARNBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } @@ -190,6 +190,6 @@ macro_rules! fmt_err { "✗" .color($crate::colors::OckamColor::FmtERRORBackground.color()) .bold(), - format!($input, $($args),+)) + format!($input, $($args),*)) }; } diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs index 0524986b588..7ab4c2d014b 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs @@ -1,15 +1,17 @@ use crate::terminal::{Terminal, TerminalWriter}; use crate::{fmt_log, CliState}; +use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::{Acquire, Release}; use indicatif::ProgressBar; use std::fmt::Debug; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::broadcast::Receiver; use tokio::time::sleep; -const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(100); +const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(20); #[derive(Debug, Clone, PartialEq)] pub enum Notification { @@ -46,13 +48,12 @@ impl Notification { } pub struct NotificationHandle { - stop: Arc>, + stop: Arc, } impl Drop for NotificationHandle { fn drop(&mut self) { - let mut stop = self.stop.lock().unwrap(); - *stop = true; + self.stop.store(true, Release); } } @@ -67,14 +68,14 @@ pub struct NotificationHandler { /// User terminal terminal: Terminal, /// Flag to determine if the progress display should stop - stop: Arc>, + stop: Arc, } impl NotificationHandler { /// Create a new NotificationsProgress without progress bar. /// The notifications are printed as they arrive and stay on screen pub fn start(cli_state: &CliState, terminal: Terminal) -> NotificationHandle { - let stop = Arc::new(Mutex::new(false)); + let stop = Arc::new(AtomicBool::new(false)); let _self = NotificationHandler { rx: cli_state.subscribe_to_notifications(), terminal: terminal.clone(), @@ -90,7 +91,7 @@ impl NotificationHandler { loop { select! { _ = sleep(REPORTING_CHANNEL_POLL_DELAY) => { - if *self.stop.lock().unwrap() { + if self.stop.load(Acquire) { // Drain the channel while let Ok(notification) = self.rx.try_recv() { self.handle_notification(notification).await; diff --git a/implementations/rust/ockam/ockam_api/tests/common/session.rs b/implementations/rust/ockam/ockam_api/tests/common/session.rs index 86e2723d5be..8a25724d91e 100644 --- a/implementations/rust/ockam/ockam_api/tests/common/session.rs +++ b/implementations/rust/ockam/ockam_api/tests/common/session.rs @@ -171,6 +171,14 @@ impl SessionReplacer for MockReplacer { async fn close(&mut self) { self.close_impl() } + + async fn on_session_down(&self) { + info!("MockReplacer {} on_session_down called", self.name); + } + + async fn on_session_replaced(&self) { + info!("MockReplacer {} on_session_replaced called", self.name); + } } #[async_trait] diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index a27711c4774..c70db02840f 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -166,9 +166,9 @@ impl Command for CreateCommand { .write_line()?; } else { let plain = { - let from = color_primary(&at); - let to = color_primary(format!("/node/{}", &node.node_name())); - fmt_warn!("A relay was created at {to} but failed to connect to {from}\n") + let at = color_primary(&at); + let node = color_primary(format!("/node/{}", &node.node_name())); + fmt_warn!("A relay was created at {node} but failed to connect to {at}\n") + &fmt_info!("It will retry to connect automatically") }; opts.terminal diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs index d4ad70f2400..564d6e12acc 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs @@ -71,13 +71,11 @@ impl ParsedCommands { if len > 0 { opts.terminal.write_line("")?; } - for (idx, cmd) in self.commands.into_iter().enumerate() { + for cmd in self.commands.into_iter() { if cmd.is_valid(ctx, opts).await? { cmd.run(ctx, opts).await?; - if idx < len - 1 { - // Newline between commands - opts.terminal.write_line("")?; - } + // Newline after each command + opts.terminal.write_line("")?; } } Ok(()) diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index f8ca201b158..e99047467a3 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -233,39 +233,37 @@ impl Command for CreateCommand { .await?; let created_message = format!( - "Created a new TCP Inlet in the Node {} bound to {}\n", + "Created a new TCP Inlet in the Node {} bound to {}", color_primary(&node_name), color_primary(cmd.from.to_string()), ); - let created_message = fmt_ok!("{}", created_message); - let mut plain = if cmd.no_connection_wait { - created_message - + &fmt_log!( + fmt_ok!("{created_message}\n") + + &fmt_info!( "It will automatically connect to the TCP Outlet at {} as soon as it is available\n", color_primary(&cmd.to) ) } else if inlet_status.status == ConnectionStatus::Up { - created_message + fmt_ok!("{created_message}\n") + &fmt_log!( "sending traffic to the TCP Outlet at {}\n", color_primary(&cmd.to) ) } else { - let mut msg = fmt_warn!("A TCP Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n", - color_primary(&node_name), - color_primary(cmd.from.to_string()), - color_primary(&cmd.to)); - - msg += &fmt_info!("It will retry to connect automatically\n"); - - msg + fmt_warn!("{created_message}\n") + + &fmt_log!( + "but it failed to connect to the TCP Outlet at {}\n", + color_primary(&cmd.to) + ) + + &fmt_info!( + "It will automatically connect to the TCP Outlet as soon as it is available\n", + ) }; if privileged { plain += &fmt_info!( - "This Inlet is operating in {} mode\n", + "This TCP Inlet is operating in {} mode\n", color_primary_alt("privileged".to_string()) ); } diff --git a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs index 599cd259388..cfff5da9721 100644 --- a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs +++ b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs @@ -107,7 +107,7 @@ pub(super) async fn graceful( warn!(%timeout, "shutdown timeout reached; aborting node!"); // This works only because the state of the router is `Stopping` if sender.send(NodeMessage::AbortNode).await.is_err() { - warn!("failed to send node abort signal to router"); + warn!("couldn't send node abort signal to router"); } }); } From 92ac60f0290388515cb4b83a4eff053dea24ad71 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 2 Jan 2025 17:13:50 +0100 Subject: [PATCH 8/9] fix: newlines between commands run in a configuration --- .../ockam_command/src/node/create/config.rs | 33 +++++++++++++++---- .../src/run/parser/resource/traits.rs | 10 +++--- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index 8fa550f91d1..fcbc428e4d4 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -281,9 +281,9 @@ impl NodeConfig { self.kafka_outlet.into_parsed_commands(node_name)?.into(), self.kafka_inlet.into_parsed_commands(node_name)?.into(), ]; - for section in other_sections { - section.run(ctx, opts).await?; - } + opts.terminal.write_line("")?; + Self::run_commands_sections(ctx, opts, other_sections).await?; + opts.terminal.write_line("")?; // Block on the node until it exits let _ = node_handle.await.into_diagnostic()?; @@ -297,9 +297,8 @@ impl NodeConfig { node_name: &String, identity_name: &String, ) -> miette::Result<()> { - for section in self.parse_commands(node_name, identity_name)? { - section.run(ctx, opts).await? - } + let sections = self.parse_commands(node_name, identity_name)?; + Self::run_commands_sections(ctx, opts, sections).await?; Ok(()) } @@ -328,6 +327,28 @@ impl NodeConfig { self.kafka_inlet.into_parsed_commands(node_name)?.into(), ]) } + + async fn run_commands_sections( + ctx: &Context, + opts: &CommandGlobalOpts, + sections: Vec, + ) -> miette::Result<()> { + let sections: Vec = sections + .into_iter() + .filter(|s| !s.commands.is_empty()) + .collect(); + let len = sections.len(); + for (idx, section) in sections.into_iter().enumerate() { + if section.commands.is_empty() { + continue; + } + section.run(ctx, opts).await?; + if idx < len - 1 { + opts.terminal.write_line("")?; + } + } + Ok(()) + } } #[cfg(test)] diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs index 564d6e12acc..56572024c5b 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/traits.rs @@ -68,14 +68,12 @@ impl ParsedCommands { /// Validate and run each command pub async fn run(self, ctx: &Context, opts: &CommandGlobalOpts) -> Result<()> { let len = self.commands.len(); - if len > 0 { - opts.terminal.write_line("")?; - } - for cmd in self.commands.into_iter() { + for (idx, cmd) in self.commands.into_iter().enumerate() { if cmd.is_valid(ctx, opts).await? { cmd.run(ctx, opts).await?; - // Newline after each command - opts.terminal.write_line("")?; + if idx < len - 1 { + opts.terminal.write_line("")?; + } } } Ok(()) From 2ba08a4c40de6d5718c59c8767c16f54cf2a544a Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 2 Jan 2025 17:30:35 +0100 Subject: [PATCH 9/9] chore: update relay and influxdb-inlet output to be consistent with the tcp-inlet --- .../src/influxdb/inlet/create.rs | 22 +++++++++---------- .../ockam/ockam_command/src/relay/create.rs | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs index 02e5770bd80..9e1ed2428bc 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs @@ -111,29 +111,29 @@ impl Command for InfluxDBCreateCommand { .add_inlet_created_event(&opts, &node_name, &inlet_status) .await?; - let created_message = fmt_ok!( - "Created a new InfluxDB Inlet in the Node {} bound to {}\n", + let created_message = format!( + "Created a new InfluxDB Inlet in the Node {} bound to {}", color_primary(&node_name), - color_primary(&self.tcp_inlet.from) + color_primary(self.tcp_inlet.from.to_string()), ); let plain = if self.tcp_inlet.no_connection_wait { - created_message + &fmt_log!("It will automatically connect to the TCP Outlet at {} as soon as it is available", + fmt_ok!("{created_message}\n") + + &fmt_log!("It will automatically connect to the InfluxDB Outlet at {} as soon as it is available\n", color_primary(&self.tcp_inlet.to) ) } else if inlet_status.status == ConnectionStatus::Up { - created_message + fmt_ok!("{created_message}\n") + &fmt_log!( - "sending traffic to the TCP Outlet at {}", + "sending traffic to the TCP Outlet at {}\n", color_primary(&self.tcp_inlet.to) ) } else { - fmt_warn!( - "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n", - color_primary(&node_name), - color_primary(self.tcp_inlet.from.to_string()), + fmt_warn!("{created_message}\n") + + &fmt_log!( + "but failed to connect to the TCP Outlet at {}\n", color_primary(&self.tcp_inlet.to) - ) + &fmt_info!("It will retry to connect automatically") + ) + &fmt_info!("It will automatically connect to the InfluxDB Outlet as soon as it is available\n") }; opts.terminal diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index c70db02840f..f27179fc602 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -169,7 +169,7 @@ impl Command for CreateCommand { let at = color_primary(&at); let node = color_primary(format!("/node/{}", &node.node_name())); fmt_warn!("A relay was created at {node} but failed to connect to {at}\n") - + &fmt_info!("It will retry to connect automatically") + + &fmt_info!("It will automatically connect to the Relay as soon as it is available") }; opts.terminal .stdout()