Skip to content

Commit

Permalink
feat(rust): simplify node create execution
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianbenavides committed Dec 12, 2024
1 parent 441bacf commit 1fa5559
Show file tree
Hide file tree
Showing 21 changed files with 585 additions and 450 deletions.
33 changes: 26 additions & 7 deletions implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};

use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::DatabaseConfiguration;
use ockam_node::database::{DatabaseConfiguration, OCKAM_IN_MEMORY};
use ockam_node::Executor;

use crate::cli_state::error::Result;
Expand Down Expand Up @@ -71,6 +71,13 @@ impl CliState {
Self::make_database_configuration(&self.dir)
}

pub fn is_using_in_memory_database(&self) -> Result<bool> {
match self.database_configuration()? {
DatabaseConfiguration::SqliteInMemory { .. } => Ok(true),
_ => Ok(false),
}
}

pub fn is_database_path(&self, path: &Path) -> bool {
let database_configuration = self.database_configuration().ok();
match database_configuration {
Expand Down Expand Up @@ -248,9 +255,15 @@ impl CliState {
pub(super) fn make_database_configuration(root_path: &Path) -> Result<DatabaseConfiguration> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => Ok(configuration),
None => Ok(DatabaseConfiguration::sqlite(
root_path.join("database.sqlite3").as_path(),
)),
None => {
if get_env_with_default::<bool>(OCKAM_IN_MEMORY, false)? {
Ok(DatabaseConfiguration::sqlite_in_memory())
} else {
Ok(DatabaseConfiguration::sqlite(
root_path.join("database.sqlite3").as_path(),
))
}
}
}
}

Expand All @@ -260,9 +273,15 @@ impl CliState {
) -> Result<DatabaseConfiguration> {
match DatabaseConfiguration::postgres()? {
Some(configuration) => Ok(configuration),
None => Ok(DatabaseConfiguration::sqlite(
root_path.join("application_database.sqlite3").as_path(),
)),
None => {
if get_env_with_default::<bool>(OCKAM_IN_MEMORY, false)? {
Ok(DatabaseConfiguration::sqlite_in_memory())
} else {
Ok(DatabaseConfiguration::sqlite(
root_path.join("application_database.sqlite3").as_path(),
))
}
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ impl CliState {
#[instrument(skip_all, fields(node_name = node_name))]
pub async fn stop_node(&self, node_name: &str) -> Result<()> {
debug!(name=%node_name, "stopping node...");
self.nodes_repository()
.mark_as_not_configured(node_name)
.await?;
let node = self.get_node(node_name).await?;
if let Some(pid) = node.pid() {
// Avoid killing the current process, return successfully instead.
Expand Down Expand Up @@ -339,6 +342,15 @@ impl CliState {
pub async fn set_node_pid(&self, node_name: &str, pid: u32) -> Result<()> {
Ok(self.nodes_repository().set_node_pid(node_name, pid).await?)
}

/// Set the configured field as true once a node has finished running its configuration
#[instrument(skip_all, fields(node_name = node_name))]
pub async fn mark_node_as_configured(&self, node_name: &str) -> Result<()> {
Ok(self
.nodes_repository()
.mark_as_configured(node_name)
.await?)
}
}

/// The following methods return nodes data
Expand Down Expand Up @@ -455,6 +467,7 @@ impl CliState {
tcp_listener_address,
Some(process::id()),
status_endpoint_address,
false,
);
repository.store_node(&node_info).await?;
Ok(node_info)
Expand Down Expand Up @@ -559,6 +572,7 @@ pub struct NodeInfo {
tcp_listener_address: Option<InternetAddress>,
pid: Option<u32>,
status_endpoint_address: Option<InternetAddress>,
pub(crate) configured: bool,
}

impl NodeInfo {
Expand All @@ -572,6 +586,7 @@ impl NodeInfo {
tcp_listener_address: Option<InternetAddress>,
pid: Option<u32>,
status_endpoint_address: Option<InternetAddress>,
configured: bool,
) -> Self {
Self {
name,
Expand All @@ -582,6 +597,7 @@ impl NodeInfo {
tcp_listener_address,
pid,
status_endpoint_address,
configured,
}
}
pub fn name(&self) -> String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub trait NodesRepository: Send + Sync + 'static {
/// Unset the process id of a node
async fn set_no_node_pid(&self, node_name: &str) -> Result<()>;

/// Mark a node as configured
async fn mark_as_configured(&self, node_name: &str) -> Result<()>;

/// Mark a node as not configured
async fn mark_as_not_configured(&self, node_name: &str) -> Result<()>;

/// Associate a node to a project
async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()>;

Expand Down Expand Up @@ -152,6 +158,14 @@ impl<T: NodesRepository> NodesRepository for AutoRetry<T> {
retry!(self.wrapped.set_no_node_pid(node_name))
}

async fn mark_as_configured(&self, node_name: &str) -> Result<()> {
retry!(self.wrapped.mark_as_configured(node_name))
}

async fn mark_as_not_configured(&self, node_name: &str) -> Result<()> {
retry!(self.wrapped.mark_as_not_configured(node_name))
}

async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()> {
retry!(self.wrapped.set_node_project_name(node_name, project_name))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ impl NodesSqlxDatabase {
impl NodesRepository for NodesSqlxDatabase {
async fn store_node(&self, node_info: &NodeInfo) -> Result<()> {
let query = query(r#"
INSERT INTO node (name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO node (name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address, configured)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (name)
DO UPDATE SET identifier = $2, verbosity = $3, is_default = $4, is_authority = $5, tcp_listener_address = $6, pid = $7, http_server_address = $8"#)
DO UPDATE SET identifier = $2, verbosity = $3, is_default = $4, is_authority = $5, tcp_listener_address = $6, pid = $7, http_server_address = $8, configured = $9"#)
.bind(node_info.name())
.bind(node_info.identifier())
.bind(node_info.verbosity() as i16)
Expand All @@ -66,7 +66,8 @@ impl NodesRepository for NodesSqlxDatabase {
.status_endpoint_address()
.as_ref()
.map(|a| a.to_string()),
);
)
.bind(node_info.configured);
query.execute(&*self.database.pool).await.void()?;
if node_info.is_default() {
self.set_default_node(&node_info.name()).await?;
Expand All @@ -75,13 +76,13 @@ impl NodesRepository for NodesSqlxDatabase {
}

async fn get_nodes(&self) -> Result<Vec<NodeInfo>> {
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node");
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address, configured FROM node");
let rows: Vec<NodeRow> = query.fetch_all(&*self.database.pool).await.into_core()?;
rows.iter().map(|r| r.node_info()).collect()
}

async fn get_node(&self, node_name: &str) -> Result<Option<NodeInfo>> {
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node WHERE name = $1").bind(node_name);
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address, configured FROM node WHERE name = $1").bind(node_name);
let row: Option<NodeRow> = query
.fetch_optional(&*self.database.pool)
.await
Expand All @@ -90,13 +91,13 @@ impl NodesRepository for NodesSqlxDatabase {
}

async fn get_nodes_by_identifier(&self, identifier: &Identifier) -> Result<Vec<NodeInfo>> {
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node WHERE identifier = $1").bind(identifier.to_string());
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address, configured FROM node WHERE identifier = $1").bind(identifier.to_string());
let rows: Vec<NodeRow> = query.fetch_all(&*self.database.pool).await.into_core()?;
rows.iter().map(|r| r.node_info()).collect()
}

async fn get_default_node(&self) -> Result<Option<NodeInfo>> {
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address FROM node WHERE is_default = $1").bind(true);
let query = query_as("SELECT name, identifier, verbosity, is_default, is_authority, tcp_listener_address, pid, http_server_address, configured FROM node WHERE is_default = $1").bind(true);
let row: Option<NodeRow> = query
.fetch_optional(&*self.database.pool)
.await
Expand Down Expand Up @@ -225,6 +226,20 @@ impl NodesRepository for NodesSqlxDatabase {
query.execute(&*self.database.pool).await.void()
}

async fn mark_as_configured(&self, node_name: &str) -> Result<()> {
let query = query("UPDATE node SET configured=$1 WHERE name = $2")
.bind(true)
.bind(node_name);
query.execute(&*self.database.pool).await.void()
}

async fn mark_as_not_configured(&self, node_name: &str) -> Result<()> {
let query = query("UPDATE node SET configured=$1 WHERE name = $2")
.bind(false)
.bind(node_name);
query.execute(&*self.database.pool).await.void()
}

async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()> {
let query = query(
r#"
Expand Down Expand Up @@ -274,6 +289,7 @@ pub(crate) struct NodeRow {
tcp_listener_address: Nullable<String>,
pid: Nullable<i64>,
http_server_address: Nullable<String>,
configured: Boolean,
}

impl NodeRow {
Expand All @@ -296,6 +312,7 @@ impl NodeRow {
tcp_listener_address,
self.pid.to_option().map(|p| p as u32),
status_endpoint_address,
self.configured.to_bool(),
))
}
}
Expand Down Expand Up @@ -326,6 +343,7 @@ mod test {
InternetAddress::new("127.0.0.1:51591"),
Some(1234),
InternetAddress::new("127.0.0.1:51592"),
false,
);

repository.store_node(&node_info1).await?;
Expand All @@ -338,6 +356,15 @@ mod test {
let result = repository.get_nodes_by_identifier(&identifier).await?;
assert_eq!(result, vec![node_info1.clone()]);

// change configured value
repository.mark_as_configured("node1").await?;
let result = repository.get_node("node1").await?;
assert!(result.unwrap().configured);

repository.mark_as_not_configured("node1").await?;
let result = repository.get_node("node1").await?;
assert!(!result.unwrap().configured);

// the list of all the nodes can be retrieved
let node_info2 = NodeInfo::new(
"node2".to_string(),
Expand All @@ -348,6 +375,7 @@ mod test {
None,
Some(5678),
None,
false,
);

repository.store_node(&node_info2).await?;
Expand All @@ -371,6 +399,7 @@ mod test {
None,
Some(5678),
None,
true,
);
repository.store_node(&node_info3).await?;
let result = repository.get_default_node().await?;
Expand Down Expand Up @@ -453,6 +482,7 @@ mod test {
InternetAddress::new("127.0.0.1:51591"),
Some(1234),
InternetAddress::new("127.0.0.1:51592"),
false,
)
}
}
16 changes: 12 additions & 4 deletions implementations/rust/ockam/ockam_api/src/nodes/models/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ use std::fmt::{Display, Formatter};
pub struct NodeStatus {
#[n(1)] pub name: String,
#[n(2)] pub identifier: Identifier,
#[n(3)] pub status: NodeProcessStatus,
#[n(3)] pub process_status: NodeProcessStatus,
#[n(4)] pub configured: bool,
}

impl NodeStatus {
pub fn new(name: impl Into<String>, identifier: Identifier, status: NodeProcessStatus) -> Self {
pub fn new(
name: impl Into<String>,
identifier: Identifier,
process_status: NodeProcessStatus,
configured: bool,
) -> Self {
Self {
name: name.into(),
identifier,
status,
process_status,
configured,
}
}
}
Expand All @@ -41,7 +48,8 @@ impl From<&NodeInfo> for NodeStatus {
Self {
name: node.name(),
identifier: node.identifier(),
status: node.status(),
process_status: node.status(),
configured: node.configured,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::time::Duration;

use miette::{miette, IntoDiagnostic};
use minicbor::{Decode, Encode};
use ockam::identity::get_default_timeout;

use ockam::identity::get_default_timeout;
use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport};
use ockam_core::api::{Reply, Request};
use ockam_core::Route;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use colorful::Colorful;
use miette::{miette, IntoDiagnostic, WrapErr};
use serde::{Deserialize, Serialize};
use tokio::fs::read_to_string;
use tokio::process::Child;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::Retry;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -140,7 +141,7 @@ impl CreateCommand {
pub(crate) async fn spawn_background_node(
&self,
opts: &CommandGlobalOpts,
) -> miette::Result<()> {
) -> miette::Result<Child> {
if !self.skip_is_running_check {
self.guard_node_is_not_already_running(opts).await?;
}
Expand Down Expand Up @@ -282,7 +283,8 @@ impl CreateCommand {
/// Given a Context start a node in a new OS process
async fn create_background_node(&self, opts: CommandGlobalOpts) -> miette::Result<()> {
// Spawn node in another, new process
self.spawn_background_node(&opts).await
self.spawn_background_node(&opts).await?;
Ok(())
}

/// Start an authority node:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct CommandGlobalOpts {
pub state: CliState,
pub terminal: Terminal<TerminalStream<Term>>,
pub rt: Arc<Runtime>,
tracing_guard: Option<Arc<TracingGuard>>,
pub tracing_guard: Option<Arc<TracingGuard>>,
}

impl CommandGlobalOpts {
Expand Down
Loading

0 comments on commit 1fa5559

Please sign in to comment.