Skip to content

Commit

Permalink
feat(rust): simplify node create execution
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Dec 13, 2024
1 parent 441bacf commit 2ddf8ac
Show file tree
Hide file tree
Showing 28 changed files with 661 additions and 537 deletions.
33 changes: 26 additions & 7 deletions implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};

use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::DatabaseConfiguration;
use ockam_node::database::{DatabaseConfiguration, OCKAM_SQLITE_IN_MEMORY};
use ockam_node::Executor;

use crate::cli_state::error::Result;
Expand Down Expand Up @@ -71,6 +71,13 @@ impl CliState {
Self::make_database_configuration(&self.dir)
}

pub fn is_using_in_memory_database(&self) -> Result<bool> {
match self.database_configuration()? {
DatabaseConfiguration::SqliteInMemory { .. } => Ok(true),
_ => Ok(false),
}
}

pub fn is_database_path(&self, path: &Path) -> bool {
let database_configuration = self.database_configuration().ok();
match database_configuration {
Expand Down Expand Up @@ -248,9 +255,15 @@ impl CliState {
pub(super) fn make_database_configuration(root_path: &Path) -> Result<DatabaseConfiguration> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => Ok(configuration),
None => Ok(DatabaseConfiguration::sqlite(
root_path.join("database.sqlite3").as_path(),
)),
None => {
if get_env_with_default::<bool>(OCKAM_SQLITE_IN_MEMORY, false)? {
Ok(DatabaseConfiguration::sqlite_in_memory())
} else {
Ok(DatabaseConfiguration::sqlite(
root_path.join("database.sqlite3").as_path(),
))
}
}
}
}

Expand All @@ -260,9 +273,15 @@ impl CliState {
) -> Result<DatabaseConfiguration> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => Ok(configuration),
None => Ok(DatabaseConfiguration::sqlite(
root_path.join("application_database.sqlite3").as_path(),
)),
None => {
if get_env_with_default::<bool>(OCKAM_SQLITE_IN_MEMORY, false)? {
Ok(DatabaseConfiguration::sqlite_in_memory())
} else {
Ok(DatabaseConfiguration::sqlite(
root_path.join("application_database.sqlite3").as_path(),
))
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ impl ExportingConfiguration {

/// Create a tracing configuration for a user command running in the foreground.
/// (meaning that the process will shut down once the command has been executed)
pub fn foreground() -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint()? {
pub fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
Expand All @@ -139,8 +139,8 @@ impl ExportingConfiguration {
}

/// Create a tracing configuration for a background node
pub fn background() -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint()? {
pub fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
Expand Down Expand Up @@ -320,20 +320,20 @@ fn to_socket_addr(url: &Url) -> Option<SocketAddr> {
/// Return the tracing endpoint, defined by an environment variable
/// If the endpoint can be established with an Ockam portal to the opentelemetry-relay created in the project
/// use that URL, otherwise use the HTTPS endpoint
fn opentelemetry_endpoint() -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
fn opentelemetry_endpoint(state: &CliState) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
if !is_exporting_set()? {
print_debug("Exporting is turned off");
Ok(None)
} else {
let cli_state = CliState::with_default_dir()?;
let state = state.clone();
match Executor::execute_future(async move {
// if a project is defined try to use the OpenTelemetry portal
// and if we allow traces to be exported via a portal
if cli_state.projects().get_default_project().await.is_ok()
if state.projects().get_default_project().await.is_ok()
&& is_exporting_via_portal_set()?
{
print_debug("A default project exists. Getting the project export endpoint");
get_project_endpoint_url(&cli_state).await
get_project_endpoint_url(&state).await
} else {
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
get_https_endpoint()
Expand Down
5 changes: 1 addition & 4 deletions implementations/rust/ockam/ockam_api/src/logs/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk as sdk;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{BatchConfig, BatchConfigBuilder, BatchSpanProcessor};
use opentelemetry_sdk::{self as sdk};
use opentelemetry_sdk::{logs, Resource};
use opentelemetry_semantic_conventions::attribute;
use std::io::{empty, stdout};
Expand Down Expand Up @@ -135,9 +135,6 @@ impl LoggingTracing {
result.expect("Failed to initialize tracing subscriber");
};

// the global error handler prints errors when exporting spans or log records fails
set_global_error_handler(logging_configuration);

TracingGuard::guard_only(worker_guard)
}

Expand Down
12 changes: 8 additions & 4 deletions implementations/rust/ockam/ockam_api/src/nodes/models/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ use std::fmt::{Display, Formatter};
pub struct NodeStatus {
#[n(1)] pub name: String,
#[n(2)] pub identifier: Identifier,
#[n(3)] pub status: NodeProcessStatus,
#[n(3)] pub process_status: NodeProcessStatus,
}

impl NodeStatus {
pub fn new(name: impl Into<String>, identifier: Identifier, status: NodeProcessStatus) -> Self {
pub fn new(
name: impl Into<String>,
identifier: Identifier,
process_status: NodeProcessStatus,
) -> Self {
Self {
name: name.into(),
identifier,
status,
process_status,
}
}
}
Expand All @@ -41,7 +45,7 @@ impl From<&NodeInfo> for NodeStatus {
Self {
name: node.name(),
identifier: node.identifier(),
status: node.status(),
process_status: node.status(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::time::Duration;

use miette::{miette, IntoDiagnostic};
use minicbor::{Decode, Encode};
use ockam::identity::get_default_timeout;

use ockam::identity::get_default_timeout;
use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport};
use ockam_core::api::{Reply, Request};
use ockam_core::Route;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::common::test_spans::Trace;
use ockam_api::logs::{ExportingConfiguration, LoggingConfiguration, LoggingTracing};
use ockam_api::CliState;
use ockam_core::{AsyncTryClone, OpenTelemetryContext};
use ockam_node::{Context, NodeBuilder};
use opentelemetry::global;
Expand All @@ -13,7 +14,10 @@ use std::future::Future;
///
/// - the return value of the function
/// - all the exported spans
pub fn trace_code<F>(f: impl Fn(Context) -> F + Send + Sync + 'static) -> (F::Output, Vec<SpanData>)
pub fn trace_code<F>(
state: &CliState,
f: impl Fn(Context) -> F + Send + Sync + 'static,
) -> (F::Output, Vec<SpanData>)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -26,7 +30,7 @@ where
.unwrap()
.set_all_crates()
.set_log_level(tracing_core::metadata::Level::TRACE),
&ExportingConfiguration::foreground().unwrap(),
&ExportingConfiguration::foreground(state).unwrap(),
"test",
None,
);
Expand Down
19 changes: 11 additions & 8 deletions implementations/rust/ockam/ockam_api/tests/journeys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ use tempfile::NamedTempFile;
/// it sets up some global spans / logs exporters that might interact with other tests
#[test]
fn test_create_journey_event() {
let cli = Executor::execute_future(async {
let db_file = NamedTempFile::new().unwrap();
let cli_state_directory = db_file.path().parent().unwrap().join(random_name());
CliState::create(cli_state_directory)
.await
.unwrap()
.set_tracing_enabled(true)
})
.unwrap();

let spans_exporter = InMemorySpanExporter::default();
let logs_exporter = InMemoryLogsExporter::default();

Expand All @@ -30,7 +40,7 @@ fn test_create_journey_event() {
&LoggingConfiguration::off()
.unwrap()
.set_crates(&["ockam_api"]),
&ExportingConfiguration::foreground().unwrap(),
&ExportingConfiguration::foreground(&cli).unwrap(),
"test",
None,
);
Expand All @@ -40,13 +50,6 @@ fn test_create_journey_event() {

Executor::execute_future(
async move {
let db_file = NamedTempFile::new().unwrap();
let cli_state_directory = db_file.path().parent().unwrap().join(random_name());
let cli = CliState::create(cli_state_directory)
.await
.unwrap()
.set_tracing_enabled(true);

let mut map = HashMap::new();
map.insert(USER_EMAIL, "[email protected]".to_string());
map.insert(USER_NAME, "eric".to_string());
Expand Down
17 changes: 14 additions & 3 deletions implementations/rust/ockam/ockam_api/tests/logging_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ockam_api::logs::{
use opentelemetry::global;
use opentelemetry::trace::Tracer;

use opentelemetry_sdk::{self as sdk};
use opentelemetry_sdk as sdk;
use sdk::testing::logs::*;
use sdk::testing::trace::*;

Expand All @@ -15,6 +15,8 @@ use std::fs;
use tempfile::NamedTempFile;

use ockam_api::cli_state::random_name;
use ockam_api::CliState;
use ockam_node::Executor;
use tracing::{error, info};
use tracing_core::Level;

Expand All @@ -23,9 +25,18 @@ use tracing_core::Level;
/// they set up some global spans / logs exporters that might interact with other tests
#[test]
fn test_log_and_traces() {
let cli = Executor::execute_future(async {
let db_file = NamedTempFile::new().unwrap();
let cli_state_directory = db_file.path().parent().unwrap().join(random_name());
CliState::create(cli_state_directory)
.await
.unwrap()
.set_tracing_enabled(true)
})
.unwrap();

let temp_file = NamedTempFile::new().unwrap();
let log_directory = &temp_file.path().parent().unwrap().join(random_name());

let spans_exporter = InMemorySpanExporter::default();
let logs_exporter = InMemoryLogsExporter::default();
let guard = LoggingTracing::setup_with_exporters(
Expand All @@ -34,7 +45,7 @@ fn test_log_and_traces() {
&make_configuration()
.unwrap()
.set_log_directory(log_directory.into()),
&ExportingConfiguration::foreground().unwrap(),
&ExportingConfiguration::foreground(&cli).unwrap(),
"test",
None,
);
Expand Down
11 changes: 5 additions & 6 deletions implementations/rust/ockam/ockam_app_lib/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,19 @@ impl AppState {
if self.tracing_guard.get().is_some() {
return;
}
let node_dir = {
let state = {
let this = self.clone();
let state = self
.context()
self.context()
.runtime()
.block_on(async move { this.state().await });
state.node_dir(NODE_NAME)
.block_on(async move { this.state().await })
};
let node_dir = state.node_dir(NODE_NAME);
let level_and_crates = LogLevelWithCratesFilter::from_verbose(2)
.unwrap()
.add_crates(vec!["ockam_app_lib"]);
let tracing_guard = LoggingTracing::setup(
&logging_configuration(level_and_crates, Some(node_dir), Colored::Off).unwrap(),
&ExportingConfiguration::foreground().unwrap(),
&ExportingConfiguration::foreground(&state).unwrap(),
"portals",
Some("portals".to_string()),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use colorful::Colorful;
use miette::{miette, IntoDiagnostic, WrapErr};
use serde::{Deserialize, Serialize};
use tokio::fs::read_to_string;
use tokio::process::Child;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::Retry;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -140,7 +141,7 @@ impl CreateCommand {
pub(crate) async fn spawn_background_node(
&self,
opts: &CommandGlobalOpts,
) -> miette::Result<()> {
) -> miette::Result<Child> {
if !self.skip_is_running_check {
self.guard_node_is_not_already_running(opts).await?;
}
Expand Down Expand Up @@ -282,7 +283,8 @@ impl CreateCommand {
/// Given a Context start a node in a new OS process
async fn create_background_node(&self, opts: CommandGlobalOpts) -> miette::Result<()> {
// Spawn node in another, new process
self.spawn_background_node(&opts).await
self.spawn_background_node(&opts).await?;
Ok(())
}

/// Start an authority node:
Expand Down
29 changes: 19 additions & 10 deletions implementations/rust/ockam/ockam_command/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use clap::Parser;
use colorful::Colorful;
use ockam_api::fmt_warn;
use opentelemetry::trace::{Link, SpanBuilder, TraceContextExt, Tracer};
use opentelemetry::{global, Context};
use tracing::{instrument, warn};

use ockam_core::OCKAM_TRACER_NAME;

use crate::command_events::{add_command_error_event, add_command_event};
use crate::command_global_opts::CommandGlobalOpts;
use crate::global_args::GlobalArgs;
use crate::subcommand::OckamSubcommand;
use crate::upgrade::check_if_an_upgrade_is_available;
use crate::version::Version;
use crate::{docs, ErrorReportHandler};
use clap::Parser;
use colorful::Colorful;
use ockam_api::fmt_warn;
use ockam_core::OCKAM_TRACER_NAME;
use ockam_node::database::OCKAM_SQLITE_IN_MEMORY;
use opentelemetry::trace::{Link, SpanBuilder, TraceContextExt, Tracer};
use opentelemetry::{global, Context};
use tracing::{instrument, warn};

const ABOUT: &str = include_str!("./static/about.txt");
const LONG_ABOUT: &str = include_str!("./static/long_about.txt");
Expand Down Expand Up @@ -58,6 +57,17 @@ impl OckamCommand {
// This allows us to customize how we format the error messages and their content.
let _hook_result = miette::set_hook(Box::new(|_| Box::new(ErrorReportHandler::new())));

let command_name = self.subcommand.name();

// Set the in-memory env var if needed
if let OckamSubcommand::Node(cmd) = &self.subcommand {
if let crate::node::NodeSubcommand::Create(c) = &cmd.subcommand {
if c.in_memory {
std::env::set_var(OCKAM_SQLITE_IN_MEMORY, "true");
}
}
}

let options = CommandGlobalOpts::new(&arguments, &self.global_args, &self.subcommand)?;

if let Err(err) = check_if_an_upgrade_is_available(&options) {
Expand All @@ -68,7 +78,6 @@ impl OckamCommand {
}

let tracer = global::tracer(OCKAM_TRACER_NAME);
let command_name = self.subcommand.name();
let result =
if let Some(opentelemetry_context) = self.subcommand.get_opentelemetry_context() {
let context = Context::current();
Expand Down
Loading

0 comments on commit 2ddf8ac

Please sign in to comment.