Skip to content

Commit

Permalink
feat(rust): rework Sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Jul 23, 2024
1 parent 57d9a54 commit ab050ca
Show file tree
Hide file tree
Showing 29 changed files with 658 additions and 820 deletions.
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use error::*;
pub use influxdb_token_lease::*;
pub use nodes::service::default_address::*;
pub use rendezvous_healthcheck::*;
pub use session::session::ConnectionStatus;
pub use session::connection_status::ConnectionStatus;
pub use ui::*;
pub use util::*;
pub use version::*;
Expand Down
21 changes: 10 additions & 11 deletions implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub(crate) use plain_tcp::PlainTcpInstantiator;
pub(crate) use project::ProjectInstantiator;
pub(crate) use secure::SecureChannelInstantiator;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

#[derive(Clone)]
pub struct Connection {
Expand All @@ -43,18 +42,18 @@ pub struct Connection {

impl Connection {
/// Shorthand to add the address as consumer to the flow control
pub fn add_consumer(&self, context: Arc<Context>, address: &Address) {
pub fn add_consumer(&self, context: &Context, address: &Address) {
if let Some(flow_control_id) = &self.flow_control_id {
context
.flow_controls()
.add_consumer(address.clone(), flow_control_id);
}
}

pub fn add_default_consumers(&self, ctx: Arc<Context>) {
self.add_consumer(ctx.clone(), &DefaultAddress::KEY_EXCHANGER_LISTENER.into());
self.add_consumer(ctx.clone(), &DefaultAddress::SECURE_CHANNEL_LISTENER.into());
self.add_consumer(ctx.clone(), &DefaultAddress::UPPERCASE_SERVICE.into());
pub fn add_default_consumers(&self, ctx: &Context) {
self.add_consumer(ctx, &DefaultAddress::KEY_EXCHANGER_LISTENER.into());
self.add_consumer(ctx, &DefaultAddress::SECURE_CHANNEL_LISTENER.into());
self.add_consumer(ctx, &DefaultAddress::UPPERCASE_SERVICE.into());
self.add_consumer(ctx, &DefaultAddress::ECHO_SERVICE.into());
}

Expand Down Expand Up @@ -182,7 +181,7 @@ pub trait Instantiator: Send + Sync + 'static {
/// The returned [`Changes`] will be used to update the builder state.
async fn instantiate(
&self,
ctx: Arc<Context>,
ctx: &Context,
node_manager: &NodeManager,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
Expand Down Expand Up @@ -217,7 +216,7 @@ impl ConnectionBuilder {
/// user make sure higher protocol abstraction are called before lower level ones
pub async fn instantiate(
mut self,
ctx: Arc<Context>,
ctx: &Context,
node_manager: &NodeManager,
instantiator: impl Instantiator,
) -> Result<Self, ockam_core::Error> {
Expand All @@ -233,14 +232,14 @@ impl ConnectionBuilder {
// the transport route should include only the pieces before the match
self.transport_route = self
.recalculate_transport_route(
&ctx,
ctx,
self.current_multiaddr.split(start).0,
false,
)
.await?;
let mut changes = instantiator
.instantiate(
ctx.clone(),
ctx,
node_manager,
self.transport_route.clone(),
self.extract(start, instantiator.matches().len()),
Expand Down Expand Up @@ -271,7 +270,7 @@ impl ConnectionBuilder {
}

self.transport_route = self
.recalculate_transport_route(&ctx, self.current_multiaddr.clone(), true)
.recalculate_transport_route(ctx, self.current_multiaddr.clone(), true)
.await?;

Ok(Self {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::error::ApiError;
use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator};
use crate::{multiaddr_to_route, route_to_multiaddr};
use std::sync::Arc;

use crate::nodes::NodeManager;
use ockam_core::{async_trait, Error, Route};
Expand Down Expand Up @@ -30,7 +29,7 @@ impl Instantiator for PlainTcpInstantiator {

async fn instantiate(
&self,
_ctx: Arc<Context>,
_ctx: &Context,
node_manager: &NodeManager,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::error::ApiError;
use crate::nodes::connection::{Changes, Instantiator};
use crate::nodes::NodeManager;
use crate::{multiaddr_to_route, try_address_to_multiaddr};
use std::sync::Arc;

use ockam_core::{async_trait, Error, Route};
use ockam_multiaddr::proto::Project;
Expand Down Expand Up @@ -36,7 +35,7 @@ impl Instantiator for ProjectInstantiator {

async fn instantiate(
&self,
ctx: Arc<Context>,
ctx: &Context,
node_manager: &NodeManager,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
Expand Down Expand Up @@ -66,7 +65,7 @@ impl Instantiator for ProjectInstantiator {
debug!("create a secure channel to the project {project_identifier}");
let sc = node_manager
.create_secure_channel_internal(
&ctx,
ctx,
tcp.route,
&self.identifier.clone(),
Some(vec![project_identifier]),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::sync::Arc;
use std::time::Duration;

use crate::nodes::connection::{Changes, Instantiator};
Expand Down Expand Up @@ -41,7 +40,7 @@ impl Instantiator for SecureChannelInstantiator {

async fn instantiate(
&self,
ctx: Arc<Context>,
ctx: &Context,
node_manager: &NodeManager,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::colors::color_primary;
use crate::error::ApiError;

use crate::output::Output;
use crate::session::session::ConnectionStatus;
use crate::session::connection_status::ConnectionStatus;
use crate::{route_to_multiaddr, try_address_to_multiaddr};

/// Request body to create an inlet
Expand Down
16 changes: 16 additions & 0 deletions implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use ockam_multiaddr::MultiAddr;
use crate::colors::OckamColor;
use crate::error::ApiError;
use crate::output::{colorize_connection_status, Output};
use crate::session::replacer::ReplacerOutputKind;
use crate::session::session::Session;
use crate::{route_to_multiaddr, ConnectionStatus};

/// Request body when instructing a node to create a relay
Expand Down Expand Up @@ -94,6 +96,20 @@ impl RelayInfo {
}
}

pub fn from_session(session: &Session, destination_address: MultiAddr, alias: String) -> Self {
let relay_info = Self::new(destination_address, alias, session.connection_status());
if let Some(outcome) = session.last_outcome() {
match outcome {
ReplacerOutputKind::Relay(info) => relay_info.with(info),
ReplacerOutputKind::Inlet(_) => {
panic!("InletInfo should not be in the registry")
}
}
} else {
relay_info
}
}

pub fn with(self, remote_relay_info: RemoteRelayInfo) -> Self {
Self {
forwarding_route: Some(remote_relay_info.forwarding_route().to_string()),
Expand Down
38 changes: 8 additions & 30 deletions implementations/rust/ockam/ockam_api/src/nodes/registry.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::cli_state::random_name;
use crate::nodes::models::relay::RelayInfo;
use crate::session::session::{ReplacerOutputKind, Session};
use crate::DefaultAddress;

use ockam::identity::Identifier;
use ockam::identity::{SecureChannel, SecureChannelListener};
use ockam_core::compat::collections::BTreeMap;
use ockam_core::{Address, Route};
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::RwLock;
use ockam_node::compat::asynchronous::{Mutex, RwLock};
use ockam_transport_core::HostnamePort;

use crate::session::session::Session;
use std::borrow::Borrow;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Default)]
pub(crate) struct SecureChannelRegistry {
Expand Down Expand Up @@ -127,15 +129,15 @@ impl KafkaServiceInfo {
pub(crate) struct InletInfo {
pub(crate) bind_addr: String,
pub(crate) outlet_addr: MultiAddr,
pub(crate) session: Session,
pub(crate) session: Arc<Mutex<Session>>,
}

impl InletInfo {
pub(crate) fn new(bind_addr: &str, outlet_addr: MultiAddr, session: Session) -> Self {
Self {
bind_addr: bind_addr.to_owned(),
outlet_addr,
session,
session: Arc::new(Mutex::new(session)),
}
}
}
Expand All @@ -160,31 +162,7 @@ impl OutletInfo {
pub struct RegistryRelayInfo {
pub(crate) destination_address: MultiAddr,
pub(crate) alias: String,
pub(crate) session: Session,
}

impl From<RegistryRelayInfo> for RelayInfo {
fn from(registry_relay_info: RegistryRelayInfo) -> Self {
let session_lock = registry_relay_info.session.lock();
let relay_info = RelayInfo::new(
registry_relay_info.destination_address.clone(),
registry_relay_info.alias.clone(),
session_lock.connection_status(),
);

let current_relay_status = session_lock.status().map(|info| match info.kind {
ReplacerOutputKind::Inlet(_) => {
panic!("InletInfo should not be in the registry")
}
ReplacerOutputKind::Relay(info) => info,
});

if let Some(current_relay_status) = current_relay_status {
relay_info.with(current_relay_status)
} else {
relay_info
}
}
pub(crate) session: Arc<Mutex<Session>>,
}

#[derive(Default)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl InMemoryNode {
}

pub async fn stop(&self, ctx: &Context) -> Result<()> {
self.medic_handle.stop_medic(ctx).await?;
// self.session_handle.stop_session(ctx).await?; FIXME
for addr in DefaultAddress::iter() {
let result = ctx.stop_worker(addr).await;
// when stopping we can safely ignore missing services
Expand Down
18 changes: 6 additions & 12 deletions implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::nodes::service::{

use crate::cli_state::journeys::{NODE_NAME, USER_EMAIL, USER_NAME};
use crate::logs::CurrentSpan;
use crate::session::handle::MedicHandle;
use crate::{ApiError, CliState, DefaultAddress};
use miette::IntoDiagnostic;
use ockam::identity::{
Expand Down Expand Up @@ -60,7 +59,6 @@ pub struct NodeManager {
pub(crate) credential_retriever_creators: CredentialRetrieverCreators,
pub(super) project_authority: Option<Identifier>,
pub(crate) registry: Arc<Registry>,
pub(crate) medic_handle: MedicHandle,
}

impl NodeManager {
Expand All @@ -77,9 +75,6 @@ impl NodeManager {

let registry = Arc::new(Registry::default());

debug!("start the medic");
let medic_handle = MedicHandle::start_medic(ctx, registry.clone()).await?;

debug!("retrieve the node identifier");
let node_identifier = cli_state.get_node(&node_name).await?.identifier();

Expand Down Expand Up @@ -159,7 +154,6 @@ impl NodeManager {
credential_retriever_creators,
project_authority: trust_options.project_authority,
registry,
medic_handle,
};

debug!("initializing services");
Expand Down Expand Up @@ -280,7 +274,7 @@ impl NodeManager {
);
}

// Always start the echoer service as ockam_api::Medic assumes it will be
// Always start the echoer service as ockam_api::Session assumes it will be
// started unconditionally on every node. It's used for liveliness checks.
ctx.flow_controls()
.add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id);
Expand All @@ -292,7 +286,7 @@ impl NodeManager {

pub async fn make_connection(
&self,
ctx: Arc<Context>,
ctx: &Context,
addr: &MultiAddr,
identifier: Identifier,
authorized: Option<Identifier>,
Expand All @@ -307,7 +301,7 @@ impl NodeManager {
/// Returns [`Connection`]
async fn connect(
&self,
ctx: Arc<Context>,
ctx: &Context,
addr: &MultiAddr,
identifier: Identifier,
authorized: Option<Vec<Identifier>>,
Expand All @@ -316,15 +310,15 @@ impl NodeManager {
debug!(?timeout, "connecting to {}", &addr);
let connection = ConnectionBuilder::new(addr.clone())
.instantiate(
ctx.clone(),
ctx,
self,
ProjectInstantiator::new(identifier.clone(), timeout),
)
.await?
.instantiate(ctx.clone(), self, PlainTcpInstantiator::new())
.instantiate(ctx, self, PlainTcpInstantiator::new())
.await?
.instantiate(
ctx.clone(),
ctx,
self,
SecureChannelInstantiator::new(&identifier, timeout, authorized),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use miette::IntoDiagnostic;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tracing::trace;

use minicbor::{CborLen, Decode, Encode};

use ockam_core::api::{Error, Request, Response};
use ockam_core::{self, async_trait, AsyncTryClone, Result};
use ockam_core::{self, async_trait, Result};
use ockam_multiaddr::MultiAddr;
use ockam_node::{Context, MessageSendReceiveOptions};

Expand Down Expand Up @@ -38,9 +37,8 @@ impl Messages for NodeManager {
timeout: Option<Duration>,
) -> miette::Result<Vec<u8>> {
let msg_length = message.len();
let connection_ctx = Arc::new(ctx.async_try_clone().await.into_diagnostic()?);
let connection = self
.make_connection(connection_ctx, to, self.identifier(), None, timeout)
.make_connection(ctx, to, self.identifier(), None, timeout)
.await
.into_diagnostic()?;
let route = connection.route().into_diagnostic()?;
Expand Down
Loading

0 comments on commit ab050ca

Please sign in to comment.