feat(rust): added persistent state for incoming services in the app
davide-baldo committed Nov 13, 2023
1 parent c8ed28a commit 1e1f3c6
Showing 15 changed files with 806 additions and 616 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_app_lib/Cargo.toml
@@ -33,6 +33,7 @@ futures = { version = "0.3.28", default-features = false }
hex = { version = "0.4.3", default-features = false, features = ["alloc", "serde"] }
libc = "0.2"
miette = { version = "5.10.0", features = ["fancy-no-backtrace"] }
minicbor = { version = "0.20.0", features = ["alloc", "derive"] }
ockam = { path = "../ockam", version = "^0.103.0", features = ["software_vault"] }
ockam_api = { path = "../ockam_api", version = "0.46.0", features = ["std"] }
ockam_core = { path = "../ockam_core", version = "^0.93.0" }
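The new minicbor dependency (with the derive feature) suggests the persistent incoming-service records are CBOR-encoded. A minimal sketch of what such a record could look like; the struct name comes from the mod.rs exports further down and the enabled field appears in the model_mut closures below, while the remaining fields and the numeric keys are assumptions:

use minicbor::{Decode, Encode};

/// Sketch of a CBOR-serializable record for an accepted incoming service.
/// Numeric keys keep the on-disk format stable if fields are reordered.
#[derive(Debug, Clone, Encode, Decode)]
pub struct PersistentIncomingServiceState {
    #[n(1)] pub invitation_id: String,
    #[n(2)] pub enabled: bool,
    #[n(3)] pub name: Option<String>, // hypothetical field, not shown in this diff
}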
@@ -59,6 +59,7 @@ extern "C" fn initialize_application(
Some(super::notification::rust::NotificationCallback::new(
notification_callback,
)),
None,
);

let app_state = match result {
@@ -202,7 +203,7 @@ extern "C" fn disable_accepted_service(invitation_id: *const c_char) {
};
let app_state = unsafe { APPLICATION_STATE.as_ref() }.expect(ERROR_NOT_INITIALIZED);
app_state.context().runtime().spawn(async move {
- let result = app_state.disconnect_tcp_inlet(&invitation_id).await;
+ let result = app_state.disable_tcp_inlet(&invitation_id).await;
if let Err(err) = result {
error!(?err, "Couldn't disable the service");
}
@@ -0,0 +1,212 @@
use crate::background_node::BackgroundNodeClient;
use crate::incoming_services::state::IncomingService;
use crate::state::AppState;
use miette::IntoDiagnostic;
use ockam_api::address::get_free_address;
use ockam_api::cli_state::StateDirTrait;
use ockam_api::nodes::service::portals::Inlets;
use ockam_api::ConnectionStatus;
use ockam_multiaddr::MultiAddr;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};

impl AppState {
pub(crate) async fn refresh_inlets(&self) -> crate::Result<()> {
info!("Refreshing inlets");

// for each invitation, it checks whether the related node is running;
// if not, it deletes the node and re-creates the inlet

let services_arc = self.incoming_services();
let services = {
// reduce locking as much as possible to keep the UI consistently responsive
services_arc.read().await.services.clone()
};
if services.is_empty() {
debug!("No incoming services, skipping inlets refresh");
return Ok(());
}

let background_node_client = self.background_node_client().await;
for service in services {
let result = self
.refresh_inlet(background_node_client.clone(), &service)
.await;
{
// we want to reduce the scope of the guard as much as possible
let mut guard = services_arc.write().await;
match result {
Ok(port) => {
if let Some(service) = guard.find_mut_by_id(service.id()) {
service.set_port(port)
}
}
Err(err) => {
warn!(%err, "Failed to refresh TCP inlet for accepted invitation");
if let Some(service) = guard.find_mut_by_id(service.id()) {
service.set_port(None)
}
}
}
}
self.publish_state().await;
}

info!("Inlets refreshed");
Ok(())
}

async fn refresh_inlet(
&self,
background_node_client: Arc<dyn BackgroundNodeClient>,
service: &IncomingService,
) -> crate::Result<Option<u16>> {
let inlet_node_name = &service.local_node_name()?;
debug!(node = %inlet_node_name, "Checking node status");
if !service.enabled() {
debug!(node = %inlet_node_name, "TCP inlet is disabled by the user, deleting the node");
let _ = self.delete_background_node(inlet_node_name).await;
return Ok(None);
}

if self.state().await.nodes.exists(inlet_node_name) {
let mut inlet_node = self.background_node(inlet_node_name).await?;
inlet_node.set_timeout(Duration::from_secs(5));

if let Ok(inlet) = inlet_node
.show_inlet(&self.context(), service.name())
.await?
.success()
{
if inlet.status == ConnectionStatus::Up {
debug!(node = %inlet_node_name, alias = %inlet.alias, "TCP inlet is already up");
let bind_address: SocketAddr = inlet.bind_addr.parse()?;
return Ok(Some(bind_address.port()));
}
}
}

self.create_inlet(background_node_client, service)
.await
.map(Some)
}

/// Creates the TCP inlet for the accepted invitation.
/// Returns the local port the inlet is bound to.
async fn create_inlet(
&self,
background_node_client: Arc<dyn BackgroundNodeClient>,
service: &IncomingService,
) -> crate::Result<u16> {
debug!(
service_name = service.name(),
"Creating TCP inlet for accepted invitation"
);

let local_node_name = service.local_node_name()?;

let port = match service.port() {
Some(port) => port,
None => get_free_address()?.port(),
};

background_node_client
.projects()
.enroll(
&local_node_name,
&service.enrollment_ticket()?.hex_encoded()?,
)
.await?;

// Recreate the node using the trust context
debug!(node = %local_node_name, "Creating node to host TCP inlet");
let _ = self.delete_background_node(&local_node_name).await;
background_node_client
.nodes()
.create(&local_node_name)
.await?;
tokio::time::sleep(Duration::from_millis(250)).await;

let mut inlet_node = self.background_node(&local_node_name).await?;
inlet_node.set_timeout(Duration::from_secs(5));
inlet_node
.create_inlet(
&self.context(),
&format!("127.0.0.1:{}", port.to_string()),
&MultiAddr::from_str(&service.service_route()?).into_diagnostic()?,
&Some(service.inlet_name()),
&None,
Duration::from_secs(5),
)
.await?;
Ok(port)
}

pub(crate) async fn disable_tcp_inlet(&self, invitation_id: &str) -> crate::Result<()> {
// first change the in-memory state
let service = {
let incoming_services_arc = self.incoming_services();
let mut writer = incoming_services_arc.write().await;
let mut service = writer.find_mut_by_id(invitation_id);
if let Some(service) = service.as_mut() {
if !service.enabled() {
debug!(node = %service.local_node_name()?, alias = %service.name(), "TCP inlet was already disabled");
return Ok(());
}
service.disable();
}
service.cloned()
};

if let Some(service) = service {
// this is an async operation, let's give feedback to the user as soon as possible
self.publish_state().await;
// change the persistent state
self.model_mut(|model| {
let service = model.upsert_incoming_service(invitation_id);
service.enabled = false;
})
.await?;
self.background_node(&service.local_node_name()?)
.await?
.delete_inlet(&self.context(), &service.inlet_name())
.await?;
}
Ok(())
}

pub(crate) async fn enable_tcp_inlet(&self, invitation_id: &str) -> crate::Result<()> {
// first change the in-memory state
let changed = {
let incoming_services_arc = self.incoming_services();
let mut writer = incoming_services_arc.write().await;
let mut service = writer.find_mut_by_id(invitation_id);
if let Some(service) = service.as_mut() {
if service.enabled() {
debug!(node = %service.local_node_name()?, alias = %service.name(), "TCP inlet was already enabled");
return Ok(());
}
service.enable();
info!(node = %service.local_node_name()?, alias = %service.name(), "Enabled TCP inlet");
true
} else {
false
}
};

if changed {
// this is an async operation, let's give feedback to the user as soon as possible
self.publish_state().await;
// change the persistent state
self.model_mut(|model| {
let service = model.upsert_incoming_service(invitation_id);
service.enabled = true;
})
.await?;
}
Ok(())
}
}
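The model type behind model_mut and its upsert_incoming_service helper are not part of this file. A hypothetical sketch of how that helper could behave, assuming a ModelState struct holding a Vec of the PersistentIncomingServiceState records sketched earlier (ModelState and the incoming_services field name are assumptions; only upsert_incoming_service and the enabled flag appear in this diff):

impl ModelState {
    /// Returns a mutable reference to the persisted record for the given
    /// invitation, inserting a default (enabled) entry if none exists yet.
    pub fn upsert_incoming_service(
        &mut self,
        invitation_id: &str,
    ) -> &mut PersistentIncomingServiceState {
        let index = self
            .incoming_services
            .iter()
            .position(|service| service.invitation_id == invitation_id);
        match index {
            Some(index) => &mut self.incoming_services[index],
            None => {
                self.incoming_services.push(PersistentIncomingServiceState {
                    invitation_id: invitation_id.to_string(),
                    enabled: true,
                    name: None,
                });
                self.incoming_services.last_mut().unwrap()
            }
        }
    }
}

With a helper like this, the model_mut closures in disable_tcp_inlet and enable_tcp_inlet only need to flip the enabled flag on the returned record.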
@@ -0,0 +1,5 @@
mod commands;
mod state;

pub use state::IncomingServicesState;
pub use state::PersistentIncomingServiceState;
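
The state module itself is collapsed in this view. From how commands.rs uses it, the in-memory side presumably resembles the sketch below; the services field and find_mut_by_id are taken from the usage above, the rest is an assumption:

/// Hypothetical shape of the in-memory state behind AppState::incoming_services(),
/// inferred from its usage in commands.rs.
pub struct IncomingServicesState {
    pub(crate) services: Vec<IncomingService>,
}

impl IncomingServicesState {
    /// Looks up a service by its invitation id.
    pub(crate) fn find_mut_by_id(&mut self, id: &str) -> Option<&mut IncomingService> {
        self.services.iter_mut().find(|service| service.id() == id)
    }
}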