Skip to content

Commit

Permalink
feat: improve logs for tcp portals creation
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Dec 2, 2024
1 parent 79c10e0 commit 4690f26
Show file tree
Hide file tree
Showing 28 changed files with 236 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ockam_node::Context;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
use tracing::{debug, info};
use tracing::debug;

pub(crate) struct TcpMitmProcessor {
address_of_other_processor: Address,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Processor for TcpMitmProcessor {
let len = match self.read_half.read(&mut buf).await {
Ok(l) if l != 0 => l,
_ => {
info!("Connection was closed; dropping stream {}", ctx.address());
debug!("Connection was closed; dropping stream {}", ctx.address());

let _ = ctx.stop_processor(self.address_of_other_processor.clone()).await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Instantiator for ProjectInstantiator {
let (project_multiaddr, project_identifier) =
node_manager.resolve_project(&project).await?;

debug!(addr = %project_multiaddr, "creating secure channel");
debug!(to = %project_multiaddr, identifier = %project_identifier, "creating secure channel");
let tcp = multiaddr_to_route(&project_multiaddr, &node_manager.tcp_transport)
.await
.ok_or_else(|| {
Expand All @@ -62,7 +62,6 @@ impl Instantiator for ProjectInstantiator {
))
})?;

debug!("create a secure channel to the project {project_identifier}");
let sc = node_manager
.create_secure_channel_internal(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,13 @@ impl NodeManager {
pub async fn make_connection(
&self,
ctx: &Context,
addr: &MultiAddr,
address: &MultiAddr,
identifier: Identifier,
authorized: Option<Identifier>,
timeout: Option<Duration>,
) -> ockam_core::Result<Connection> {
let authorized = authorized.map(|authorized| vec![authorized]);
self.connect(ctx, addr, identifier, authorized, timeout)
self.connect(ctx, address, identifier, authorized, timeout)
.await
}

Expand All @@ -301,13 +301,13 @@ impl NodeManager {
async fn connect(
&self,
ctx: &Context,
addr: &MultiAddr,
address: &MultiAddr,
identifier: Identifier,
authorized: Option<Vec<Identifier>>,
timeout: Option<Duration>,
) -> ockam_core::Result<Connection> {
debug!(?timeout, "connecting to {}", &addr);
let connection = ConnectionBuilder::new(addr.clone())
debug!(%address, ?timeout, "connecting");
let connection = ConnectionBuilder::new(address.clone())
.instantiate(
ctx,
self,
Expand All @@ -319,13 +319,12 @@ impl NodeManager {
.instantiate(
ctx,
self,
SecureChannelInstantiator::new(&identifier, timeout, authorized),
SecureChannelInstantiator::new(&identifier, timeout, authorized.clone()),
)
.await?
.build();
connection.add_default_consumers(ctx);

debug!("connected to {connection:?}");
info!(%address, %identifier, ?authorized, "connection established");
Ok(connection)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl NodeManager {
timeout: Option<Duration>,
secure_channel_type: SecureChannelType,
) -> Result<SecureChannel> {
debug!(%sc_route, "Creating secure channel");
debug!(route = %sc_route, %identifier, "initiating secure channel");
let options = SecureChannelOptions::new();

let options = if let Some(timeout) = timeout {
Expand All @@ -225,8 +225,8 @@ impl NodeManager {
None => options,
};

let options = if let Some(credential) = credential {
options.with_credential(credential)?
let options = if let Some(credential) = credential.as_ref() {
options.with_credential(credential.clone())?
} else {
match self.credential_retriever_creators.project_member.as_ref() {
None => options,
Expand All @@ -252,7 +252,7 @@ impl NodeManager {
.create_secure_channel(ctx, identifier, sc_route.clone(), options)
.await?;

debug!(%sc_route, %sc, "Created secure channel");
info!(route = %sc_route, %identifier, "secure channel initiated");

self.registry
.secure_channels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ impl NodeManager {
pub async fn create_inlet(
self: &Arc<Self>,
ctx: &Context,
listen_addr: HostnamePort,
listen_address: HostnamePort,
prefix_route: Route,
suffix_route: Route,
outlet_addr: MultiAddr,
outlet_address: MultiAddr,
alias: String,
policy_expression: Option<PolicyExpression>,
wait_for_outlet_duration: Option<Duration>,
Expand All @@ -42,16 +42,15 @@ impl NodeManager {
privileged: bool,
tls_certificate_provider: Option<MultiAddr>,
) -> Result<InletStatus> {
info!("Handling request to create inlet portal");
debug! {
listen_addr = %listen_addr,
%listen_address,
prefix = %prefix_route,
suffix = %suffix_route,
outlet_addr = %outlet_addr,
%outlet_address,
%alias,
%enable_udp_puncture,
%disable_tcp_fallback,
"Creating inlet portal"
"creating inlet"
}

let udp_transport = if enable_udp_puncture {
Expand All @@ -67,8 +66,8 @@ impl NodeManager {
// the port could be zero, to simplify the following code we
// resolve the address to a full socket address
let socket_addr =
ockam_node::compat::asynchronous::resolve_peer(listen_addr.to_string()).await?;
let listen_addr = if listen_addr.port() == 0 {
ockam_node::compat::asynchronous::resolve_peer(listen_address.to_string()).await?;
let listen_addr = if listen_address.port() == 0 {
get_free_address_for(&socket_addr.ip().to_string())
.map_err(|err| ockam_core::Error::new(Origin::Transport, Kind::Invalid, err))?
} else {
Expand Down Expand Up @@ -111,7 +110,7 @@ impl NodeManager {
udp_transport,
context: ctx.async_try_clone().await?,
listen_addr: listen_addr.to_string(),
outlet_addr: outlet_addr.clone(),
outlet_addr: outlet_address.clone(),
prefix_route,
suffix_route,
authorized,
Expand Down Expand Up @@ -139,7 +138,7 @@ impl NodeManager {
.create_tcp_inlet(
&self.node_name,
&listen_addr,
&outlet_addr,
&outlet_address,
&alias,
privileged,
)
Expand Down Expand Up @@ -188,7 +187,7 @@ impl NodeManager {
alias.clone(),
InletInfo::new(
&listen_addr.to_string(),
outlet_addr.clone(),
outlet_address.clone(),
session,
privileged,
),
Expand All @@ -204,10 +203,17 @@ impl NodeManager {
None,
outcome.clone().map(|s| s.route.to_string()),
connection_status,
outlet_addr.to_string(),
outlet_address.to_string(),
privileged,
);

info! {
%listen_address,
%outlet_address,
%alias,
"inlet created"
}

Ok(tcp_inlet_status)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ impl InletSessionReplacer {
};

self.main_route = Some(normalized_stripped_route);
info!(address = ?inlet_address,
route = %self.main_route.as_ref().map(|r| r.to_string()).unwrap_or("None".to_string()),
"tcp inlet restored");

Ok(ReplacerOutcome {
ping_route: transport_route,
Expand Down Expand Up @@ -273,7 +276,7 @@ impl SessionReplacer for InletSessionReplacer {
Err(ApiError::core("timeout"))
}
Ok(Err(e)) => {
warn!(%self.outlet_addr, err = %e, "error creating new tcp inlet");
warn!(%self.outlet_addr, err = %e, "failed to create tcp inlet");
Err(e)
}
Ok(Ok(route)) => Ok(route),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ impl NodeManagerWorker {
}

impl NodeManager {
#[instrument(skip(self, ctx))]
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn create_outlet(
Expand All @@ -107,10 +106,7 @@ impl NodeManager {
.generate_worker_addr(worker_addr)
.await;

info!(
"Handling request to create outlet portal to {to} with worker {:?}",
worker_addr
);
debug!(%to, address = %worker_addr, "creating outlet");

// Check registry for a duplicated key
if self.registry.outlets.contains_key(&worker_addr).await {
Expand Down Expand Up @@ -194,10 +190,12 @@ impl NodeManager {
OutletInfo::new(to.clone(), Some(&worker_addr), privileged),
)
.await;

self.cli_state
let outlet = self
.cli_state
.create_tcp_outlet(&self.node_name, &to, &worker_addr, &None, privileged)
.await?
.await?;
info!(%to, address = %worker_addr, "outlet created");
outlet
}
Err(e) => {
warn!(at = %to, err = %e, "Failed to create TCP outlet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl CreateCommand {
/// Run the creation of a node using a node configuration
#[instrument(skip_all)]
pub async fn run_config(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
debug!("Running node create with a node config");
debug!("running node create with a node config");
let mut node_config = self.get_node_config().await?;
node_config.merge(&self, &opts.state).await?;
let node_name = node_config.node.name().ok_or(miette!(
Expand Down Expand Up @@ -240,7 +240,7 @@ impl NodeConfig {
node_name: &String,
identity_name: &str,
) -> miette::Result<()> {
debug!("Running node config in foreground mode");
debug!("running node config in foreground mode");
// First, run the `project enroll` commands to prepare the identity and project data
if self.project_enroll.ticket.is_some() {
if !self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,18 @@ impl CreateCommand {
}

async fn start_services(&self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> {
// Wait until the node is fully started
let mut node =
BackgroundNodeClient::create(ctx, &opts.state, &Some(self.name.clone())).await?;
if !is_node_up(ctx, &mut node, true).await? {
return Err(miette!(
"Couldn't start services because the node is not up"
));
}

if let Some(config) = &self.launch_configuration {
if let Some(startup_services) = &config.startup_services {
// Wait until the node is fully started
let mut node =
BackgroundNodeClient::create(ctx, &opts.state, &Some(self.name.clone()))
.await?;
if !is_node_up(ctx, &mut node, true).await? {
return Err(miette!(
"Couldn't start services because the node is not up"
));
}

if let Some(cfg) = startup_services.secure_channel_listener.clone() {
if !cfg.disabled {
opts.terminal
Expand Down
11 changes: 10 additions & 1 deletion implementations/rust/ockam/ockam_command/src/node/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ pub async fn is_node_up(
wait_until_ready: bool,
) -> Result<bool> {
let node_name = node.node_name();
// Check if node is already up and running to skip the accessible/ready checks
if let Ok(status) = node
.ask_with_timeout::<(), NodeStatus>(ctx, api::query_status(), Duration::from_secs(1))
.await
{
if status.status.is_running() {
return Ok(true);
}
}
if !is_node_accessible(ctx, node, wait_until_ready).await? {
warn!(%node_name, "the node was not accessible in time");
return Ok(false);
Expand Down Expand Up @@ -222,7 +231,7 @@ async fn is_node_ready(
if let Ok(node_status) = result {
if node_status.status.is_running() {
let elapsed = now.elapsed();
info!(%node_name, ?elapsed, "node is ready {:?}", node_status);
info!(%node_name, ?elapsed, "node is ready");
return Ok(true);
} else {
trace!(%node_name, "node is initializing");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::CommandGlobalOpts;
use clap::Args;
use colorful::Colorful;
use ockam_abac::expr::and;
use ockam_api::{fmt_log, fmt_warn};
use std::io;
use std::io::Read;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tracing::{debug, warn};
use tracing::{debug, info, warn};

use ockam_core::compat::collections::BTreeMap;
use ockam_core::compat::sync::Arc;
Expand Down Expand Up @@ -192,28 +192,47 @@ impl CredentialsVerification {
authorities: &[Identifier],
credential_and_purpose_key_attestation: &CredentialAndPurposeKey,
) -> Result<()> {
let credential_data = self
let credential = self
.verify_credential(
Some(subject),
authorities,
credential_and_purpose_key_attestation,
)
.await?;
let credential_data = credential.credential_data;
let purpose_key_data = credential.purpose_key_data;

let map = credential_data.credential_data.subject_attributes.map;
let map: BTreeMap<_, _> = map
let attributes_display = credential_data.get_attributes_display();
let attributes: BTreeMap<_, _> = credential_data
.subject_attributes
.map
.into_iter()
.map(|(k, v)| (Vec::<u8>::from(k), Vec::<u8>::from(v)))
.collect();

info! {
%subject,
attributes = attributes_display,
schema = %credential_data.subject_attributes.schema.0,
created_at = %credential_data.created_at,
expires_at = %credential_data.expires_at,
"presented credential"
}
debug! {
subject = %purpose_key_data.subject,
created_at = %purpose_key_data.created_at,
expires_at = %purpose_key_data.expires_at,
"presented credential - purpose key attestation"
}

self.identities_attributes_repository
.put_attributes(
subject,
AttributesEntry::new(
map,
attributes,
now()?,
Some(credential_data.credential_data.expires_at),
Some(credential_data.purpose_key_data.subject),
Some(credential_data.expires_at),
Some(purpose_key_data.subject),
),
)
.await?;
Expand Down
Loading

0 comments on commit 4690f26

Please sign in to comment.