Skip to content

Commit

Permalink
feat(rust): rework Sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Jul 23, 2024
1 parent 57d9a54 commit 5135c5b
Show file tree
Hide file tree
Showing 22 changed files with 576 additions and 766 deletions.
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use error::*;
pub use influxdb_token_lease::*;
pub use nodes::service::default_address::*;
pub use rendezvous_healthcheck::*;
pub use session::session::ConnectionStatus;
pub use session::connection_status::ConnectionStatus;
pub use ui::*;
pub use util::*;
pub use version::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::colors::color_primary;
use crate::error::ApiError;

use crate::output::Output;
use crate::session::session::ConnectionStatus;
use crate::session::connection_status::ConnectionStatus;
use crate::{route_to_multiaddr, try_address_to_multiaddr};

/// Request body to create an inlet
Expand Down
16 changes: 16 additions & 0 deletions implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use ockam_multiaddr::MultiAddr;
use crate::colors::OckamColor;
use crate::error::ApiError;
use crate::output::{colorize_connection_status, Output};
use crate::session::replacer::ReplacerOutputKind;
use crate::session::session::Session;
use crate::{route_to_multiaddr, ConnectionStatus};

/// Request body when instructing a node to create a relay
Expand Down Expand Up @@ -94,6 +96,20 @@ impl RelayInfo {
}
}

pub fn from_session(session: &Session, destination_address: MultiAddr, alias: String) -> Self {
let relay_info = Self::new(destination_address, alias, session.connection_status());
if let Some(outcome) = session.outcome() {
match outcome {
ReplacerOutputKind::Relay(info) => relay_info.with(info),
ReplacerOutputKind::Inlet(_) => {
panic!("InletInfo should not be in the registry")
}
}
} else {
relay_info
}
}

