Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): portals commands feedback improvements #8680

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ockam_node::Context;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
use tracing::{debug, info};
use tracing::debug;

pub(crate) struct TcpMitmProcessor {
address_of_other_processor: Address,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Processor for TcpMitmProcessor {
let len = match self.read_half.read(&mut buf).await {
Ok(l) if l != 0 => l,
_ => {
info!("Connection was closed; dropping stream {}", ctx.address());
debug!("Connection was closed; dropping stream {}", ctx.address());

let _ = ctx.stop_processor(self.address_of_other_processor.clone()).await;

Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ log = "0.4"
miette = { version = "7.2.0", features = ["fancy-no-backtrace"] }
minicbor = { version = "0.25.1", default-features = false, features = ["alloc", "derive"] }
nix = { version = "0.29", features = ["signal"] }
nu-ansi-term = "0.50"
once_cell = { version = "1", default-features = false }
open = "5.3.0"
opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ pub struct CliState {
database: SqlxDatabase,
application_database: SqlxDatabase,
exporting_enabled: ExportingEnabled,
/// Broadcast channel to be notified of major events during a process supported by the
/// CliState API
/// Broadcast channel to be notified of major events during a process supported by the CliState API
notifications: Sender<Notification>,
}

Expand Down Expand Up @@ -118,7 +117,6 @@ impl CliState {
}

fn notify(&self, notification: Notification) {
debug!("{:?}", notification.contents());
let _ = self.notifications.send(notification);
}
}
Expand Down
157 changes: 154 additions & 3 deletions implementations/rust/ockam/ockam_api/src/logs/logging_options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use nu_ansi_term::{Color, Style};
use ockam_core::env::FromString;
use ockam_core::errcode::{Kind, Origin};
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use tracing_core::{Event, Level, Subscriber};
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields};
use tracing_subscriber::registry::LookupSpan;

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum LoggingEnabled {
Expand Down Expand Up @@ -85,12 +90,158 @@ impl FromString for LogFormat {
}
}

impl std::fmt::Display for LogFormat {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
impl Display for LogFormat {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
LogFormat::Default => write!(f, "default"),
LogFormat::Pretty => write!(f, "pretty"),
LogFormat::Json => write!(f, "json"),
}
}
}

#[derive(Default)]
pub struct OckamLogFormat {}

impl OckamLogFormat {
pub fn new() -> Self {
Self {}
}

fn format_timestamp(&self, writer: &mut Writer<'_>) -> std::fmt::Result {
let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ");
if writer.has_ansi_escapes() {
let style = Style::new().dimmed();
write!(writer, "{}", style.prefix())?;
write!(writer, "{}", now)?;
write!(writer, "{} ", style.suffix())?;
} else {
write!(writer, "{}", now)?;
writer.write_char(' ')?;
}
Ok(())
}
}

impl<S, N> FormatEvent<S, N> for OckamLogFormat
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result {
let meta = event.metadata();
let dimmed = if writer.has_ansi_escapes() {
Style::new().dimmed()
} else {
Style::new()
};
let bold = if writer.has_ansi_escapes() {
Style::new().bold()
} else {
Style::new()
};

// Timestamp
self.format_timestamp(&mut writer)?;

// Level
let fmt_level = FmtLevel::new(meta.level(), writer.has_ansi_escapes());
write!(writer, "{} ", fmt_level)?;

// Event
ctx.format_fields(writer.by_ref(), event)?;
writer.write_char(' ')?;

// Scope
if let Some(scope) = ctx.event_scope() {
let mut seen = false;

for span in scope.from_root() {
write!(writer, "{}", bold.paint(span.metadata().name()))?;
seen = true;

let ext = span.extensions();
if let Some(fields) = &ext.get::<FormattedFields<N>>() {
if !fields.is_empty() {
write!(writer, "{}{}{}", bold.paint("{"), fields, bold.paint("}"))?;
}
}
write!(writer, "{}", dimmed.paint(":"))?;
}

if seen {
writer.write_char(' ')?;
}
};

// Target
write!(writer, "{} ", dimmed.paint(meta.target()))?;

// File and line
let line_number = meta.line();
if let Some(filename) = meta.file() {
write!(
writer,
"{}{}{}",
dimmed.paint(filename),
dimmed.paint(":"),
if line_number.is_some() { "" } else { " " }
)?;
}
if let Some(line_number) = line_number {
write!(
writer,
"{}{}{}",
dimmed.prefix(),
line_number,
dimmed.suffix()
)?;
}

writeln!(writer)
}
}

struct FmtLevel<'a> {
level: &'a Level,
ansi: bool,
}

impl<'a> FmtLevel<'a> {
pub(crate) fn new(level: &'a Level, ansi: bool) -> Self {
Self { level, ansi }
}
}

const TRACE_STR: &str = "TRACE";
const DEBUG_STR: &str = "DEBUG";
const INFO_STR: &str = " INFO";
const WARN_STR: &str = " WARN";
const ERROR_STR: &str = "ERROR";

impl Display for FmtLevel<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.ansi {
match *self.level {
Level::TRACE => write!(f, "{}", Color::Purple.paint(TRACE_STR)),
Level::DEBUG => write!(f, "{}", Color::Blue.paint(DEBUG_STR)),
Level::INFO => write!(f, "{}", Color::Green.paint(INFO_STR)),
Level::WARN => write!(f, "{}", Color::Yellow.paint(WARN_STR)),
Level::ERROR => write!(f, "{}", Color::Red.paint(ERROR_STR)),
}
} else {
match *self.level {
Level::TRACE => f.pad(TRACE_STR),
Level::DEBUG => f.pad(DEBUG_STR),
Level::INFO => f.pad(INFO_STR),
Level::WARN => f.pad(WARN_STR),
Level::ERROR => f.pad(ERROR_STR),
}
}
}
}
24 changes: 16 additions & 8 deletions implementations/rust/ockam/ockam_api/src/logs/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk as sdk;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
Expand All @@ -30,6 +29,7 @@ use ockam_node::Executor;
use crate::logs::tracing_guard::TracingGuard;
use crate::logs::{
ExportingConfiguration, GlobalErrorHandler, LoggingConfiguration, OckamLogExporter,
OckamLogFormat,
};
use crate::logs::{LogFormat, OckamSpanExporter};

