Skip to content

Commit

Permalink
feat(rust): added persistent state for incoming services in the app
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Nov 16, 2023
1 parent bf419f8 commit 0b82836
Show file tree
Hide file tree
Showing 15 changed files with 938 additions and 674 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_app_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ futures = { version = "0.3.28", default-features = false }
hex = { version = "0.4.3", default-features = false, features = ["alloc", "serde"] }
libc = "0.2"
miette = { version = "5.10.0", features = ["fancy-no-backtrace"] }
minicbor = { version = "0.20.0", features = ["alloc", "derive"] }
ockam = { path = "../ockam", version = "^0.103.0", features = ["software_vault"] }
ockam_api = { path = "../ockam_api", version = "0.46.0", features = ["std"] }
ockam_core = { path = "../ockam_core", version = "^0.93.0" }
Expand Down
10 changes: 3 additions & 7 deletions implementations/rust/ockam/ockam_app_lib/src/api/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,8 @@ extern "C" fn initialize_application(
}

let result = AppState::new(
Some(super::state::rust::ApplicationStateCallback::new(
application_state_callback,
)),
Some(super::notification::rust::NotificationCallback::new(
notification_callback,
)),
super::state::rust::ApplicationStateCallback::new(application_state_callback),
super::notification::rust::NotificationCallback::new(notification_callback),
);