pub fn with(self, remote_relay_info: RemoteRelayInfo) -> Self {
Self {
forwarding_route: Some(remote_relay_info.forwarding_route().to_string()),
Expand Down
38 changes: 8 additions & 30 deletions implementations/rust/ockam/ockam_api/src/nodes/registry.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::cli_state::random_name;
use crate::nodes::models::relay::RelayInfo;
use crate::session::session::{ReplacerOutputKind, Session};
use crate::DefaultAddress;

use ockam::identity::Identifier;
use ockam::identity::{SecureChannel, SecureChannelListener};
use ockam_core::compat::collections::BTreeMap;
use ockam_core::{Address, Route};
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::RwLock;
use ockam_node::compat::asynchronous::{Mutex, RwLock};
use ockam_transport_core::HostnamePort;

use crate::session::session::Session;
use std::borrow::Borrow;
use std::fmt::Display;
use std::sync::Arc;

#[derive(Default)]
pub(crate) struct SecureChannelRegistry {
Expand Down Expand Up @@ -127,15 +129,15 @@ impl KafkaServiceInfo {
pub(crate) struct InletInfo {
pub(crate) bind_addr: String,
pub(crate) outlet_addr: MultiAddr,
pub(crate) session: Session,
pub(crate) session: Arc<Mutex<Session>>,
}

impl InletInfo {
pub(crate) fn new(bind_addr: &str, outlet_addr: MultiAddr, session: Session) -> Self {
Self {
bind_addr: bind_addr.to_owned(),
outlet_addr,
session,
session: Arc::new(Mutex::new(session)),
}
}
}
Expand All @@ -160,31 +162,7 @@ impl OutletInfo {
pub struct RegistryRelayInfo {
pub(crate) destination_address: MultiAddr,
pub(crate) alias: String,
pub(crate) session: Session,
}

impl From<RegistryRelayInfo> for RelayInfo {
fn from(registry_relay_info: RegistryRelayInfo) -> Self {
let session_lock = registry_relay_info.session.lock();
let relay_info = RelayInfo::new(
registry_relay_info.destination_address.clone(),
registry_relay_info.alias.clone(),
session_lock.connection_status(),
);

let current_relay_status = session_lock.status().map(|info| match info.kind {
ReplacerOutputKind::Inlet(_) => {
panic!("InletInfo should not be in the registry")
}
ReplacerOutputKind::Relay(info) => info,
});

if let Some(current_relay_status) = current_relay_status {
relay_info.with(current_relay_status)
} else {
relay_info
}
}
pub(crate) session: Arc<Mutex<Session>>,
}

#[derive(Default)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl InMemoryNode {
}

pub async fn stop(&self, ctx: &Context) -> Result<()> {
self.medic_handle.stop_medic(ctx).await?;
// self.session_handle.stop_session(ctx).await?; FIXME
for addr in DefaultAddress::iter() {
let result = ctx.stop_worker(addr).await;
// when stopping we can safely ignore missing services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::nodes::service::{

use crate::cli_state::journeys::{NODE_NAME, USER_EMAIL, USER_NAME};
use crate::logs::CurrentSpan;
use crate::session::handle::MedicHandle;
use crate::{ApiError, CliState, DefaultAddress};
use miette::IntoDiagnostic;
use ockam::identity::{
Expand Down Expand Up @@ -60,7 +59,6 @@ pub struct NodeManager {
pub(crate) credential_retriever_creators: CredentialRetrieverCreators,
pub(super) project_authority: Option<Identifier>,
pub(crate) registry: Arc<Registry>,
pub(crate) medic_handle: MedicHandle,
}

impl NodeManager {
Expand All @@ -77,9 +75,6 @@ impl NodeManager {

let registry = Arc::new(Registry::default());

debug!("start the medic");
let medic_handle = MedicHandle::start_medic(ctx, registry.clone()).await?;

debug!("retrieve the node identifier");
let node_identifier = cli_state.get_node(&node_name).await?.identifier();

Expand Down Expand Up @@ -159,7 +154,6 @@ impl NodeManager {
credential_retriever_creators,
project_authority: trust_options.project_authority,
registry,
medic_handle,
};

debug!("initializing services");
Expand Down Expand Up @@ -280,7 +274,7 @@ impl NodeManager {
);
}

// Always start the echoer service as ockam_api::Medic assumes it will be
// Always start the echoer service as ockam_api::Session assumes it will be
// started unconditionally on every node. It's used for liveliness checks.
ctx.flow_controls()
.add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id);
Expand Down
65 changes: 41 additions & 24 deletions implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use ockam_core::api::{Error, Request, RequestHeader, Response};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{async_trait, route, Address, AsyncTryClone};
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;

use crate::nodes::connection::Connection;
Expand All @@ -22,8 +23,8 @@ use crate::nodes::registry::RegistryRelayInfo;
use crate::nodes::service::in_memory_node::InMemoryNode;
use crate::nodes::service::secure_channel::SecureChannelType;
use crate::nodes::BackgroundNodeClient;
use crate::session::handle::MedicHandle;
use crate::session::session::{ReplacerOutcome, ReplacerOutputKind, Session, SessionReplacer};
use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer};
use crate::session::session::Session;

use super::{NodeManager, NodeManagerWorker};

Expand Down Expand Up @@ -98,14 +99,17 @@ impl NodeManager {
/// This function returns a representation of the relays currently
/// registered on this node
pub async fn get_relays(&self) -> Vec<RelayInfo> {
let relays = self
.registry
.relays
.entries()
.await
.into_iter()
.map(|(_, registry_info)| registry_info.into())
.collect();
let mut relays = vec![];
for (_, registry_info) in self.registry.relays.entries().await {
let session = registry_info.session.lock().await;
let info = RelayInfo::from_session(
&session,
registry_info.destination_address.clone(),
registry_info.alias.clone(),
);
relays.push(info);
}

trace!(?relays, "Relays retrieved");
relays
}
Expand Down Expand Up @@ -141,35 +145,41 @@ impl NodeManager {
authorized,
};

let mut session = Session::new(replacer);
let relay_info =
MedicHandle::connect(&mut session)
let mut session = Session::create(ctx, replacer).await?;

let remote_relay_info =
session
.connect()
.await
.map(|outcome| match outcome.kind {
.map(|(_handle, outcome)| match outcome {
ReplacerOutputKind::Relay(status) => status,
_ => {
panic!("Unexpected outcome: {:?}", outcome);
}
})?;

let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status())
.with(remote_relay_info);

let registry_relay_info = RegistryRelayInfo {
destination_address: addr.clone(),
alias: alias.clone(),
session,
session: Arc::new(Mutex::new(session)),
};

self.registry
.relays
.insert(alias, registry_relay_info.clone())
.await;

debug!(
forwarding_route = %relay_info.forwarding_route(),
remote_address = %relay_info.remote_address(),
"CreateRelay request processed, sending back response"
);
// FIXME
// debug!(
// forwarding_route = %relay_info.forwarding_route(),
// remote_address = %relay_info.remote_address(),
// "CreateRelay request processed, sending back response"
// );

Ok(registry_relay_info.into())
Ok(relay_info)
}

/// Delete a relay.
Expand All @@ -178,7 +188,7 @@ impl NodeManager {
pub async fn delete_relay_impl(&self, alias: &str) -> Result<(), ockam::Error> {
if let Some(relay_to_delete) = self.registry.relays.remove(alias).await {
debug!(%alias, "Successfully removed relay from node registry");
let result = relay_to_delete.session.close().await;
let result = relay_to_delete.session.lock().await.stop().await;
match result {
Ok(_) => {
debug!(%alias, "Successfully stopped relay");
Expand Down Expand Up @@ -207,7 +217,14 @@ impl NodeManager {
) -> Result<Response<RelayInfo>, Response<Error>> {
debug!("Handling ShowRelay request");
if let Some(registry_info) = self.registry.relays.get(alias).await {
Ok(Response::ok().with_headers(req).body(registry_info.into()))
let session = registry_info.session.lock().await;

let relay_info = RelayInfo::from_session(
&session,
registry_info.destination_address.clone(),
registry_info.alias.clone(),
);
Ok(Response::ok().with_headers(req).body(relay_info))
} else {
error!(%alias, "Relay not found in the node registry");
Err(Response::not_found(
Expand Down Expand Up @@ -251,7 +268,7 @@ struct RelaySessionReplacer {

#[async_trait]
impl SessionReplacer for RelaySessionReplacer {
async fn create(&mut self) -> std::result::Result<ReplacerOutcome, ockam_core::Error> {
async fn create(&mut self) -> Result<ReplacerOutcome> {
debug!(addr = self.addr.to_string(), relay_address = ?self.relay_address, "Handling CreateRelay request");
let connection = self
.node_manager
Expand Down
Loading

0 comments on commit 5135c5b

Please sign in to comment.