Expand Down Expand Up @@ -109,7 +109,9 @@ impl LoggingTracing {
let result = match logging_configuration.format() {
LogFormat::Pretty => layers.with(appender.pretty()).try_init(),
LogFormat::Json => layers.with(appender.json()).try_init(),
LogFormat::Default => layers.with(appender).try_init(),
LogFormat::Default => layers
.with(appender.event_format(OckamLogFormat::new()))
.try_init(),
};
result.expect("Failed to initialize tracing subscriber");

Expand All @@ -130,7 +132,9 @@ impl LoggingTracing {
let result = match logging_configuration.format() {
LogFormat::Pretty => layers.with(appender.pretty()).try_init(),
LogFormat::Json => layers.with(appender.json()).try_init(),
LogFormat::Default => layers.with(appender).try_init(),
LogFormat::Default => layers
.with(appender.event_format(OckamLogFormat::new()))
.try_init(),
};
result.expect("Failed to initialize tracing subscriber");
};
Expand Down Expand Up @@ -230,7 +234,7 @@ fn create_opentelemetry_tracing_layer<
exporting_configuration: &ExportingConfiguration,
span_exporter: S,
) -> (
OpenTelemetryLayer<R, sdk::trace::Tracer>,
OpenTelemetryLayer<R, opentelemetry_sdk::trace::Tracer>,
opentelemetry_sdk::trace::TracerProvider,
) {
let app = app_name.to_string();
Expand All @@ -243,7 +247,8 @@ fn create_opentelemetry_tracing_layer<
let is_ockam_developer = exporting_configuration.is_ockam_developer();
let span_export_cutoff = exporting_configuration.span_export_cutoff();
Executor::execute_future(async move {
let trace_config = sdk::trace::Config::default().with_resource(make_resource(app));
let trace_config =
opentelemetry_sdk::trace::Config::default().with_resource(make_resource(app));
let (tracer, tracer_provider) = create_tracer(
trace_config,
batch_config,
Expand Down Expand Up @@ -410,11 +415,14 @@ fn set_global_error_handler(logging_configuration: &LoggingConfiguration) {

/// Create a Tracer using the provided span exporter
fn create_tracer<S: SpanExporter + 'static>(
trace_config: sdk::trace::Config,
trace_config: opentelemetry_sdk::trace::Config,
batch_config: BatchConfig,
exporter: S,
) -> (sdk::trace::Tracer, opentelemetry_sdk::trace::TracerProvider) {
let span_processor = BatchSpanProcessor::builder(exporter, sdk::runtime::Tokio)
) -> (
opentelemetry_sdk::trace::Tracer,
opentelemetry_sdk::trace::TracerProvider,
) {
let span_processor = BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
.with_batch_config(batch_config)
.build();
let provider = opentelemetry_sdk::trace::TracerProvider::builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Instantiator for ProjectInstantiator {
let (project_multiaddr, project_identifier) =
node_manager.resolve_project(&project).await?;

debug!(addr = %project_multiaddr, "creating secure channel");
debug!(to = %project_multiaddr, identifier = %project_identifier, "creating secure channel");
let transport_res = RemoteMultiaddrResolver::new(
Some(node_manager.tcp_transport.clone()),
None, // We can't connect to the project node via UDP atm
Expand All @@ -67,7 +67,6 @@ impl Instantiator for ProjectInstantiator {
))
})?;

debug!("create a secure channel to the project {project_identifier}");
let sc = node_manager
.create_secure_channel_internal(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,13 @@ impl NodeManager {
pub async fn make_connection(
&self,
ctx: &Context,
addr: &MultiAddr,
address: &MultiAddr,
identifier: Identifier,
authorized: Option<Identifier>,
timeout: Option<Duration>,
) -> ockam_core::Result<Connection> {
let authorized = authorized.map(|authorized| vec![authorized]);
self.connect(ctx, addr, identifier, authorized, timeout)
self.connect(ctx, address, identifier, authorized, timeout)
.await
}

Expand All @@ -313,13 +313,13 @@ impl NodeManager {
async fn connect(
&self,
ctx: &Context,
addr: &MultiAddr,
address: &MultiAddr,
identifier: Identifier,
authorized: Option<Vec<Identifier>>,
timeout: Option<Duration>,
) -> ockam_core::Result<Connection> {
debug!(?timeout, "connecting to {}", &addr);
let connection = ConnectionBuilder::new(addr.clone())
debug!(%address, ?timeout, "connecting");
let connection = ConnectionBuilder::new(address.clone())
.instantiate(
ctx,
self,
Expand All @@ -333,13 +333,12 @@ impl NodeManager {
.instantiate(
ctx,
self,
SecureChannelInstantiator::new(&identifier, timeout, authorized),
SecureChannelInstantiator::new(&identifier, timeout, authorized.clone()),
)
.await?
.build();
connection.add_default_consumers(ctx);

debug!("connected to {connection:?}");
info!(%address, %identifier, ?authorized, "connection established");
Ok(connection)
}

Expand Down
Loading
Loading