From ab050caf40c5e4c727fb37f1259fea4d2dd7b602 Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Tue, 23 Jul 2024 17:39:19 +0200 Subject: [PATCH] feat(rust): rework `Session`s --- .../rust/ockam/ockam_api/src/lib.rs | 2 +- .../ockam_api/src/nodes/connection/mod.rs | 21 +- .../src/nodes/connection/plain_tcp.rs | 3 +- .../ockam_api/src/nodes/connection/project.rs | 5 +- .../ockam_api/src/nodes/connection/secure.rs | 3 +- .../ockam_api/src/nodes/models/portal.rs | 2 +- .../ockam/ockam_api/src/nodes/models/relay.rs | 16 + .../ockam/ockam_api/src/nodes/registry.rs | 38 +- .../src/nodes/service/in_memory_node.rs | 2 +- .../ockam_api/src/nodes/service/manager.rs | 18 +- .../ockam_api/src/nodes/service/messages.rs | 6 +- .../ockam_api/src/nodes/service/relay.rs | 91 ++-- .../src/nodes/service/secure_channel.rs | 4 +- .../nodes/service/tcp_inlets/node_manager.rs | 115 ++--- .../service/tcp_inlets/session_replacer.rs | 27 +- .../ockam_api/src/nodes/service/worker.rs | 15 +- .../ockam/ockam_api/src/session/collector.rs | 19 +- .../src/session/connection_status.rs | 42 ++ .../ockam/ockam_api/src/session/handle.rs | 41 -- .../ockam_api/src/session/inner_session.rs | 71 --- .../rust/ockam/ockam_api/src/session/medic.rs | 256 ----------- .../ockam/ockam_api/src/session/message.rs | 35 -- .../rust/ockam/ockam_api/src/session/mod.rs | 7 +- .../rust/ockam/ockam_api/src/session/ping.rs | 34 ++ .../ockam/ockam_api/src/session/replacer.rs | 34 ++ .../ockam/ockam_api/src/session/session.rs | 415 ++++++++++++------ .../rust/ockam/ockam_api/src/session/tests.rs | 152 +++---- .../src/shared_service/relay/create.rs | 2 +- tools/stress-test/src/portal_simulator.rs | 2 +- 29 files changed, 658 insertions(+), 820 deletions(-) create mode 100644 implementations/rust/ockam/ockam_api/src/session/connection_status.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/session/handle.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/session/inner_session.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/session/medic.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/session/message.rs create mode 100644 implementations/rust/ockam/ockam_api/src/session/ping.rs create mode 100644 implementations/rust/ockam/ockam_api/src/session/replacer.rs diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index af1e2b89e1d..a57f88fa4f2 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -54,7 +54,7 @@ pub use error::*; pub use influxdb_token_lease::*; pub use nodes::service::default_address::*; pub use rendezvous_healthcheck::*; -pub use session::session::ConnectionStatus; +pub use session::connection_status::ConnectionStatus; pub use ui::*; pub use util::*; pub use version::*; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs index 2f6707d6c8c..2068ab35232 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs @@ -19,7 +19,6 @@ pub(crate) use plain_tcp::PlainTcpInstantiator; pub(crate) use project::ProjectInstantiator; pub(crate) use secure::SecureChannelInstantiator; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; #[derive(Clone)] pub struct Connection { @@ -43,7 +42,7 @@ pub struct Connection { impl Connection { /// Shorthand to add the address as consumer to the flow control - pub fn add_consumer(&self, context: Arc, address: &Address) { + pub fn add_consumer(&self, context: &Context, address: &Address) { if let Some(flow_control_id) = &self.flow_control_id { context .flow_controls() @@ -51,10 +50,10 @@ impl Connection { } } - pub fn add_default_consumers(&self, ctx: Arc) { - self.add_consumer(ctx.clone(), &DefaultAddress::KEY_EXCHANGER_LISTENER.into()); - self.add_consumer(ctx.clone(), &DefaultAddress::SECURE_CHANNEL_LISTENER.into()); - self.add_consumer(ctx.clone(), &DefaultAddress::UPPERCASE_SERVICE.into()); + pub fn add_default_consumers(&self, ctx: &Context) { + self.add_consumer(ctx, &DefaultAddress::KEY_EXCHANGER_LISTENER.into()); + self.add_consumer(ctx, &DefaultAddress::SECURE_CHANNEL_LISTENER.into()); + self.add_consumer(ctx, &DefaultAddress::UPPERCASE_SERVICE.into()); self.add_consumer(ctx, &DefaultAddress::ECHO_SERVICE.into()); } @@ -182,7 +181,7 @@ pub trait Instantiator: Send + Sync + 'static { /// The returned [`Changes`] will be used to update the builder state. async fn instantiate( &self, - ctx: Arc, + ctx: &Context, node_manager: &NodeManager, transport_route: Route, extracted: (MultiAddr, MultiAddr, MultiAddr), @@ -217,7 +216,7 @@ impl ConnectionBuilder { /// user make sure higher protocol abstraction are called before lower level ones pub async fn instantiate( mut self, - ctx: Arc, + ctx: &Context, node_manager: &NodeManager, instantiator: impl Instantiator, ) -> Result { @@ -233,14 +232,14 @@ impl ConnectionBuilder { // the transport route should include only the pieces before the match self.transport_route = self .recalculate_transport_route( - &ctx, + ctx, self.current_multiaddr.split(start).0, false, ) .await?; let mut changes = instantiator .instantiate( - ctx.clone(), + ctx, node_manager, self.transport_route.clone(), self.extract(start, instantiator.matches().len()), @@ -271,7 +270,7 @@ impl ConnectionBuilder { } self.transport_route = self - .recalculate_transport_route(&ctx, self.current_multiaddr.clone(), true) + .recalculate_transport_route(ctx, self.current_multiaddr.clone(), true) .await?; Ok(Self { diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs index 6f835f2c6fe..faaa3eb4199 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs @@ -1,7 +1,6 @@ use crate::error::ApiError; use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator}; use crate::{multiaddr_to_route, route_to_multiaddr}; -use std::sync::Arc; use crate::nodes::NodeManager; use ockam_core::{async_trait, Error, Route}; @@ -30,7 +29,7 @@ impl Instantiator for PlainTcpInstantiator { async fn instantiate( &self, - _ctx: Arc, + _ctx: &Context, node_manager: &NodeManager, _transport_route: Route, extracted: (MultiAddr, MultiAddr, MultiAddr), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs index 3b7898d8391..154d6cba092 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs @@ -2,7 +2,6 @@ use crate::error::ApiError; use crate::nodes::connection::{Changes, Instantiator}; use crate::nodes::NodeManager; use crate::{multiaddr_to_route, try_address_to_multiaddr}; -use std::sync::Arc; use ockam_core::{async_trait, Error, Route}; use ockam_multiaddr::proto::Project; @@ -36,7 +35,7 @@ impl Instantiator for ProjectInstantiator { async fn instantiate( &self, - ctx: Arc, + ctx: &Context, node_manager: &NodeManager, _transport_route: Route, extracted: (MultiAddr, MultiAddr, MultiAddr), @@ -66,7 +65,7 @@ impl Instantiator for ProjectInstantiator { debug!("create a secure channel to the project {project_identifier}"); let sc = node_manager .create_secure_channel_internal( - &ctx, + ctx, tcp.route, &self.identifier.clone(), Some(vec![project_identifier]), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs index ee0ee3bc67d..7903c177764 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Duration; use crate::nodes::connection::{Changes, Instantiator}; @@ -41,7 +40,7 @@ impl Instantiator for SecureChannelInstantiator { async fn instantiate( &self, - ctx: Arc, + ctx: &Context, node_manager: &NodeManager, transport_route: Route, extracted: (MultiAddr, MultiAddr, MultiAddr), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs index 6ba3de6214f..ae5fef4b114 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs @@ -16,7 +16,7 @@ use crate::colors::color_primary; use crate::error::ApiError; use crate::output::Output; -use crate::session::session::ConnectionStatus; +use crate::session::connection_status::ConnectionStatus; use crate::{route_to_multiaddr, try_address_to_multiaddr}; /// Request body to create an inlet diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs index 997b1f7cd85..237be505e6d 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs @@ -10,6 +10,8 @@ use ockam_multiaddr::MultiAddr; use crate::colors::OckamColor; use crate::error::ApiError; use crate::output::{colorize_connection_status, Output}; +use crate::session::replacer::ReplacerOutputKind; +use crate::session::session::Session; use crate::{route_to_multiaddr, ConnectionStatus}; /// Request body when instructing a node to create a relay @@ -94,6 +96,20 @@ impl RelayInfo { } } + pub fn from_session(session: &Session, destination_address: MultiAddr, alias: String) -> Self { + let relay_info = Self::new(destination_address, alias, session.connection_status()); + if let Some(outcome) = session.last_outcome() { + match outcome { + ReplacerOutputKind::Relay(info) => relay_info.with(info), + ReplacerOutputKind::Inlet(_) => { + panic!("InletInfo should not be in the registry") + } + } + } else { + relay_info + } + } + pub fn with(self, remote_relay_info: RemoteRelayInfo) -> Self { Self { forwarding_route: Some(remote_relay_info.forwarding_route().to_string()), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs index 4a56a37d3ea..8a1b9fa7730 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs @@ -1,16 +1,18 @@ use crate::cli_state::random_name; -use crate::nodes::models::relay::RelayInfo; -use crate::session::session::{ReplacerOutputKind, Session}; use crate::DefaultAddress; + use ockam::identity::Identifier; use ockam::identity::{SecureChannel, SecureChannelListener}; use ockam_core::compat::collections::BTreeMap; use ockam_core::{Address, Route}; use ockam_multiaddr::MultiAddr; -use ockam_node::compat::asynchronous::RwLock; +use ockam_node::compat::asynchronous::{Mutex, RwLock}; use ockam_transport_core::HostnamePort; + +use crate::session::session::Session; use std::borrow::Borrow; use std::fmt::Display; +use std::sync::Arc; #[derive(Default)] pub(crate) struct SecureChannelRegistry { @@ -127,7 +129,7 @@ impl KafkaServiceInfo { pub(crate) struct InletInfo { pub(crate) bind_addr: String, pub(crate) outlet_addr: MultiAddr, - pub(crate) session: Session, + pub(crate) session: Arc>, } impl InletInfo { @@ -135,7 +137,7 @@ impl InletInfo { Self { bind_addr: bind_addr.to_owned(), outlet_addr, - session, + session: Arc::new(Mutex::new(session)), } } } @@ -160,31 +162,7 @@ impl OutletInfo { pub struct RegistryRelayInfo { pub(crate) destination_address: MultiAddr, pub(crate) alias: String, - pub(crate) session: Session, -} - -impl From for RelayInfo { - fn from(registry_relay_info: RegistryRelayInfo) -> Self { - let session_lock = registry_relay_info.session.lock(); - let relay_info = RelayInfo::new( - registry_relay_info.destination_address.clone(), - registry_relay_info.alias.clone(), - session_lock.connection_status(), - ); - - let current_relay_status = session_lock.status().map(|info| match info.kind { - ReplacerOutputKind::Inlet(_) => { - panic!("InletInfo should not be in the registry") - } - ReplacerOutputKind::Relay(info) => info, - }); - - if let Some(current_relay_status) = current_relay_status { - relay_info.with(current_relay_status) - } else { - relay_info - } - } + pub(crate) session: Arc>, } #[derive(Default)] diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs index acc1b11be1e..06781287d89 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs @@ -179,7 +179,7 @@ impl InMemoryNode { } pub async fn stop(&self, ctx: &Context) -> Result<()> { - self.medic_handle.stop_medic(ctx).await?; + // self.session_handle.stop_session(ctx).await?; FIXME for addr in DefaultAddress::iter() { let result = ctx.stop_worker(addr).await; // when stopping we can safely ignore missing services diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs index d075e7962c8..5887526c43a 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -15,7 +15,6 @@ use crate::nodes::service::{ use crate::cli_state::journeys::{NODE_NAME, USER_EMAIL, USER_NAME}; use crate::logs::CurrentSpan; -use crate::session::handle::MedicHandle; use crate::{ApiError, CliState, DefaultAddress}; use miette::IntoDiagnostic; use ockam::identity::{ @@ -60,7 +59,6 @@ pub struct NodeManager { pub(crate) credential_retriever_creators: CredentialRetrieverCreators, pub(super) project_authority: Option, pub(crate) registry: Arc, - pub(crate) medic_handle: MedicHandle, } impl NodeManager { @@ -77,9 +75,6 @@ impl NodeManager { let registry = Arc::new(Registry::default()); - debug!("start the medic"); - let medic_handle = MedicHandle::start_medic(ctx, registry.clone()).await?; - debug!("retrieve the node identifier"); let node_identifier = cli_state.get_node(&node_name).await?.identifier(); @@ -159,7 +154,6 @@ impl NodeManager { credential_retriever_creators, project_authority: trust_options.project_authority, registry, - medic_handle, }; debug!("initializing services"); @@ -280,7 +274,7 @@ impl NodeManager { ); } - // Always start the echoer service as ockam_api::Medic assumes it will be + // Always start the echoer service as ockam_api::Session assumes it will be // started unconditionally on every node. It's used for liveliness checks. ctx.flow_controls() .add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id); @@ -292,7 +286,7 @@ impl NodeManager { pub async fn make_connection( &self, - ctx: Arc, + ctx: &Context, addr: &MultiAddr, identifier: Identifier, authorized: Option, @@ -307,7 +301,7 @@ impl NodeManager { /// Returns [`Connection`] async fn connect( &self, - ctx: Arc, + ctx: &Context, addr: &MultiAddr, identifier: Identifier, authorized: Option>, @@ -316,15 +310,15 @@ impl NodeManager { debug!(?timeout, "connecting to {}", &addr); let connection = ConnectionBuilder::new(addr.clone()) .instantiate( - ctx.clone(), + ctx, self, ProjectInstantiator::new(identifier.clone(), timeout), ) .await? - .instantiate(ctx.clone(), self, PlainTcpInstantiator::new()) + .instantiate(ctx, self, PlainTcpInstantiator::new()) .await? .instantiate( - ctx.clone(), + ctx, self, SecureChannelInstantiator::new(&identifier, timeout, authorized), ) diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/messages.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/messages.rs index 6e57b588981..d5122f40c36 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/messages.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/messages.rs @@ -1,13 +1,12 @@ use miette::IntoDiagnostic; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use tracing::trace; use minicbor::{CborLen, Decode, Encode}; use ockam_core::api::{Error, Request, Response}; -use ockam_core::{self, async_trait, AsyncTryClone, Result}; +use ockam_core::{self, async_trait, Result}; use ockam_multiaddr::MultiAddr; use ockam_node::{Context, MessageSendReceiveOptions}; @@ -38,9 +37,8 @@ impl Messages for NodeManager { timeout: Option, ) -> miette::Result> { let msg_length = message.len(); - let connection_ctx = Arc::new(ctx.async_try_clone().await.into_diagnostic()?); let connection = self - .make_connection(connection_ctx, to, self.identifier(), None, timeout) + .make_connection(ctx, to, self.identifier(), None, timeout) .await .into_diagnostic()?; let route = connection.route().into_diagnostic()?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index 6c2b5af4e98..3f30be5e79e 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -11,6 +11,7 @@ use ockam_core::api::{Error, Request, RequestHeader, Response}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{async_trait, route, Address, AsyncTryClone}; use ockam_multiaddr::MultiAddr; +use ockam_node::compat::asynchronous::Mutex; use ockam_node::Context; use crate::nodes::connection::Connection; @@ -22,8 +23,8 @@ use crate::nodes::registry::RegistryRelayInfo; use crate::nodes::service::in_memory_node::InMemoryNode; use crate::nodes::service::secure_channel::SecureChannelType; use crate::nodes::BackgroundNodeClient; -use crate::session::handle::MedicHandle; -use crate::session::session::{ReplacerOutcome, ReplacerOutputKind, Session, SessionReplacer}; +use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer}; +use crate::session::session::Session; use super::{NodeManager, NodeManagerWorker}; @@ -98,14 +99,17 @@ impl NodeManager { /// This function returns a representation of the relays currently /// registered on this node pub async fn get_relays(&self) -> Vec { - let relays = self - .registry - .relays - .entries() - .await - .into_iter() - .map(|(_, registry_info)| registry_info.into()) - .collect(); + let mut relays = vec![]; + for (_, registry_info) in self.registry.relays.entries().await { + let session = registry_info.session.lock().await; + let info = RelayInfo::from_session( + &session, + registry_info.destination_address.clone(), + registry_info.alias.clone(), + ); + relays.push(info); + } + trace!(?relays, "Relays retrieved"); relays } @@ -115,7 +119,7 @@ impl NodeManager { /// This route is monitored in the `InMemoryNode` and the workers are restarted if necessary /// when the route is unresponsive pub async fn create_relay( - self: &Arc, + self: &Arc, // FIXME? ctx: &Context, addr: &MultiAddr, alias: String, @@ -133,7 +137,7 @@ impl NodeManager { let replacer = RelaySessionReplacer { node_manager: self.clone(), - context: Arc::new(ctx.async_try_clone().await?), + context: ctx.async_try_clone().await?, addr: addr.clone(), relay_address, connection: None, @@ -141,21 +145,33 @@ impl NodeManager { authorized, }; - let mut session = Session::new(replacer); - let relay_info = - MedicHandle::connect(&mut session) - .await - .map(|outcome| match outcome.kind { - ReplacerOutputKind::Relay(status) => status, - _ => { - panic!("Unexpected outcome: {:?}", outcome); - } - })?; + let mut session = Session::create(ctx, replacer).await?; + + let remote_relay_info = session + .initial_connect() + .await + .map(|outcome| match outcome { + ReplacerOutputKind::Relay(status) => status, + _ => { + panic!("Unexpected outcome: {:?}", outcome); + } + })?; + + session.start_monitoring().await?; + + debug!( + forwarding_route = %remote_relay_info.forwarding_route(), + remote_address = %remote_relay_info.remote_address(), + "CreateRelay request processed, sending back response" + ); + + let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status()) + .with(remote_relay_info); let registry_relay_info = RegistryRelayInfo { destination_address: addr.clone(), alias: alias.clone(), - session, + session: Arc::new(Mutex::new(session)), }; self.registry @@ -163,13 +179,7 @@ impl NodeManager { .insert(alias, registry_relay_info.clone()) .await; - debug!( - forwarding_route = %relay_info.forwarding_route(), - remote_address = %relay_info.remote_address(), - "CreateRelay request processed, sending back response" - ); - - Ok(registry_relay_info.into()) + Ok(relay_info) } /// Delete a relay. @@ -178,7 +188,7 @@ impl NodeManager { pub async fn delete_relay_impl(&self, alias: &str) -> Result<(), ockam::Error> { if let Some(relay_to_delete) = self.registry.relays.remove(alias).await { debug!(%alias, "Successfully removed relay from node registry"); - let result = relay_to_delete.session.close().await; + let result = relay_to_delete.session.lock().await.stop().await; match result { Ok(_) => { debug!(%alias, "Successfully stopped relay"); @@ -207,7 +217,14 @@ impl NodeManager { ) -> Result, Response> { debug!("Handling ShowRelay request"); if let Some(registry_info) = self.registry.relays.get(alias).await { - Ok(Response::ok().with_headers(req).body(registry_info.into())) + let session = registry_info.session.lock().await; + + let relay_info = RelayInfo::from_session( + &session, + registry_info.destination_address.clone(), + registry_info.alias.clone(), + ); + Ok(Response::ok().with_headers(req).body(relay_info)) } else { error!(%alias, "Relay not found in the node registry"); Err(Response::not_found( @@ -239,7 +256,7 @@ impl InMemoryNode { struct RelaySessionReplacer { node_manager: Arc, - context: Arc, + context: Context, relay_address: Option, // current status @@ -251,24 +268,24 @@ struct RelaySessionReplacer { #[async_trait] impl SessionReplacer for RelaySessionReplacer { - async fn create(&mut self) -> std::result::Result { + async fn create(&mut self) -> Result { debug!(addr = self.addr.to_string(), relay_address = ?self.relay_address, "Handling CreateRelay request"); let connection = self .node_manager .make_connection( - self.context.clone(), + &self.context, &self.addr.clone(), self.node_manager.identifier(), self.authorized.clone(), None, ) .await?; - connection.add_default_consumers(self.context.clone()); + let connection = self.connection.insert(connection); // Add all Hop workers as consumers for Demo purposes // Production nodes should not run any Hop workers for hop in self.node_manager.registry.hop_services.keys().await { - connection.add_consumer(self.context.clone(), &hop); + connection.add_consumer(&self.context, &hop); } let route = connection.route()?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index 96da71c54b3..a070a9ed9ef 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -12,7 +12,6 @@ use ockam::{Address, Result, Route}; use ockam_core::api::{Error, Response}; use ockam_core::compat::sync::Arc; use ockam_core::errcode::{Kind, Origin}; -use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; use ockam_node::Context; @@ -182,9 +181,8 @@ impl NodeManager { ) -> Result { let identifier = self.get_identifier_by_name(identity_name.clone()).await?; - let connection_ctx = Arc::new(ctx.async_try_clone().await?); let connection = self - .make_connection(connection_ctx, &addr, identifier.clone(), None, timeout) + .make_connection(ctx, &addr, identifier.clone(), None, timeout) .await?; let sc = self .create_secure_channel_internal( diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs index cd904a91998..e40a265dd1d 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs @@ -15,8 +15,9 @@ use crate::nodes::models::portal::InletStatus; use crate::nodes::registry::InletInfo; use crate::nodes::service::tcp_inlets::InletSessionReplacer; use crate::nodes::NodeManager; -use crate::session::handle::MedicHandle; -use crate::session::session::{ConnectionStatus, ReplacerOutputKind, Session, MAX_CONNECT_TIME}; +use crate::session::connection_status::ConnectionStatus; +use crate::session::replacer::{ReplacerOutputKind, MAX_CONNECT_TIME}; +use crate::session::session::Session; impl NodeManager { #[allow(clippy::too_many_arguments)] @@ -105,7 +106,7 @@ impl NodeManager { let replacer = InletSessionReplacer { node_manager: self.clone(), udp_transport, - context: Arc::new(ctx.async_try_clone().await?), + context: ctx.async_try_clone().await?, listen_addr: listen_addr.to_string(), outlet_addr: outlet_addr.clone(), prefix_route, @@ -126,17 +127,18 @@ impl NodeManager { .create_tcp_inlet(&self.node_name, &listen_addr, &outlet_addr, &alias) .await?; - let mut session = Session::new(replacer); + let mut session = Session::create(ctx, replacer).await?; + let outcome = if wait_connection { - let result = - MedicHandle::connect(&mut session) - .await - .map(|outcome| match outcome.kind { - ReplacerOutputKind::Inlet(status) => status, - _ => { - panic!("Unexpected outcome: {:?}", outcome) - } - }); + let result = session + .initial_connect() + .await + .map(|outcome| match outcome { + ReplacerOutputKind::Inlet(status) => status, + _ => { + panic!("Unexpected outcome: {:?}", outcome) + } + }); match result { Ok(status) => Some(status), @@ -149,6 +151,10 @@ impl NodeManager { None }; + let connection_status = session.connection_status(); + + session.start_monitoring().await?; + self.registry .inlets .insert( @@ -163,10 +169,7 @@ impl NodeManager { &alias, None, outcome.clone().map(|s| s.route.to_string()), - outcome - .as_ref() - .map(|s| s.connection_status) - .unwrap_or(ConnectionStatus::Down), + connection_status, outlet_addr.to_string(), ); @@ -177,7 +180,7 @@ impl NodeManager { info!(%alias, "Handling request to delete inlet portal"); if let Some(inlet_to_delete) = self.registry.inlets.remove(alias).await { debug!(%alias, "Successfully removed inlet from node registry"); - inlet_to_delete.session.close().await?; + inlet_to_delete.session.lock().await.stop().await?; self.resources().delete_resource(&alias.into()).await?; self.cli_state .delete_tcp_inlet(&self.node_name, alias) @@ -205,19 +208,23 @@ impl NodeManager { pub async fn show_inlet(&self, alias: &str) -> Option { info!(%alias, "Handling request to show inlet portal"); if let Some(inlet_info) = self.registry.inlets.get(alias).await { - if let Some(status) = inlet_info.session.lock().status() { - if let ReplacerOutputKind::Inlet(status) = &status.kind { + let session = inlet_info.session.lock().await; + let connection_status = session.connection_status(); + let outcome = session.last_outcome(); + drop(session); + if let Some(outcome) = outcome { + if let ReplacerOutputKind::Inlet(status) = outcome { Some(InletStatus::new( inlet_info.bind_addr.to_string(), status.worker.address().to_string(), alias, None, status.route.to_string(), - status.connection_status, + connection_status, inlet_info.outlet_addr.to_string(), )) } else { - panic!("Unexpected outcome: {:?}", status.kind) + panic!("Unexpected outcome: {:?}", outcome) } } else { Some(InletStatus::new( @@ -226,7 +233,7 @@ impl NodeManager { alias, None, None, - ConnectionStatus::Down, + connection_status, inlet_info.outlet_addr.to_string(), )) } @@ -237,39 +244,43 @@ impl NodeManager { } pub async fn list_inlets(&self) -> Vec { - self.registry - .inlets - .entries() - .await - .iter() - .map(|(alias, info)| { - if let Some(status) = info.session.lock().status().as_ref() { - match &status.kind { - ReplacerOutputKind::Inlet(status) => InletStatus::new( - &info.bind_addr, - status.worker.address().to_string(), - alias, - None, - status.route.to_string(), - status.connection_status, - info.outlet_addr.to_string(), - ), - _ => { - panic!("Unexpected outcome: {:?}", status.kind) - } - } - } else { - InletStatus::new( + let mut res = vec![]; + for (alias, info) in self.registry.inlets.entries().await { + let session = info.session.lock().await; + let connection_status = session.connection_status(); + let outcome = session.last_outcome(); + drop(session); + + let status = if let Some(outcome) = outcome { + match &outcome { + ReplacerOutputKind::Inlet(status) => InletStatus::new( &info.bind_addr, - None, + status.worker.address().to_string(), alias, None, - None, - ConnectionStatus::Down, + status.route.to_string(), + connection_status, info.outlet_addr.to_string(), - ) + ), + _ => { + panic!("Unexpected outcome: {:?}", outcome) + } } - }) - .collect() + } else { + InletStatus::new( + &info.bind_addr, + None, + alias, + None, + None, + connection_status, + info.outlet_addr.to_string(), + ) + }; + + res.push(status); + } + + res } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index f756d7612dc..78dc1554d9c 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -23,15 +23,14 @@ use crate::error::ApiError; use crate::nodes::connection::Connection; use crate::nodes::service::SecureChannelType; use crate::nodes::NodeManager; -use crate::session::session::{ - ConnectionStatus, CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind, SessionReplacer, - MAX_RECOVERY_TIME, +use crate::session::replacer::{ + CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind, SessionReplacer, MAX_RECOVERY_TIME, }; pub(super) struct InletSessionReplacer { pub(super) node_manager: Arc, pub(super) udp_transport: Option, - pub(super) context: Arc, + pub(super) context: Context, pub(super) listen_addr: String, pub(super) outlet_addr: MultiAddr, pub(super) prefix_route: Route, @@ -111,7 +110,7 @@ impl InletSessionReplacer { async fn spawn_udp_puncture( &mut self, - connection: &Connection, + mut main_route: Route, inlet: Arc, // TODO: PUNCTURE Replace with a RwLock disable_tcp_fallback: bool, ) -> Result<()> { @@ -125,7 +124,6 @@ impl InletSessionReplacer { ))? .clone(); - let mut main_route = connection.route()?; // FIXME: PUNCTURE trimming outlet part, but doesn't look good let main_route: Route = main_route.modify().pop_back().into(); @@ -207,10 +205,11 @@ impl InletSessionReplacer { } async fn create_impl(&mut self) -> Result { + let options = self.inlet_options().await?; let connection = self .node_manager .make_connection( - self.context.clone(), + &self.context, &self.outlet_addr, self.secure_channel_identifier .clone() @@ -219,16 +218,16 @@ impl InletSessionReplacer { Some(self.wait_for_outlet_duration), ) .await?; - + let connection = self.connection.insert(connection); let connection_route = connection.route()?; + let transport_route = connection.transport_route(); //we expect a fully normalized MultiAddr let normalized_route = route![ self.prefix_route.clone(), - connection_route, + connection_route.clone(), self.suffix_route.clone() ]; - let options = self.inlet_options().await?; // TODO: Instead just update the route in the existing inlet // Finally, attempt to create a new inlet using the new route: @@ -238,24 +237,23 @@ impl InletSessionReplacer { .create_inlet(self.listen_addr.clone(), normalized_route.clone(), options) .await? .clone(); - let inlet_address = inlet.processor_address().clone(); let inlet = Arc::new(inlet); self.inlet = Some(inlet.clone()); + let inlet_address = inlet.processor_address().clone(); if self.enable_udp_puncture() { info!("Spawning UDP puncture future"); // TODO: Make sync if disable_tcp_fallback - self.spawn_udp_puncture(&connection, inlet, self.disable_tcp_fallback) + self.spawn_udp_puncture(connection_route, inlet.clone(), self.disable_tcp_fallback) .await?; info!("Spawned UDP puncture future"); } Ok(ReplacerOutcome { - ping_route: connection.transport_route(), + ping_route: transport_route, kind: ReplacerOutputKind::Inlet(CurrentInletStatus { worker: inlet_address, route: normalized_route, - connection_status: ConnectionStatus::Up, }), }) } @@ -270,7 +268,6 @@ impl SessionReplacer for InletSessionReplacer { // possible that there is just a single secure channel used to go directly // to another node. - self.close().await; // TODO: Should not be needed debug!(%self.outlet_addr, "creating new tcp inlet"); // The future is given some limited time to succeed. diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs index 8e0efe96ece..a33bd11aded 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -5,7 +5,7 @@ use crate::nodes::{InMemoryNode, NODEMANAGER_ADDR}; use crate::DefaultAddress; use minicbor::Decoder; use ockam_core::api::{RequestHeader, Response}; -use ockam_core::{Address, Routed, Worker}; +use ockam_core::{Address, Result, Routed, Worker}; use ockam_node::Context; use std::error::Error; use std::sync::Arc; @@ -20,7 +20,7 @@ impl NodeManagerWorker { NodeManagerWorker { node_manager } } - pub async fn stop(&self, ctx: &Context) -> ockam_core::Result<()> { + pub async fn stop(&self, ctx: &Context) -> Result<()> { self.node_manager.stop(ctx).await?; ctx.stop_worker(NODEMANAGER_ADDR).await?; Ok(()) @@ -36,7 +36,7 @@ impl NodeManagerWorker { ctx: &mut Context, req: &RequestHeader, dec: &mut Decoder<'_>, - ) -> ockam_core::Result> { + ) -> Result> { debug! { target: TARGET, id = %req.id(), @@ -227,17 +227,12 @@ impl Worker for NodeManagerWorker { type Message = Vec; type Context = Context; - async fn shutdown(&mut self, ctx: &mut Self::Context) -> ockam_core::Result<()> { + async fn shutdown(&mut self, _ctx: &mut Self::Context) -> Result<()> { debug!(target: TARGET, "Shutting down NodeManagerWorker"); - self.node_manager.medic_handle.stop_medic(ctx).await?; Ok(()) } - async fn handle_message( - &mut self, - ctx: &mut Context, - msg: Routed>, - ) -> ockam_core::Result<()> { + async fn handle_message(&mut self, ctx: &mut Context, msg: Routed>) -> Result<()> { let return_route = msg.return_route(); let body = msg.into_body()?; let mut dec = Decoder::new(&body); diff --git a/implementations/rust/ockam/ockam_api/src/session/collector.rs b/implementations/rust/ockam/ockam_api/src/session/collector.rs index ef568668f01..0004ad1a003 100644 --- a/implementations/rust/ockam/ockam_api/src/session/collector.rs +++ b/implementations/rust/ockam/ockam_api/src/session/collector.rs @@ -1,29 +1,22 @@ +use crate::session::ping::Ping; use ockam::Worker; -use ockam_core::{Address, Error, Routed, LOCAL}; +use ockam_core::{Error, Routed}; use ockam_node::tokio::sync::mpsc; use ockam_node::Context; -use crate::session::message::Message; - /// A collector receives echo messages and forwards them. #[derive(Debug)] -pub(super) struct Collector(mpsc::Sender); +pub(super) struct Collector(mpsc::Sender); impl Collector { - const NAME: &'static str = "ockam.ping.collector"; - - pub(super) fn address() -> Address { - Address::new_with_string(LOCAL, Self::NAME) - } - - pub fn new(sender: mpsc::Sender) -> Self { + pub fn new(sender: mpsc::Sender) -> Self { Self(sender) } } #[ockam::worker] impl Worker for Collector { - type Message = Message; + type Message = Ping; type Context = Context; async fn handle_message( @@ -32,7 +25,7 @@ impl Worker for Collector { msg: Routed, ) -> Result<(), Error> { if self.0.send(msg.into_body()?).await.is_err() { - debug!("collector could not send message to medic") + debug!("collector could not send message to session") } Ok(()) } diff --git a/implementations/rust/ockam/ockam_api/src/session/connection_status.rs b/implementations/rust/ockam/ockam_api/src/session/connection_status.rs new file mode 100644 index 00000000000..ed2df98dbb1 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/session/connection_status.rs @@ -0,0 +1,42 @@ +use core::fmt; +use minicbor::{CborLen, Decode, Encode}; +use serde::{Deserialize, Serialize}; +use std::fmt::Formatter; + +use crate::colors::{color_error, color_ok, color_warn}; +use crate::error::ApiError; + +use ockam_core::Result; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode, CborLen, Serialize, Deserialize)] +#[rustfmt::skip] +pub enum ConnectionStatus { + #[n(0)] Down, + #[n(1)] Degraded, + #[n(2)] Up, +} + +impl fmt::Display for ConnectionStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ConnectionStatus::Down => write!(f, "{}", color_error("DOWN")), + ConnectionStatus::Degraded => write!(f, "{}", color_warn("DEGRADED")), + ConnectionStatus::Up => write!(f, "{}", color_ok("UP")), + } + } +} + +impl TryFrom for ConnectionStatus { + type Error = ApiError; + + fn try_from(value: String) -> Result { + match value.to_lowercase().as_str() { + "down" => Ok(ConnectionStatus::Down), + "degraded" => Ok(ConnectionStatus::Degraded), + "up" => Ok(ConnectionStatus::Up), + _ => Err(ApiError::message(format!( + "Invalid connection status: {value}" + ))), + } + } +} diff --git a/implementations/rust/ockam/ockam_api/src/session/handle.rs b/implementations/rust/ockam/ockam_api/src/session/handle.rs deleted file mode 100644 index cdb6ebd181e..00000000000 --- a/implementations/rust/ockam/ockam_api/src/session/handle.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::nodes::registry::Registry; -use ockam_core::compat::sync::Arc; -use ockam_core::Error; -use ockam_node::tokio; -use ockam_node::Context; -use tokio::task::JoinHandle; - -use crate::session::medic::Medic; -use crate::session::session::{ReplacerOutcome, Session}; - -pub(crate) struct MedicHandle { - handle: JoinHandle<()>, -} - -impl MedicHandle { - pub fn new(handle: JoinHandle<()>) -> Self { - Self { handle } - } - - pub async fn start_medic(ctx: &Context, registry: Arc) -> Result { - let medic = Medic::new(registry); - let handle = medic.start(ctx).await?; - let medic_handle = Self::new(handle); - Ok(medic_handle) - } - - pub async fn stop_medic(&self, ctx: &Context) -> Result<(), Error> { - debug!("Shutting down medic"); - Medic::stop(ctx).await?; - self.handle.abort(); - Ok(()) - } - - pub async fn connect(session: &mut Session) -> Result { - // FIXME - let replacer = session.lock().replacer(); - let outcome = replacer.recreate().await?; - session.lock().up(outcome.clone()); - Ok(outcome) - } -} diff --git a/implementations/rust/ockam/ockam_api/src/session/inner_session.rs b/implementations/rust/ockam/ockam_api/src/session/inner_session.rs deleted file mode 100644 index 943608a7dda..00000000000 --- a/implementations/rust/ockam/ockam_api/src/session/inner_session.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::sync::Arc; - -use crate::session::session::{InnerSessionReplacer, Ping, ReplacerOutcome}; -use crate::ConnectionStatus; -use ockam_core::Route; - -pub struct InnerSession { - // TODO: Add last ping time? - connection: ConnectionStatus, - replacer: Arc, - pings: Vec, - last_outcome: Option, -} - -impl InnerSession { - pub(super) fn new(replacer: Arc) -> Self { - Self { - connection: ConnectionStatus::Down, - replacer, - pings: vec![], - last_outcome: None, - } - } - - pub fn ping_route(&self) -> Option { - self.last_outcome.as_ref().map(|o| o.ping_route.clone()) - } - - pub fn connection_status(&self) -> ConnectionStatus { - self.connection - } - - pub fn status(&self) -> Option { - self.last_outcome.clone() - } - - pub fn degraded(&mut self) { - self.connection = ConnectionStatus::Degraded; - self.last_outcome = None; - } - - pub fn up(&mut self, replacer_outcome: ReplacerOutcome) { - self.connection = ConnectionStatus::Up; - self.last_outcome = Some(replacer_outcome); - } - - pub fn down(&mut self) { - self.connection = ConnectionStatus::Down; - self.last_outcome = None; - } - - pub(super) fn replacer(&self) -> Arc { - self.replacer.clone() - } - - pub fn pings(&self) -> &[Ping] { - &self.pings - } - - pub fn add_ping(&mut self, p: Ping) { - self.pings.push(p); - } - - pub fn clear_pings(&mut self) { - self.pings.clear() - } - - pub fn last_outcome(&self) -> &Option { - &self.last_outcome - } -} diff --git a/implementations/rust/ockam/ockam_api/src/session/medic.rs b/implementations/rust/ockam/ockam_api/src/session/medic.rs deleted file mode 100644 index 4a737a2b44b..00000000000 --- a/implementations/rust/ockam/ockam_api/src/session/medic.rs +++ /dev/null @@ -1,256 +0,0 @@ -use tokio::task::JoinHandle; - -use crate::nodes::registry::Registry; -use ockam::LocalMessage; -use ockam_core::compat::sync::Arc; -use ockam_core::Result; -use ockam_core::{route, Address, AllowAll, DenyAll, Encodable, Error}; -use ockam_node::tokio::sync::mpsc; -use ockam_node::tokio::task::JoinSet; -use ockam_node::tokio::time::{sleep, Duration}; -use ockam_node::Context; -use ockam_node::{tokio, WorkerBuilder}; - -use crate::nodes::service::default_address::DefaultAddress; -use crate::session::collector::Collector; -use crate::session::inner_session::InnerSession; -use crate::session::message::Message; -use crate::session::session::{ConnectionStatus, ReplacerOutcome, Session}; - -const MAX_FAILURES: usize = 3; -const RETRY_DELAY: Duration = Duration::from_secs(5); -const PING_INTERVAL: Duration = Duration::from_secs(10); - -pub struct Medic { - retry_delay: Duration, - ping_interval: Duration, - registry: Arc, - replacements: JoinSet<()>, -} - -impl Medic { - pub fn new(registry: Arc) -> Self { - Self::new_extended(registry, RETRY_DELAY, PING_INTERVAL) - } - - pub fn new_extended( - registry: Arc, - retry_delay: Duration, - ping_interval: Duration, - ) -> Self { - Self { - retry_delay, - ping_interval, - registry, - replacements: JoinSet::new(), - } - } - - pub async fn start(self, ctx: &Context) -> Result, Error> { - // Stop a collector that was previously started if there is any - if ctx.is_worker_registered_at(Collector::address()).await? { - Self::stop(ctx).await? - }; - - let child_ctx = ctx - .new_detached(Address::random_tagged("Medic.ctx"), DenyAll, AllowAll) - .await?; - let (tx, ping_receiver) = mpsc::channel(32); - - WorkerBuilder::new(Collector::new(tx)) - .with_address(Collector::address()) - .with_outgoing_access_control(DenyAll) - .start(ctx) - .await?; - let handle = tokio::spawn(self.run_loop(child_ctx, ping_receiver)); - Ok(handle) - } - - pub async fn stop(ctx: &Context) -> Result<(), Error> { - ctx.stop_worker(Collector::address()).await - } - - fn spawn_ping(ctx: Arc, key: String, session: &mut InnerSession) { - let message = Message::new(key.clone()); - let ping = message.ping; - session.add_ping(ping); - let message = match Encodable::encode(message) { - Ok(message) => message, - Err(err) => { - error!("Error encoding message: {}", err); - return; - } - }; - - // if the session is up, send a ping - let ping_route = match session.ping_route() { - Some(ping_route) => ping_route, - None => return, - }; - - let echo_route = route![ping_route.clone(), DefaultAddress::ECHO_SERVICE]; - trace! { - key = %key, - addr = %ping_route, - ping = %ping, - "send ping" - } - - let next = ping_route - .next() - .cloned() - .unwrap_or(DefaultAddress::ECHO_SERVICE.into()); - - if let Some(flow_control_id) = ctx - .flow_controls() - .find_flow_control_with_producer_address(&next) - .map(|x| x.flow_control_id().clone()) - { - ctx.flow_controls() - .add_consumer(Collector::address(), &flow_control_id); - } - - let local_message = LocalMessage::new() - .with_onward_route(echo_route) - .with_return_route(route![Collector::address()]) - .with_payload(message); - - tokio::spawn(async move { - match ctx.forward(local_message).await { - Ok(_) => { - trace!(key = %key, "sent ping") - } - Err(err) => { - error!(key = %key, err = %err, "failed to send ping") - } - } - }); - } - - /// Continuously check all sessions. - /// - /// This method never returns. It will ping all healthy sessions and - /// trigger replacements for the unhealthy ones. - async fn run_loop(mut self, ctx: Context, pong_receiver: mpsc::Receiver) { - // Will shut down itself when we stop the Collector - tokio::spawn(Self::wait_for_pongs(pong_receiver, self.registry.clone())); - - // Should also shut down itself when all replacer_sender instances are dropped - let (replacer_sender, replacer_receiver) = mpsc::channel(32); - tokio::spawn(Self::wait_for_replacements( - replacer_receiver, - self.registry.clone(), - )); - - let ctx = Arc::new(ctx); - loop { - trace!("check sessions"); - - let sessions = Self::sessions(&self.registry).await; - - for session in sessions { - let mut session_lock = session.lock(); - if session_lock.pings().len() < MAX_FAILURES - && session_lock.connection_status() == ConnectionStatus::Up - { - Self::spawn_ping(ctx.clone(), session.key().to_string(), &mut session_lock); - } else { - let key = session.key().to_string(); - // We reached the maximum number of failures - match session_lock.connection_status() { - ConnectionStatus::Up | ConnectionStatus::Down => { - warn!(%key, "session unresponsive"); - session_lock.degraded(); - let replacer = session_lock.replacer(); - info!(%key, "replacing session"); - let retry_delay = self.retry_delay; - let replacer_sender_clone = replacer_sender.clone(); - self.replacements.spawn(async move { - sleep(retry_delay).await; // FIXME: Why? - let res = replacer.recreate().await; - _ = replacer_sender_clone.send((key, res)).await; - }); - } - ConnectionStatus::Degraded => { - warn!(%key, "session is being replaced"); - } - } - } - } - - sleep(self.ping_interval).await; // FIXME - } - } - - async fn wait_for_pongs(mut pong_receiver: mpsc::Receiver, registry: Arc) { - loop { - match pong_receiver.recv().await { - // Let's note the fact that we received a pong - Some(message) => { - trace!("received pong"); - if let Some(session) = Self::session(®istry, &message.key).await { - let mut session = session.lock(); - if session.pings().contains(&message.ping) { - trace!(key = %message.key, ping = %message.ping, "recv pong"); - session.clear_pings() - } - } - } - // Sender part is dropped - // => Collector is stopped - // => We won't ever receive pongs again - None => return, - } - } - } - - /// Needs replacements and sessions - async fn wait_for_replacements( - mut replacer_receiver: mpsc::Receiver<(String, Result)>, - registry: Arc, - ) { - while let Some((key, res)) = replacer_receiver.recv().await { - match res { - Ok(replacer_outcome) => { - if let Some(session) = Self::session(®istry, &key).await { - info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up"); - let mut session = session.lock(); - session.clear_pings(); - session.up(replacer_outcome); - } - } - Err(err) => { - warn!(key = %key, err = %err, "replacing session failed"); - if let Some(session) = Self::session(®istry, &key).await { - session.lock().down(); - } - } - } - } - } - - async fn sessions(registry: &Registry) -> Vec { - let inlets = registry - .inlets - .values() - .await - .into_iter() - .map(|info| info.session); - - let relays = registry - .relays - .values() - .await - .into_iter() - .map(|info| info.session); - - inlets.chain(relays).collect() - } - - async fn session(registry: &Registry, key: &str) -> Option { - Self::sessions(registry) - .await - .into_iter() - .find(|s| s.key() == key) - } -} diff --git a/implementations/rust/ockam/ockam_api/src/session/message.rs b/implementations/rust/ockam/ockam_api/src/session/message.rs deleted file mode 100644 index 6ebead947f0..00000000000 --- a/implementations/rust/ockam/ockam_api/src/session/message.rs +++ /dev/null @@ -1,35 +0,0 @@ -use minicbor::{CborLen, Decode, Encode}; - -use ockam_core::{Decodable, Encodable, Error}; - -use crate::session::session::Ping; - -#[derive(Debug, Clone, Encode, Decode, CborLen)] -#[rustfmt::skip] -pub struct Message { - #[n(0)] pub(super) key: String, - #[n(1)] pub(super) ping: Ping, -} - -impl Message { - pub(super) fn new(key: String) -> Self { - Self { - key, - ping: Ping::new(), - } - } -} - -impl Encodable for Message { - fn encode(self) -> Result, Error> { - ockam_core::cbor_encode_preallocate(self).map_err(Error::from) - } -} - -impl Decodable for Message { - fn decode(m: &[u8]) -> Result { - minicbor::decode(m).map_err(Error::from) - } -} - -impl ockam_core::Message for Message {} diff --git a/implementations/rust/ockam/ockam_api/src/session/mod.rs b/implementations/rust/ockam/ockam_api/src/session/mod.rs index 8e832f03903..61f913bbc8f 100644 --- a/implementations/rust/ockam/ockam_api/src/session/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/session/mod.rs @@ -1,8 +1,7 @@ pub(crate) mod collector; -pub(crate) mod handle; -pub(crate) mod inner_session; -pub(crate) mod medic; -pub(crate) mod message; +pub(crate) mod connection_status; +pub(crate) mod ping; +pub(crate) mod replacer; #[allow(clippy::module_inception)] pub(crate) mod session; diff --git a/implementations/rust/ockam/ockam_api/src/session/ping.rs b/implementations/rust/ockam/ockam_api/src/session/ping.rs new file mode 100644 index 00000000000..2afd990082d --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/session/ping.rs @@ -0,0 +1,34 @@ +use core::fmt; +use minicbor::{CborLen, Decode, Encode}; +use ockam_core::{Decodable, Encodable, Error, Message, Result}; +use rand::random; + +#[derive(Debug, Default, Copy, Clone, Encode, Decode, CborLen, PartialEq, Eq)] +#[cbor(transparent)] +pub struct Ping(#[n(0)] u64); + +impl Ping { + pub fn new() -> Self { + Self(random()) + } +} + +impl fmt::Display for Ping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:x}", self.0) + } +} + +impl Encodable for Ping { + fn encode(self) -> Result> { + ockam_core::cbor_encode_preallocate(self).map_err(Error::from) + } +} + +impl Decodable for Ping { + fn decode(m: &[u8]) -> Result { + minicbor::decode(m).map_err(Error::from) + } +} + +impl Message for Ping {} diff --git a/implementations/rust/ockam/ockam_api/src/session/replacer.rs b/implementations/rust/ockam/ockam_api/src/session/replacer.rs new file mode 100644 index 00000000000..a96afba0922 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/session/replacer.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use ockam::remote::RemoteRelayInfo; +use ockam_core::{async_trait, Address, Result, Route}; + +//most sessions replacer are dependent on the node manager, if many session +//fails concurrently, which is the common scenario we need extra time +//to account for the lock contention +pub const MAX_RECOVERY_TIME: Duration = Duration::from_secs(30); +pub const MAX_CONNECT_TIME: Duration = Duration::from_secs(15); + +#[async_trait] +pub trait SessionReplacer: Send + Sync + 'static { + async fn create(&mut self) -> Result; + async fn close(&mut self); +} + +#[derive(Debug, Clone)] +pub struct CurrentInletStatus { + pub route: Route, + pub worker: Address, +} + +#[derive(Debug, Clone)] +pub enum ReplacerOutputKind { + Inlet(CurrentInletStatus), + Relay(RemoteRelayInfo), +} + +#[derive(Debug, Clone)] +pub struct ReplacerOutcome { + pub ping_route: Route, + pub kind: ReplacerOutputKind, +} diff --git a/implementations/rust/ockam/ockam_api/src/session/session.rs b/implementations/rust/ockam/ockam_api/src/session/session.rs index 58aaca7551e..269a3b4f4a6 100644 --- a/implementations/rust/ockam/ockam_api/src/session/session.rs +++ b/implementations/rust/ockam/ockam_api/src/session/session.rs @@ -1,178 +1,315 @@ -use core::fmt; -use std::fmt::Formatter; -use std::sync::{Arc, MutexGuard}; -use std::time::Duration; - -use minicbor::{CborLen, Decode, Encode}; -use ockam::remote::RemoteRelayInfo; -use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; - -use crate::colors::{color_error, color_ok, color_warn}; -use crate::error::ApiError; -use crate::session::inner_session::InnerSession; -use ockam_core::compat::rand; -use ockam_core::Result; -use ockam_core::{async_trait, Address, Route}; use rand::random; -//most sessions replacer are dependent on the node manager, if many session -//fails concurrently, which is the common scenario we need extra time -//to account for the lock contention -pub const MAX_RECOVERY_TIME: Duration = Duration::from_secs(30); -pub const MAX_CONNECT_TIME: Duration = Duration::from_secs(15); +use crate::nodes::service::default_address::DefaultAddress; +use crate::session::collector::Collector; +use crate::session::connection_status::ConnectionStatus; +use crate::session::ping::Ping; +use crate::session::replacer::{ReplacerOutputKind, SessionReplacer}; -#[async_trait] -pub trait SessionReplacer: Send + 'static { - async fn create(&mut self) -> Result; - async fn close(&mut self) -> (); -} +use ockam::LocalMessage; +use ockam_core::compat::sync::Arc; +use ockam_core::{route, Address, AllowAll, DenyAll, Encodable}; +use ockam_core::{Result, Route}; +use ockam_node::tokio::sync::mpsc; +use ockam_node::tokio::task::JoinHandle; +use ockam_node::tokio::time::{sleep, Duration}; +use ockam_node::Context; +use ockam_node::{tokio, WorkerBuilder}; -#[derive(Debug, Clone)] -pub struct CurrentInletStatus { - pub route: Route, - pub worker: Address, - pub connection_status: ConnectionStatus, -} +use ockam_node::compat::asynchronous::Mutex as AsyncMutex; +use std::sync::Mutex as SyncMutex; -#[derive(Debug, Clone)] -pub enum ReplacerOutputKind { - Inlet(CurrentInletStatus), - Relay(RemoteRelayInfo), -} +const MAX_FAILURES: usize = 3; +const RETRY_DELAY: Duration = Duration::from_secs(5); +const PING_INTERVAL: Duration = Duration::from_secs(10); -#[derive(Debug, Clone)] -pub struct ReplacerOutcome { - pub ping_route: Route, - pub kind: ReplacerOutputKind, +/// State that is accessed from multiple places/threads, therefore needs to be wrapper in Arc> +#[derive(Clone)] +struct SharedState { + /// Current connection status + connection_status: Arc>, + /// Outcome of last session creation + last_outcome: Arc>>, + /// Current route to ping to check connection + ping_route: Arc>>, + /// Replacer impl + replacer: Arc>, + /// Pings that we sent. The whole list is cleared upon receiving an ack + sent_pings: Arc>>, } -pub(super) struct InnerSessionReplacer { - inner: Mutex>, +/// Monitors individual session +pub struct Session { + ctx: Context, + key: String, // Solely for debug purposes/logging + /// Delay before we attempt to recreate the session if the previous attempt failed + retry_delay: Duration, + ping_interval: Duration, + pub(super) collector_address: Address, // FIXME + + shared_state: SharedState, + + run_loop_handle: Option>, + ping_receiver_handle: Option>, } -impl InnerSessionReplacer { - pub fn new(inner: impl SessionReplacer) -> Self { - Self { - inner: Mutex::new(Box::new(inner)), - } +impl Session { + /// Make initial connection [`Session::start_monitoring`] should be called after + pub async fn initial_connect(&mut self) -> Result { + let outcome = self.shared_state.replacer.lock().await.create().await?; + *self.shared_state.connection_status.lock().unwrap() = ConnectionStatus::Up; + self.shared_state.ping_route = Arc::new(SyncMutex::new(Some(outcome.ping_route))); + self.shared_state.last_outcome = Arc::new(SyncMutex::new(Some(outcome.kind.clone()))); + + Ok(outcome.kind) } -} -impl InnerSessionReplacer { - async fn create(&self) -> Result { - self.inner.lock().await.create().await + /// Create a Session + pub async fn create(ctx: &Context, replacer: impl SessionReplacer) -> Result { + Self::create_extended(ctx, replacer, RETRY_DELAY, PING_INTERVAL).await + } + + /// Create a Session + pub async fn create_extended( + ctx: &Context, + replacer: impl SessionReplacer, + retry_delay: Duration, + ping_interval: Duration, + ) -> Result { + let collector_address = Address::random_tagged("Collector"); + let ctx = ctx + .new_detached(Address::random_tagged("Session.ctx"), DenyAll, AllowAll) + .await?; + + let shared_state = SharedState { + connection_status: Arc::new(SyncMutex::new(ConnectionStatus::Down)), + last_outcome: Arc::new(SyncMutex::new(None)), + ping_route: Arc::new(SyncMutex::new(None)), + replacer: Arc::new(AsyncMutex::new(replacer)), + sent_pings: Default::default(), + }; + + Ok(Self { + ctx, + key: hex::encode(random::<[u8; 8]>()), + collector_address, + retry_delay, + ping_interval, + + shared_state, + + run_loop_handle: None, + ping_receiver_handle: None, + }) } - pub async fn close(&self) { - self.inner.lock().await.close().await + /// Current connection status + pub fn connection_status(&self) -> ConnectionStatus { + *self.shared_state.connection_status.lock().unwrap() } - pub async fn recreate(&self) -> Result { - self.close().await; - self.create().await + /// Last session creation outcome + pub fn last_outcome(&self) -> Option { + self.shared_state.last_outcome.lock().unwrap().clone() } -} -#[derive(Clone)] -pub struct Session { - key: String, - inner: Arc>, -} + /// Start monitoring the session + pub async fn start_monitoring(&mut self) -> Result<()> { + let (ping_channel_sender, ping_channel_receiver) = mpsc::channel(3); //FIXME -#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode, CborLen, Serialize, Deserialize)] -#[rustfmt::skip] -pub enum ConnectionStatus { - #[n(0)] Down, - #[n(1)] Degraded, - #[n(2)] Up, -} + // Will shut down itself when we stop the Collector + self.ping_receiver_handle = Some(tokio::spawn(Self::wait_for_pings( + self.key.clone(), + ping_channel_receiver, + self.shared_state.sent_pings.clone(), + ))); -impl fmt::Display for ConnectionStatus { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - ConnectionStatus::Down => write!(f, "{}", color_error("DOWN")), - ConnectionStatus::Degraded => write!(f, "{}", color_warn("DEGRADED")), - ConnectionStatus::Up => write!(f, "{}", color_ok("UP")), - } + WorkerBuilder::new(Collector::new(ping_channel_sender)) + .with_address(self.collector_address.clone()) + .with_outgoing_access_control(DenyAll) + .start(&self.ctx) + .await?; + + let ctx = self + .ctx + .new_detached( + Address::random_tagged("Session.ctx.run_loop"), + DenyAll, + AllowAll, + ) + .await?; + + let handle = tokio::spawn(Self::run_loop( + ctx, + self.key.clone(), + self.collector_address.clone(), + self.shared_state.clone(), + self.ping_interval, + self.retry_delay, + )); + + self.run_loop_handle = Some(handle); + + Ok(()) } -} -impl TryFrom for ConnectionStatus { - type Error = ApiError; - - fn try_from(value: String) -> Result { - match value.to_lowercase().as_str() { - "down" => Ok(ConnectionStatus::Down), - "degraded" => Ok(ConnectionStatus::Degraded), - "up" => Ok(ConnectionStatus::Up), - _ => Err(ApiError::message(format!( - "Invalid connection status: {value}" - ))), + /// Stop everything + pub async fn stop(&mut self) -> Result<()> { + if let Some(run_loop_handle) = self.run_loop_handle.take() { + run_loop_handle.abort(); } - } -} -impl fmt::Debug for Session { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - let inner = self.inner.lock().unwrap(); - f.debug_struct("Session") - .field("key", &self.key) - .field("last_outcome", &inner.last_outcome()) - .field("status", &inner.connection_status()) - .field("pings", &inner.pings()) - .finish() + self.shared_state.replacer.lock().await.close().await; + *self.shared_state.last_outcome.lock().unwrap() = None; + + // ping_receiver_handle task will shut down itself when Collector Worker drops the sender + + self.ctx.stop_worker(self.collector_address.clone()).await } -} -impl Session { - /// Create a new session - /// - /// # Arguments - /// - /// * `replacer` - A structure implementing replacer [`SessionReplacer`], - /// which is used to create and close sessions - pub fn new(replacer: impl SessionReplacer) -> Self { - Self { - key: hex::encode(random::<[u8; 8]>()), - inner: Arc::new(std::sync::Mutex::new(InnerSession::new(Arc::new( - InnerSessionReplacer::new(replacer), - )))), + async fn send_ping( + ctx: &Context, + key: &str, + collector_address: Address, + pings: &mut Vec, + ping_route: Route, + ) -> Result<()> { + let ping = Ping::new(); + pings.push(ping); + let ping_encoded = Encodable::encode(ping)?; + + let echo_route = route![ping_route.clone(), DefaultAddress::ECHO_SERVICE]; + trace! { + key = %key, + addr = %ping_route, + ping = %ping, + "send ping" } - } - pub fn key(&self) -> &str { - self.key.as_str() - } + let next = ping_route + .next() + .cloned() + .unwrap_or(DefaultAddress::ECHO_SERVICE.into()); - pub fn lock(&self) -> MutexGuard { - self.inner.lock().unwrap() - } + if let Some(flow_control_id) = ctx + .flow_controls() + .find_flow_control_with_producer_address(&next) + .map(|x| x.flow_control_id().clone()) + { + ctx.flow_controls() + .add_consumer(collector_address.clone(), &flow_control_id); + } - pub async fn close(self) -> Result<()> { - let replacer = { - let mut lock = self.inner.lock().unwrap(); - lock.down(); - lock.replacer() - }; - replacer.close().await; + let local_message = LocalMessage::new() + .with_onward_route(echo_route) + .with_return_route(route![collector_address]) + .with_payload(ping_encoded); + + ctx.forward(local_message).await?; Ok(()) } -} -#[derive(Debug, Default, Copy, Clone, Encode, Decode, CborLen, PartialEq, Eq)] -#[cbor(transparent)] -pub struct Ping(#[n(0)] u64); + /// Continuously check the session. + /// + /// This method never returns. It will ping healthy session and + /// trigger replacements if it's unhealthy. + async fn run_loop( + ctx: Context, + key: String, + collector_address: Address, + shared_state: SharedState, + ping_interval: Duration, + retry_delay: Duration, + ) { + loop { + trace!("check sessions"); + + let mut pings = shared_state.sent_pings.lock().await; + + let connection_status_value = *shared_state.connection_status.lock().unwrap(); + + if pings.len() < MAX_FAILURES && connection_status_value == ConnectionStatus::Up { + let ping_route_value = shared_state.ping_route.lock().unwrap().clone(); + if let Some(ping_route) = ping_route_value { + match Self::send_ping( + &ctx, + &key, + collector_address.clone(), + &mut pings, + ping_route, + ) + .await + { + Ok(_) => { + trace!(key = %key, "sent ping") + } + Err(err) => { + error!(key = %key, err = %err, "failed to send ping") + } + } + } + + drop(pings); -impl Ping { - pub fn new() -> Self { - Self(rand::random()) + sleep(ping_interval).await; + } else { + // We reached the maximum number of failures + match connection_status_value { + ConnectionStatus::Up | ConnectionStatus::Down => { + warn!(key = %key, "session unresponsive. replacing"); + *shared_state.connection_status.lock().unwrap() = + ConnectionStatus::Degraded; + + let mut replacer_lock = shared_state.replacer.lock().await; + replacer_lock.close().await; + *shared_state.ping_route.lock().unwrap() = None; + *shared_state.last_outcome.lock().unwrap() = None; + + match replacer_lock.create().await { + Ok(replacer_outcome) => { + info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up"); + pings.clear(); + *shared_state.connection_status.lock().unwrap() = + ConnectionStatus::Up; + *shared_state.ping_route.lock().unwrap() = + Some(replacer_outcome.ping_route); + *shared_state.last_outcome.lock().unwrap() = + Some(replacer_outcome.kind.clone()); + } + Err(err) => { + warn!(key = %key, err = %err, "replacing session failed"); + + *shared_state.connection_status.lock().unwrap() = + ConnectionStatus::Down; + drop(pings); + drop(replacer_lock); + + // Avoid retrying too often if it fails + sleep(retry_delay).await; + } + } + } + ConnectionStatus::Degraded => { + // FIXME: Should not happen? + warn!(key = %key, "session is being replaced"); + } + } + } + } } -} -impl fmt::Display for Ping { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:x}", self.0) + async fn wait_for_pings( + key: String, + mut pong_receiver: mpsc::Receiver, + pings: Arc>>, + ) { + while let Some(ping) = pong_receiver.recv().await { + let mut pings_guard = pings.lock().await; + if pings_guard.contains(&ping) { + trace!(%key, %ping, "recv pong"); + pings_guard.clear() + } + } } } diff --git a/implementations/rust/ockam/ockam_api/src/session/tests.rs b/implementations/rust/ockam/ockam_api/src/session/tests.rs index 9e21ddd680c..a69ac47296b 100644 --- a/implementations/rust/ockam/ockam_api/src/session/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/session/tests.rs @@ -4,14 +4,14 @@ use std::time::Duration; use ockam::{route, Address, Context}; use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Error, Result}; -use ockam_multiaddr::MultiAddr; use crate::echoer::Echoer; use crate::hop::Hop; -use crate::nodes::registry::Registry; -use crate::session::medic::Medic; -use crate::session::session::{ConnectionStatus, ReplacerOutcome, SessionReplacer}; -use crate::session::session::{CurrentInletStatus, ReplacerOutputKind, Session}; +use crate::session::connection_status::ConnectionStatus; +use crate::session::replacer::{ + CurrentInletStatus, ReplacerOutcome, ReplacerOutputKind, SessionReplacer, +}; +use crate::session::session::Session; #[derive(Clone)] struct MockReplacer { @@ -40,7 +40,6 @@ impl SessionReplacer for MockReplacer { kind: ReplacerOutputKind::Inlet(CurrentInletStatus { route: route!["hop"], worker: Address::from_string("echo"), - connection_status: ConnectionStatus::Up, }), }) } @@ -50,85 +49,88 @@ impl SessionReplacer for MockReplacer { #[ockam::test] async fn test_session_monitoring(ctx: &mut Context) -> Result<()> { - let registry = Arc::new(Registry::default()); + let mock_replacer = MockReplacer::new(); - // Create a new Medic instance - let medic = Medic::new_extended( - registry.clone(), + // Create a new Session instance + let mut session = Session::create_extended( + ctx, + mock_replacer, Duration::from_secs(1), Duration::from_secs(1), - ); - - // Start the Medic in a separate task - let medic_task = medic.start(ctx).await?; + ) + .await?; - // Medic relies on echo to verify if a session is alive + // Session relies on echo to verify if a session is alive ctx.start_worker(Address::from_string("echo"), Echoer) .await?; // Hop serves as simple neutral address we can use ctx.start_worker(Address::from_string("hop"), Hop).await?; - let mock_replacer = MockReplacer::new(); - let session = Session::new(mock_replacer.clone()); - - // by default session is down - assert_eq!(session.lock().connection_status(), ConnectionStatus::Down); - assert_eq!(session.lock().ping_route(), None); + // Start the Session in a separate task + session.start_monitoring().await?; - // mark the session as up - session.lock().up(ReplacerOutcome { - ping_route: route!["broken_route"], - kind: ReplacerOutputKind::Inlet(CurrentInletStatus { - route: route!["broken_route"], - worker: Address::from_string("mock-address"), - connection_status: ConnectionStatus::Up, - }), - }); - - assert_eq!(session.lock().connection_status(), ConnectionStatus::Up); - assert_eq!(session.lock().ping_route().unwrap(), route!["broken_route"]); - - registry - .inlets - .insert( - "inlet-1".into(), - crate::nodes::registry::InletInfo { - bind_addr: "127.0.0.1:10000".to_string(), - outlet_addr: MultiAddr::default(), - session: session.clone(), - }, - ) - .await; - - // Since the route is broken eventually it will be degraded and will call the replacer - while !mock_replacer.called.load(Ordering::Acquire) { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - // Check the session is now marked as degraded - assert_eq!( - session.lock().connection_status(), - ConnectionStatus::Degraded - ); - assert_eq!(session.lock().ping_route(), None); - - // Now we allow the replacer to return and replace the route - mock_replacer.can_return.store(true, Ordering::Release); - - loop { - // Check that the session is now up, since we don't have any - // synchronization we keep to keep checking until it's up - if session.lock().connection_status() == ConnectionStatus::Up { - assert_eq!(session.lock().ping_route().unwrap(), route!["hop"]); - break; - } - - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } + ctx.is_worker_registered_at(session.collector_address.clone()) + .await?; - // Shut down the test - medic_task.abort(); - ctx.stop().await + // by default session is down + assert!(session.last_outcome().is_none()); + assert_eq!(session.connection_status(), ConnectionStatus::Down); + + // // mark the session as up + // session.lock().up(ReplacerOutcome { + // ping_route: route!["broken_route"], + // kind: ReplacerOutputKind::Inlet(CurrentInletStatus { + // route: route!["broken_route"], + // worker: Address::from_string("mock-address"), + // connection_status: ConnectionStatus::Up, + // }), + // }); + // + // assert_eq!(session.lock().connection_status(), ConnectionStatus::Up); + // assert_eq!(session.lock().ping_route().unwrap(), route!["broken_route"]); + // + // registry + // .inlets + // .insert( + // "inlet-1".into(), + // crate::nodes::registry::InletInfo { + // bind_addr: "127.0.0.1:10000".to_string(), + // outlet_addr: MultiAddr::default(), + // session: session.clone(), + // }, + // ) + // .await; + // + // // Since the route is broken eventually it will be degraded and will call the replacer + // while !mock_replacer.called.load(Ordering::Acquire) { + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } + // + // // Check the session is now marked as degraded + // assert_eq!( + // session.lock().connection_status(), + // ConnectionStatus::Degraded + // ); + // assert_eq!(session.lock().ping_route(), None); + // + // // Now we allow the replacer to return and replace the route + // mock_replacer.can_return.store(true, Ordering::Release); + // + // loop { + // // Check that the session is now up, since we don't have any + // // synchronization we keep to keep checking until it's up + // if session.lock().connection_status() == ConnectionStatus::Up { + // assert_eq!(session.lock().ping_route().unwrap(), route!["hop"]); + // break; + // } + // + // tokio::time::sleep(Duration::from_millis(100)).await; + // continue; + // } + // + // // Shut down the test + // session_task.abort(); + + Ok(()) } diff --git a/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs b/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs index 3209550e2c0..8a0e249ed5c 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs @@ -61,7 +61,7 @@ impl AppState { /// Create a relay at the default project if doesn't exist yet /// - /// Once it's created, a `Medic` worker will monitor it and recreate it whenever it's unresponsive + /// Once it's created, a `Session` worker will monitor it and recreate it whenever it's unresponsive async fn create_relay_impl( &self, context: &Context, diff --git a/tools/stress-test/src/portal_simulator.rs b/tools/stress-test/src/portal_simulator.rs index e664bb4f89c..7194610dd9d 100644 --- a/tools/stress-test/src/portal_simulator.rs +++ b/tools/stress-test/src/portal_simulator.rs @@ -58,7 +58,7 @@ pub async fn create( .unwrap(); let connection = node - .make_connection(context.clone(), &to, node.identifier(), None, None) + .make_connection(&context, &to, node.identifier(), None, None) .await?; let processor = PortalSimulatorSender {