Skip to content

Commit

Permalink
refactor(rust): remove the need to keep a flag to skip defaults
Browse files Browse the repository at this point in the history
the only case where it is needed is when starting a new node
and this can be determined by the presence / absence of a launch config file
  • Loading branch information
etorreborre committed Oct 2, 2023
1 parent 8953f25 commit a9afa5d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 50 deletions.
69 changes: 31 additions & 38 deletions implementations/rust/ockam/ockam_api/src/nodes/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ pub struct NodeManager {
api_transport_flow_control_id: FlowControlId,
pub(crate) tcp_transport: TcpTransport,
pub(crate) controller_identity_id: Identifier,
skip_defaults: bool,
enable_credential_checks: bool,
identifier: Identifier,
pub(crate) secure_channels: Arc<SecureChannels>,
Expand Down Expand Up @@ -224,21 +223,18 @@ impl NodeManager {
pub struct NodeManagerGeneralOptions {
cli_state: CliState,
node_name: String,
skip_defaults: bool,
pre_trusted_identities: Option<PreTrustedIdentities>,
}

impl NodeManagerGeneralOptions {
pub fn new(
cli_state: CliState,
node_name: String,
skip_defaults: bool,
pre_trusted_identities: Option<PreTrustedIdentities>,
) -> Self {
Self {
cli_state,
node_name,
skip_defaults,
pre_trusted_identities,
}
}
Expand Down Expand Up @@ -339,7 +335,6 @@ impl NodeManager {
api_transport_flow_control_id: transport_options.api_transport_flow_control_id,
tcp_transport: transport_options.tcp_transport,
controller_identity_id: Self::load_controller_identifier()?,
skip_defaults: general_options.skip_defaults,
enable_credential_checks: trust_options.trust_context_config.is_some()
&& trust_options
.trust_context_config
Expand All @@ -355,12 +350,9 @@ impl NodeManager {
policies,
};

if !general_options.skip_defaults {
debug!("starting default services");
if let Some(tc) = trust_options.trust_context_config {
debug!("configuring trust context");
s.configure_trust_context(&tc).await?;
}
if let Some(tc) = trust_options.trust_context_config {
debug!("configuring trust context");
s.configure_trust_context(&tc).await?;
}
info!("created a node manager for the node: {}", s.node_name);

Expand All @@ -381,7 +373,7 @@ impl NodeManager {
Ok(())
}

async fn initialize_defaults(
async fn initialize_default_services(
&mut self,
ctx: &Context,
api_flow_control_id: &FlowControlId,
Expand Down Expand Up @@ -424,6 +416,33 @@ impl NodeManager {
Ok(())
}

pub async fn initialize_services(
&mut self,
ctx: &Context,
start_default_services: bool,
) -> Result<()> {
let api_flow_control_id = self.api_transport_flow_control_id.clone();

if start_default_services {
self.initialize_default_services(ctx, &api_flow_control_id)
.await?;
}

// Always start the echoer service as ockam_api::Medic assumes it will be
// started unconditionally on every node. It's used for liveliness checks.
ctx.flow_controls()
.add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id);
self.start_echoer_service_impl(ctx, DefaultAddress::ECHO_SERVICE.into())
.await?;

ctx.flow_controls()
.add_consumer(DefaultAddress::RPC_PROXY, &api_flow_control_id);
ctx.start_worker(DefaultAddress::RPC_PROXY, RpcProxyService::new())
.await?;

Ok(())
}

/// Resolve project ID (if any), create secure channel (if needed) and create a tcp connection
/// Returns [`ConnectionInstance`]
pub(crate) async fn connect(
Expand Down Expand Up @@ -830,32 +849,6 @@ impl Worker for NodeManagerWorker {
type Message = Vec<u8>;
type Context = Context;

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
let mut node_manager = self.node_manager.write().await;
let api_flow_control_id = node_manager.api_transport_flow_control_id.clone();

if !node_manager.skip_defaults {
node_manager
.initialize_defaults(ctx, &api_flow_control_id)
.await?;
}

// Always start the echoer service as ockam_api::Medic assumes it will be
// started unconditionally on every node. It's used for liveliness checks.
ctx.flow_controls()
.add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id);
node_manager
.start_echoer_service_impl(ctx, DefaultAddress::ECHO_SERVICE.into())
.await?;

ctx.flow_controls()
.add_consumer(DefaultAddress::RPC_PROXY, &api_flow_control_id);
ctx.start_worker(DefaultAddress::RPC_PROXY, RpcProxyService::new())
.await?;

Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
let node_manager = self.node_manager.read().await;
node_manager.medic_handle.stop_medic(ctx).await
Expand Down
5 changes: 3 additions & 2 deletions implementations/rust/ockam/ockam_api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,9 @@ pub mod test_utils {
let node_config = NodeConfig::try_from(&cli_state).unwrap();
cli_state.nodes.create(&node_name, node_config)?;

let node_manager = NodeManager::create(
let mut node_manager = NodeManager::create(
context,
NodeManagerGeneralOptions::new(cli_state.clone(), node_name, false, None),
NodeManagerGeneralOptions::new(cli_state.clone(), node_name, None),
NodeManagerTransportOptions::new(
FlowControls::generate_flow_control_id(), // FIXME
tcp.async_try_clone().await?,
Expand All @@ -461,6 +461,7 @@ pub mod test_utils {
)
.await?;

node_manager.initialize_services(context, true).await?;
let node_manager_worker = NodeManagerWorker::new(node_manager);
let node_manager = node_manager_worker.inner().clone();
let secure_channels = node_manager.read().await.secure_channels.clone();
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_app/src/app/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub(crate) async fn make_node_manager_worker(

let node_manager = NodeManager::create(
&ctx,
NodeManagerGeneralOptions::new(cli_state.clone(), NODE_NAME.to_string(), false, None),
NodeManagerGeneralOptions::new(cli_state.clone(), NODE_NAME.to_string(), None),
NodeManagerTransportOptions::new(listener.flow_control_id().clone(), tcp),
NodeManagerTrustOptions::new(trust_context_config),
)
Expand Down
9 changes: 6 additions & 3 deletions implementations/rust/ockam/ockam_command/src/node/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,11 @@ async fn run_foreground_node(

let pre_trusted_identities = load_pre_trusted_identities(&cmd)?;

let node_man = NodeManager::create(
let mut node_man = NodeManager::create(
&ctx,
NodeManagerGeneralOptions::new(
opts.state.clone(),
cmd.node_name.clone(),
cmd.launch_config.is_some(),
pre_trusted_identities,
),
NodeManagerTransportOptions::new(
Expand All @@ -318,8 +317,12 @@ async fn run_foreground_node(
)
.await
.into_diagnostic()?;
let node_manager_worker = NodeManagerWorker::new(node_man);
node_man
.initialize_services(&ctx, cmd.launch_config.is_none())
.await
.into_diagnostic()?;

let node_manager_worker = NodeManagerWorker::new(node_man);
ctx.flow_controls()
.add_consumer(NODEMANAGER_ADDR, listener.flow_control_id());
ctx.start_worker(NODEMANAGER_ADDR, node_manager_worker)
Expand Down
7 changes: 1 addition & 6 deletions implementations/rust/ockam/ockam_command/src/node/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ pub async fn start_embedded_node_with_vault_and_identity(

let node_man = NodeManager::create(
ctx,
NodeManagerGeneralOptions::new(
cli_state.clone(),
cmd.node_name.clone(),
cmd.launch_config.is_some(),
None,
),
NodeManagerGeneralOptions::new(cli_state.clone(), cmd.node_name.clone(), None),
NodeManagerTransportOptions::new(listener.flow_control_id().clone(), tcp),
NodeManagerTrustOptions::new(trust_context_config),
)
Expand Down

0 comments on commit a9afa5d

Please sign in to comment.