let app_state = match result {
Expand Down Expand Up @@ -202,7 +198,7 @@ extern "C" fn disable_accepted_service(invitation_id: *const c_char) {
};
let app_state = unsafe { APPLICATION_STATE.as_ref() }.expect(ERROR_NOT_INITIALIZED);
app_state.context().runtime().spawn(async move {
let result = app_state.disconnect_tcp_inlet(&invitation_id).await;
let result = app_state.disable_tcp_inlet(&invitation_id).await;
if let Err(err) = result {
error!(?err, "Couldn't disable the service");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use crate::background_node::BackgroundNodeClient;
use crate::incoming_services::state::{IncomingService, Port};
use crate::state::AppState;
use miette::IntoDiagnostic;
use ockam_api::address::get_free_address;
use ockam_api::cli_state::StateDirTrait;
use ockam_api::nodes::service::portals::Inlets;
use ockam_api::ConnectionStatus;
use ockam_core::api::Reply;
use ockam_multiaddr::MultiAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};

impl AppState {
pub(crate) async fn refresh_inlets(&self) -> crate::Result<()> {
info!("Refreshing inlets");

// for each invitation it checks if the relative node is running
// if not, it deletes the node and re-create the inlet

let services_arc = self.incoming_services();
let services = {
// reduce locking as much as possible to make UI consistently responsive
services_arc.read().await.services.clone()
};
if services.is_empty() {
debug!("No incoming services, skipping inlets refresh");
return Ok(());
}

let background_node_client = self.background_node_client().await;
for service in services {
let result = self
.refresh_inlet(background_node_client.clone(), &service)
.await;
{
// we want to reduce the scope of the guard as much as possible
let mut guard = services_arc.write().await;
match result {
Ok(port) => {
if let Some(service) = guard.find_mut_by_id(service.id()) {
service.set_port(port)
}
}
Err(err) => {
warn!(%err, "Failed to refresh TCP inlet for accepted invitation");
if let Some(service) = guard.find_mut_by_id(service.id()) {
service.set_port(None)
}
}
}
}
self.publish_state().await;
}

info!("Inlets refreshed");
Ok(())
}

async fn refresh_inlet(
&self,
background_node_client: Arc<dyn BackgroundNodeClient>,
service: &IncomingService,
) -> crate::Result<Option<Port>> {
let inlet_node_name = &service.local_node_name();
debug!(node = %inlet_node_name, "Checking node status");
if !service.enabled() {
debug!(node = %inlet_node_name, "TCP inlet is disabled by the user, deleting the node");
let _ = self.delete_background_node(inlet_node_name).await;
return Ok(None);
}

if self.is_connected(service, inlet_node_name).await {
Ok(None)
} else {
self.create_inlet(background_node_client, service)
.await
.map(Some)
}
}

/// Returns true if the inlet is already connected to the destination node
/// if any error occurs, it returns false
async fn is_connected(&self, service: &IncomingService, inlet_node_name: &str) -> bool {
if self.state().await.nodes.exists(inlet_node_name) {
if let Ok(mut inlet_node) = self.background_node(inlet_node_name).await {
inlet_node.set_timeout(Duration::from_secs(5));
if let Ok(Reply::Successful(inlet)) = inlet_node
.show_inlet(&self.context(), service.inlet_name())
.await
{
if inlet.status == ConnectionStatus::Up {
debug!(node = %inlet_node_name, alias = %inlet.alias, "TCP inlet is already up");
return true;
}
}
}
}
false
}

/// Create the tcp-inlet for the accepted invitation
/// Returns the inlet [`Port`] if successful
async fn create_inlet(
&self,
background_node_client: Arc<dyn BackgroundNodeClient>,
service: &IncomingService,
) -> crate::Result<Port> {
debug!(
service_name = service.name(),
"Creating TCP inlet for accepted invitation"
);

let local_node_name = service.local_node_name();
background_node_client
.projects()
.enroll(
&local_node_name,
&service.enrollment_ticket().hex_encoded()?,
)
.await?;

// Recreate the node using the trust context
debug!(node = %local_node_name, "Creating node to host TCP inlet");
let _ = self.delete_background_node(&local_node_name).await;
background_node_client
.nodes()
.create(&local_node_name)
.await?;
tokio::time::sleep(Duration::from_millis(250)).await;

let mut inlet_node = self.background_node(&local_node_name).await?;
inlet_node.set_timeout(Duration::from_secs(5));

let bind_address = match service.address() {
Some(address) => address,
None => get_free_address()?,
};

inlet_node
.create_inlet(
&self.context(),
&bind_address.to_string(),
&MultiAddr::from_str(&service.service_route()).into_diagnostic()?,
&Some(service.inlet_name().to_string()),
&None,
Duration::from_secs(5),
)
.await?;
Ok(bind_address.port())
}

pub(crate) async fn disable_tcp_inlet(&self, invitation_id: &str) -> crate::Result<()> {
// first change the in-memory state
let service = {
// we want to reduce the scope of the guard as much as possible
let incoming_services_arc = self.incoming_services();
let mut writer = incoming_services_arc.write().await;
let mut service = writer.find_mut_by_id(invitation_id);
if let Some(service) = service.as_mut() {
if !service.enabled() {
debug!(node = %service.local_node_name(), alias = %service.name(), "TCP inlet was already disconnected");
return Ok(());
}
service.disable();
service.clone()
} else {
return Ok(());
}
};

// this is an async operation, let's give feedback to the user as soon as possible
self.publish_state().await;
// change the persistent state
self.model_mut(|model| {
let service = model.upsert_incoming_service(invitation_id);
service.enabled = false;
})
.await?;
self.background_node(&service.local_node_name())
.await?
.delete_inlet(&self.context(), service.inlet_name())
.await?;

Ok(())
}

pub(crate) async fn enable_tcp_inlet(&self, invitation_id: &str) -> crate::Result<()> {
// first change the in-memory state
let changed = {
// we want to reduce the scope of the guard as much as possible
let incoming_services_arc = self.incoming_services();
let mut writer = incoming_services_arc.write().await;
let mut service = writer.find_mut_by_id(invitation_id);
if let Some(service) = service.as_mut() {
if service.enabled() {
debug!(node = %service.local_node_name(), alias = %service.name(), "TCP inlet was already enabled");
return Ok(());
}
service.enable();
info!(node = %service.local_node_name(), alias = %service.name(), "Enabled TCP inlet");
true
} else {
false
}
};

if changed {
// this is an async operation, let's give feedback to the user as soon as possible
self.publish_state().await;
// change the persistent state
self.model_mut(|model| {
let service = model.upsert_incoming_service(invitation_id);
service.enabled = true;
})
.await?;
}
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod commands;
mod state;

pub use state::IncomingServicesState;
pub use state::PersistentIncomingServiceState;
Loading

0 comments on commit 0b82836

Please sign in to comment.