diff --git a/examples/rust/get_started/examples/bob.rs b/examples/rust/get_started/examples/bob.rs index 97a68a7658e..d45a0faf64a 100644 --- a/examples/rust/get_started/examples/bob.rs +++ b/examples/rust/get_started/examples/bob.rs @@ -17,7 +17,7 @@ impl Worker for Echoer { println!("\n[✓] Address: {}, Received: {:?}", ctx.address(), msg); // Echo the message body back on its return_route. - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } diff --git a/examples/rust/get_started/src/echoer.rs b/examples/rust/get_started/src/echoer.rs index 099142cb543..578ef57c2c5 100644 --- a/examples/rust/get_started/src/echoer.rs +++ b/examples/rust/get_started/src/echoer.rs @@ -11,6 +11,6 @@ impl Worker for Echoer { println!("Address: {}, Received: {:?}", ctx.address(), msg); // Echo the message body back on its return_route. - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } diff --git a/examples/rust/get_started/src/relay.rs b/examples/rust/get_started/src/relay.rs index b1b4a9f7121..3f3a32cf0f7 100644 --- a/examples/rust/get_started/src/relay.rs +++ b/examples/rust/get_started/src/relay.rs @@ -34,7 +34,7 @@ impl Worker for Relay { local_message = local_message.pop_front_onward_route()?; local_message = local_message.prepend_front_onward_route(&self.route); // Prepend predefined route to the onward_route - let prev_hop = local_message.return_route_ref().next()?.clone(); + let prev_hop = local_message.return_route().next()?.clone(); if let Some(info) = ctx .flow_controls() diff --git a/implementations/rust/ockam/ockam/src/relay_service/relay.rs b/implementations/rust/ockam/ockam/src/relay_service/relay.rs index a277c09d2fc..5a5bd4a71ae 100644 --- a/implementations/rust/ockam/ockam/src/relay_service/relay.rs +++ b/implementations/rust/ockam/ockam/src/relay_service/relay.rs @@ -71,7 +71,7 @@ impl Worker for Relay { .await?; // Remove the last hop so that just route to the node itself is left - self.forward_route.modify().pop_back(); + self.forward_route = self.forward_route.clone().modify().pop_back().into(); Ok(()) } @@ -88,7 +88,7 @@ impl Worker for Relay { .prepend_front_onward_route(&self.forward_route); let next_hop = local_message.next_on_onward_route()?; - let prev_hop = local_message.return_route_ref().next()?; + let prev_hop = local_message.return_route().next()?; if let Some(info) = ctx .flow_controls() diff --git a/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs b/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs index 7d2202b53e7..9a2b1049d45 100644 --- a/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs +++ b/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs @@ -72,7 +72,7 @@ impl Worker for RelayService { let secure_channel_local_info = SecureChannelLocalInfo::find_info(message.local_message()).ok(); - let forward_route = message.return_route(); + let forward_route = message.return_route().clone(); let requested_relay_address = message.into_body()?; let requested_relay_name = if requested_relay_address == "register" { diff --git a/implementations/rust/ockam/ockam/src/remote/worker.rs b/implementations/rust/ockam/ockam/src/remote/worker.rs index 3f557a4000a..8adc0e61fc7 100644 --- a/implementations/rust/ockam/ockam/src/remote/worker.rs +++ b/implementations/rust/ockam/ockam/src/remote/worker.rs @@ -33,7 +33,6 @@ impl Worker for RemoteRelay { msg: Routed, ) -> Result<()> { if msg.msg_addr() == self.addresses.main_remote { - let return_route = msg.return_route(); let mut local_message = msg.into_local_message(); // Remove my address from the onward_route @@ -53,8 +52,12 @@ impl Worker for RemoteRelay { } if !self.completion_msg_sent { - info!(registration_route = %self.registration_route, "RemoteRelay registered with route: {}", return_route); - let address = match return_route.recipient()?.to_string().strip_prefix("0#") + info!(registration_route = %self.registration_route, "RemoteRelay registered with route: {}", local_message.return_route); + let address = match local_message + .return_route + .recipient()? + .to_string() + .strip_prefix("0#") { Some(addr) => addr.to_string(), None => return Err(OckamError::InvalidResponseFromRelayService)?, @@ -63,7 +66,7 @@ impl Worker for RemoteRelay { ctx.send_from_address( self.addresses.completion_callback.clone(), RemoteRelayInfo::new( - return_route, + local_message.return_route, address, self.addresses.main_remote.clone(), self.flow_control_id.clone(), diff --git a/implementations/rust/ockam/ockam_api/src/address.rs b/implementations/rust/ockam/ockam_api/src/address.rs index 814a1d14465..8e298545162 100644 --- a/implementations/rust/ockam/ockam_api/src/address.rs +++ b/implementations/rust/ockam/ockam_api/src/address.rs @@ -22,19 +22,19 @@ pub fn extract_address_value(input: &str) -> Result { Node::CODE => { addr = p .cast::() - .ok_or(ApiError::message("Failed to parse `node` protocol"))? + .ok_or_else(|| ApiError::message("Failed to parse `node` protocol"))? .to_string(); } Service::CODE => { addr = p .cast::() - .ok_or(ApiError::message("Failed to parse `service` protocol"))? + .ok_or_else(|| ApiError::message("Failed to parse `service` protocol"))? .to_string(); } Project::CODE => { addr = p .cast::() - .ok_or(ApiError::message("Failed to parse `project` protocol"))? + .ok_or_else(|| ApiError::message("Failed to parse `project` protocol"))? .to_string(); } code => return Err(ApiError::message(format!("Protocol {code} not supported"))), diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/credential_issuer/credential_issuer_worker.rs b/implementations/rust/ockam/ockam_api/src/authenticator/credential_issuer/credential_issuer_worker.rs index 92271889b71..0bfaebae5af 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/credential_issuer/credential_issuer_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/credential_issuer/credential_issuer_worker.rs @@ -56,13 +56,13 @@ impl Worker for CredentialIssuerWorker { Ok(secure_channel_info) => secure_channel_info, Err(_e) => { let resp = Response::bad_request_no_request("secure channel required").to_vec()?; - c.send(m.return_route(), resp).await?; + c.send(m.return_route().clone(), resp).await?; return Ok(()); } }; let from = Identifier::from(secure_channel_info.their_identifier()); - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let body = m.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = dec.decode()?; diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/direct/direct_authenticator_worker.rs b/implementations/rust/ockam/ockam_api/src/authenticator/direct/direct_authenticator_worker.rs index 2616e6357fe..982bbf94ccb 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/direct/direct_authenticator_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/direct/direct_authenticator_worker.rs @@ -44,13 +44,13 @@ impl Worker for DirectAuthenticatorWorker { Ok(secure_channel_info) => secure_channel_info, Err(_e) => { let resp = Response::bad_request_no_request("secure channel required").to_vec()?; - c.send(m.return_route(), resp).await?; + c.send(m.return_route().clone(), resp).await?; return Ok(()); } }; let from = Identifier::from(secure_channel_info.their_identifier()); - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let body = m.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = dec.decode()?; diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/acceptor_worker.rs b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/acceptor_worker.rs index 498fb78e228..26d570e0d9c 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/acceptor_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/acceptor_worker.rs @@ -35,13 +35,13 @@ impl Worker for EnrollmentTokenAcceptorWorker { Ok(secure_channel_info) => secure_channel_info, Err(_e) => { let resp = Response::bad_request_no_request("secure channel required").to_vec()?; - c.send(m.return_route(), resp).await?; + c.send(m.return_route().clone(), resp).await?; return Ok(()); } }; let from = Identifier::from(secure_channel_info.their_identifier()); - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let body = m.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = dec.decode()?; diff --git a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer_worker.rs b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer_worker.rs index ac45f24118b..98d0d54e792 100644 --- a/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/issuer_worker.rs @@ -46,13 +46,13 @@ impl Worker for EnrollmentTokenIssuerWorker { Ok(secure_channel_info) => secure_channel_info, Err(_e) => { let resp = Response::bad_request_no_request("secure channel required").to_vec()?; - c.send(m.return_route(), resp).await?; + c.send(m.return_route().clone(), resp).await?; return Ok(()); } }; let from = Identifier::from(secure_channel_info.their_identifier()); - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let body = m.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = dec.decode()?; diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index 04f187dd93a..ee606c7eeb8 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -186,15 +186,16 @@ impl CliState { /// Returns the default backup directory for the CLI state. pub fn backup_default_dir() -> Result { let dir = Self::default_dir()?; - let dir_name = - dir.file_name() - .and_then(|n| n.to_str()) - .ok_or(CliStateError::InvalidOperation( - "The $OCKAM_HOME directory does not have a valid name".to_string(), - ))?; - let parent = dir.parent().ok_or(CliStateError::InvalidOperation( - "The $OCKAM_HOME directory does not a valid parent directory".to_string(), - ))?; + let dir_name = dir.file_name().and_then(|n| n.to_str()).ok_or_else(|| { + CliStateError::InvalidOperation( + "The $OCKAM_HOME directory does not have a valid name".to_string(), + ) + })?; + let parent = dir.parent().ok_or_else(|| { + CliStateError::InvalidOperation( + "The $OCKAM_HOME directory does not a valid parent directory".to_string(), + ) + })?; Ok(parent.join(format!("{dir_name}.bak"))) } } @@ -303,7 +304,7 @@ impl CliState { Ok(get_env_with_default::( "OCKAM_HOME", home::home_dir() - .ok_or(CliStateError::InvalidPath("$HOME".to_string()))? + .ok_or_else(|| CliStateError::InvalidPath("$HOME".to_string()))? .join(".ockam"), )?) } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs b/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs index bf8fba79b1a..9d11423d361 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/enrollments.rs @@ -441,9 +441,7 @@ impl ProjectRoute { let project_id = dnsaddr .split('.') .next() - .ok_or(CliStateError::InvalidData( - "Invalid project route".to_string(), - ))? + .ok_or_else(|| CliStateError::InvalidData("Invalid project route".to_string()))? .to_string(); Ok(Self { id: project_id, @@ -529,11 +527,11 @@ impl EnrollmentTicket { let project_change_history = project .project_change_history .as_ref() - .ok_or(ApiError::core("no project change history"))?; + .ok_or_else(|| ApiError::core("no project change history"))?; let authority_change_history = project .authority_identity .as_ref() - .ok_or(ApiError::core("no authority change history"))?; + .ok_or_else(|| ApiError::core("no authority change history"))?; let authority_route = project .authority_access_route .as_ref() @@ -558,14 +556,12 @@ impl EnrollmentTicket { let project_change_history = project .project_change_history .as_ref() - .ok_or(ApiError::core("no project change history in legacy ticket"))? + .ok_or_else(|| ApiError::core("no project change history in legacy ticket"))? .clone(); let authority_change_history = project .authority_identity .as_ref() - .ok_or(ApiError::core( - "no authority change history in legacy ticket", - ))? + .ok_or_else(|| ApiError::core("no authority change history in legacy ticket"))? .clone(); let authority_route = project .authority_access_route diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs index 2efb434c6ad..e2e90bc8c44 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs @@ -369,11 +369,7 @@ impl CliState { .nodes_repository() .get_default_node() .await? - .ok_or(Error::new( - Origin::Api, - Kind::NotFound, - "There is no default node", - ))?) + .ok_or_else(|| Error::new(Origin::Api, Kind::NotFound, "There is no default node"))?) } /// Return the node information for the given node name, otherwise for the default node @@ -416,11 +412,13 @@ impl CliState { } }) .max_by_key(|file| file.metadata().unwrap().modified().unwrap()) - .ok_or(Error::new( - Origin::Api, - Kind::NotFound, - format!("there is no log file for the node {node_name}"), - ))?; + .ok_or_else(|| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("there is no log file for the node {node_name}"), + ) + })?; Ok(current_log_file.path()) } } @@ -625,11 +623,13 @@ impl NodeInfo { Ok(self .tcp_listener_address .as_ref() - .ok_or(ockam::Error::new( - Origin::Api, - Kind::Internal, - "no transport has been set on the node".to_string(), - )) + .ok_or_else(|| { + ockam::Error::new( + Origin::Api, + Kind::Internal, + "no transport has been set on the node".to_string(), + ) + }) .and_then(|t| t.multi_addr())?) } @@ -662,7 +662,7 @@ impl NodeInfo { pub fn status(&self) -> NodeProcessStatus { if let Some(pid) = self.pid() { let mut sys = System::new(); - sys.refresh_processes(ProcessesToUpdate::All, false); + sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid)]), false); if let Some(p) = sys.process(Pid::from_u32(pid)) { // Under certain circumstances, the process can be in a state where it's not running, // and we are unable to kill it. For example, `kill -9` a process created by diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs b/implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs index 7c1f85c0194..82d041ee0a9 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs @@ -33,11 +33,13 @@ impl CliState { .tcp_portals_repository() .get_tcp_inlet(node_name, alias) .await? - .ok_or(ockam_core::Error::new( - Origin::Api, - Kind::NotFound, - format!("no tcp inlet found for node {node_name}, with alias {alias}"), - ))?) + .ok_or_else(|| { + ockam_core::Error::new( + Origin::Api, + Kind::NotFound, + format!("no tcp inlet found for node {node_name}, with alias {alias}"), + ) + })?) } /// Delete a TCP inlet diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs b/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs index d34baa6726b..46d551c2cc1 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/test_support.rs @@ -30,7 +30,7 @@ impl CliState { /// Return a random root directory pub fn test_dir() -> Result { Ok(home::home_dir() - .ok_or(CliStateError::InvalidPath("$HOME".to_string()))? + .ok_or_else(|| CliStateError::InvalidPath("$HOME".to_string()))? .join(".ockam") .join(".tests") .join(random_name())) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs index 2c78b481984..7227a68d737 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs @@ -47,11 +47,13 @@ impl CliState { }; let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( - Origin::Api, - Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), - ))?; + multiaddr_to_transport_route(authority_multiaddr).ok_or_else(|| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid authority route: {}", &authority_multiaddr), + ) + })?; let info = RemoteCredentialRetrieverInfo::create_for_project_member( authority_identifier.clone(), authority_route, @@ -112,14 +114,16 @@ impl CliState { ) -> Result { let authority_identifier = project .authority_identifier() - .ok_or(ApiError::core("no authority identifier"))?; + .ok_or_else(|| ApiError::core("no authority identifier"))?; let authority_multiaddr = project.authority_multiaddr()?; let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or(Error::new( - Origin::Api, - Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), - ))?; + multiaddr_to_transport_route(authority_multiaddr).ok_or_else(|| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid authority route: {}", &authority_multiaddr), + ) + })?; let project_id = project.project_id().to_string(); let project_member_retriever = NodeManagerCredentialRetrieverOptions::Remote { diff --git a/implementations/rust/ockam/ockam_api/src/cloud/share/create.rs b/implementations/rust/ockam/ockam_api/src/cloud/share/create.rs index 654b0cadf36..8f2004ca9f4 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/share/create.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/share/create.rs @@ -62,11 +62,11 @@ impl CreateServiceInvitation { recipient_email: recipient_email.clone(), project_identity: project .project_identifier() - .ok_or(ApiError::core("no project identifier"))?, + .ok_or_else(|| ApiError::core("no project identifier"))?, project_route: project.project_multiaddr()?.to_string(), project_authority_identity: project .authority_identifier() - .ok_or(ApiError::core("no authority identifier"))?, + .ok_or_else(|| ApiError::core("no authority identifier"))?, project_authority_route: project_authority_route.to_string(), shared_node_identity: node_identifier, shared_node_route: service_route.as_ref().to_string(), diff --git a/implementations/rust/ockam/ockam_api/src/echoer.rs b/implementations/rust/ockam/ockam_api/src/echoer.rs index 89ead10181c..1f33e3f1665 100644 --- a/implementations/rust/ockam/ockam_api/src/echoer.rs +++ b/implementations/rust/ockam/ockam_api/src/echoer.rs @@ -11,8 +11,9 @@ impl Worker for Echoer { #[instrument(skip_all, name = "Echoer::handle_message")] async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { - log::debug!(src = %msg.src_addr(), from = %msg.sender()?, to = %msg.return_route().step()?, "echoing back"); - ctx.send(msg.return_route(), NeutralMessage::from(msg.into_payload())) + log::debug!(src = %msg.src_addr(), from = %msg.sender()?, to = %msg.return_route().next()?, "echoing back"); + let msg = msg.into_local_message(); + ctx.send(msg.return_route, NeutralMessage::from(msg.payload)) .await } } diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/lease_issuer/worker.rs b/implementations/rust/ockam/ockam_api/src/influxdb/lease_issuer/worker.rs index 356cdd3416a..708c051a727 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/lease_issuer/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/lease_issuer/worker.rs @@ -110,7 +110,7 @@ impl Worker for InfluxDBTokenLessorWorker { SecureChannelLocalInfo::find_info(msg.local_message())?.their_identifier(), ); - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let body = msg.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = match dec.decode() { diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs index a99b7757e89..7308ed4ebe2 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs @@ -124,18 +124,12 @@ impl NodeManagerWorker { //TODO: should be an easier way to tweak the multiaddr let mut issuer_route = outlet_addr.clone(); - let outlet_addr_last_service = - issuer_route - .pop_back() - .ok_or(Response::bad_request_no_request( - "The outlet address is invalid", - ))?; - let outlet_addr_last_service = - outlet_addr_last_service - .cast::() - .ok_or(Response::bad_request_no_request( - "The outlet address is invalid", - ))?; + let outlet_addr_last_service = issuer_route + .pop_back() + .ok_or_else(|| Response::bad_request_no_request("The outlet address is invalid"))?; + let outlet_addr_last_service = outlet_addr_last_service + .cast::() + .ok_or_else(|| Response::bad_request_no_request("The outlet address is invalid"))?; let lease_issuer_route = if let Some(s) = body.lease_issuer_address { s diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs index 606200a9f51..7917bd25cce 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs @@ -464,9 +464,10 @@ async fn kafka_portal_worker__metadata_exchange__response_changed( let message = context .receive_extended::(MessageReceiveOptions::new().without_timeout()) - .await?; - let return_route = message.return_route(); - let message = PortalMessage::decode(message.payload())?; + .await? + .into_local_message(); + let return_route = message.return_route; + let message = PortalMessage::decode(&message.payload)?; if let PortalMessage::Payload(payload, _) = message { assert_eq!(&request_buffer, payload); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/transport/json.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/transport/json.rs index 8b759ad2e29..9666016e286 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/transport/json.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/transport/json.rs @@ -22,9 +22,9 @@ impl CreateTransportJson { Ok(Self { tt, tm, - addr: InternetAddress::new(addr).ok_or(CliStateError::InvalidOperation( - "Invalid address '{addr}'".to_string(), - ))?, + addr: InternetAddress::new(addr).ok_or_else(|| { + CliStateError::InvalidOperation("Invalid address '{addr}'".to_string()) + })?, }) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs index e252746f278..3c37dc8e94c 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/background_node_client.rs @@ -207,10 +207,12 @@ impl BackgroundNodeClient { /// Make a route to the node and connect using TCP async fn create_route(&self) -> miette::Result<(TcpConnection, Route)> { let tcp_connection = self.create_tcp_connection().await?; - let mut route = self.to.clone(); - route + let route = self + .to + .clone() .modify() - .prepend(tcp_connection.sender_address().clone()); + .prepend(tcp_connection.sender_address().clone()) + .into(); debug!("Sending requests to {route}"); Ok((tcp_connection, route)) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs index 158cb2d6d67..fbe0d18bbe4 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -338,7 +338,7 @@ impl NodeManager { project.project_multiaddr()?.clone(), project .project_identifier() - .ok_or(ApiError::core("no project identifier"))?, + .ok_or_else(|| ApiError::core("no project identifier"))?, )) } @@ -450,7 +450,7 @@ impl NodeManager { self.make_authority_node_client( &project .authority_identifier() - .ok_or(ApiError::core("no authority identifier")) + .ok_or_else(|| ApiError::core("no authority identifier")) .into_diagnostic()?, project.authority_multiaddr().into_diagnostic()?, &caller_identifier, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index a070a9ed9ef..916fc710ce6 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -282,11 +282,13 @@ impl NodeManager { .secure_channels .get_by_addr(addr) .await - .ok_or(ockam_core::Error::new( - Origin::Api, - Kind::NotFound, - format!("Secure channel with address, {}, not found", addr), - )) + .ok_or_else(|| { + ockam_core::Error::new( + Origin::Api, + Kind::NotFound, + format!("Secure channel with address, {}, not found", addr), + ) + }) } pub async fn list_secure_channels(&self) -> Vec { @@ -424,11 +426,13 @@ impl NodeManager { .secure_channel_listeners .remove(addr) .await - .ok_or(ockam_core::Error::new( - Origin::Api, - Kind::Internal, - format!("Error while deleting secure channel with addrress {}", addr,), - )) + .ok_or_else(|| { + ockam_core::Error::new( + Origin::Api, + Kind::Internal, + format!("Error while deleting secure channel with addrress {}", addr,), + ) + }) } pub async fn get_secure_channel_listener( @@ -440,11 +444,13 @@ impl NodeManager { .secure_channel_listeners .get(addr) .await - .ok_or(ockam_core::Error::new( - Origin::Api, - Kind::NotFound, - format!("Secure channel with address, {}, not found", addr), - )) + .ok_or_else(|| { + ockam_core::Error::new( + Origin::Api, + Kind::NotFound, + format!("Secure channel with address, {}, not found", addr), + ) + }) } pub async fn list_secure_channel_listeners(&self) -> Vec { diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs index b05eb088937..e801b3079e4 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs @@ -55,11 +55,13 @@ impl NodeManager { } let udp_transport = if enable_udp_puncture { - Some(self.udp_transport.clone().ok_or(ockam_core::Error::new( - Origin::Transport, - Kind::Invalid, - "Can't enable UDP puncture or non UDP node", - ))?) + Some(self.udp_transport.clone().ok_or_else(|| { + ockam_core::Error::new( + Origin::Transport, + Kind::Invalid, + "Can't enable UDP puncture or non UDP node", + ) + })?) } else { None }; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index 77587262f68..9fb91fe0052 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -78,7 +78,7 @@ impl InletSessionReplacer { { Some( p.authority_identifier() - .ok_or(ApiError::core("no authority identifier"))?, + .ok_or_else(|| ApiError::core("no authority identifier"))?, ) } else { None @@ -311,14 +311,16 @@ impl AdditionalSessionReplacer for InletSessionReplacer { let udp_transport = self .udp_transport .as_ref() - .ok_or(Error::new( - Origin::Node, - Kind::Invalid, - "Couldn't create inlet with puncture", - ))? + .ok_or_else(|| { + Error::new( + Origin::Node, + Kind::Invalid, + "Couldn't create inlet with puncture", + ) + })? .clone(); - let mut main_route = if let Some(connection) = self.connection.as_ref() { + let main_route = if let Some(connection) = self.connection.as_ref() { connection.route()? } else { return Err(Error::new( diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs index 70e78a41b6a..2580e6bcd63 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -254,7 +254,7 @@ impl Worker for NodeManagerWorker { } async fn handle_message(&mut self, ctx: &mut Context, msg: Routed>) -> Result<()> { - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let body = msg.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = match dec.decode() { diff --git a/implementations/rust/ockam/ockam_api/src/okta/mod.rs b/implementations/rust/ockam/ockam_api/src/okta/mod.rs index 125ef526cf1..24a6ae53663 100644 --- a/implementations/rust/ockam/ockam_api/src/okta/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/okta/mod.rs @@ -26,13 +26,13 @@ impl Worker for Server { async fn handle_message(&mut self, c: &mut Context, m: Routed) -> Result<()> { if let Ok(i) = SecureChannelLocalInfo::find_info(m.local_message()) { - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let reply = self .on_request(&i.their_identifier().into(), &m.into_body()?) .await?; c.send(return_route, reply).await } else { - let return_route = m.return_route(); + let return_route = m.return_route().clone(); let body = m.into_body()?; let mut dec = Decoder::new(&body); let req: RequestHeader = dec.decode()?; diff --git a/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs b/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs index 8b1ba0f6d5a..182395dce07 100644 --- a/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs +++ b/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs @@ -52,11 +52,13 @@ impl RendezvousHealthcheck { )); } - let task = self.task.take().ok_or(Error::new( - Origin::Application, - Kind::Unknown, - "Can't start Healthcheck because it is already started (task is present)", - ))?; + let task = self.task.take().ok_or_else(|| { + Error::new( + Origin::Application, + Kind::Unknown, + "Can't start Healthcheck because it is already started (task is present)", + ) + })?; let listener = TcpListener::bind(self.healthcheck_listening_address.clone()) .await diff --git a/implementations/rust/ockam/ockam_api/src/uppercase.rs b/implementations/rust/ockam/ockam_api/src/uppercase.rs index 11e78023496..d0d1bb79599 100644 --- a/implementations/rust/ockam/ockam_api/src/uppercase.rs +++ b/implementations/rust/ockam/ockam_api/src/uppercase.rs @@ -9,7 +9,7 @@ impl Worker for Uppercase { #[instrument(skip_all, name = "Uppercase::handle_message")] async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { - ctx.send(msg.return_route(), msg.into_body()?.to_uppercase()) + ctx.send(msg.return_route().clone(), msg.into_body()?.to_uppercase()) .await } } diff --git a/implementations/rust/ockam/ockam_api/src/util.rs b/implementations/rust/ockam/ockam_api/src/util.rs index 2e3243cf0dc..40957cec719 100644 --- a/implementations/rust/ockam/ockam_api/src/util.rs +++ b/implementations/rust/ockam/ockam_api/src/util.rs @@ -20,27 +20,33 @@ pub fn local_multiaddr_to_route(ma: &MultiAddr) -> Result { match p.code() { // Only hops that are directly translated to existing workers are allowed here Worker::CODE => { - let local = p.cast::().ok_or(Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect worker address {ma})",), - ))?; + let local = p.cast::().ok_or_else(|| { + Error::new( + Origin::Api, + Kind::Invalid, + format!("incorrect worker address {ma})",), + ) + })?; rb = rb.append(Address::new_with_string(LOCAL, &*local)) } Service::CODE => { - let local = p.cast::().ok_or(Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect service address {ma})",), - ))?; + let local = p.cast::().ok_or_else(|| { + Error::new( + Origin::Api, + Kind::Invalid, + format!("incorrect service address {ma})",), + ) + })?; rb = rb.append(Address::new_with_string(LOCAL, &*local)) } Secure::CODE => { - let local = p.cast::().ok_or(Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect secure address {ma})",), - ))?; + let local = p.cast::().ok_or_else(|| { + Error::new( + Origin::Api, + Kind::Invalid, + format!("incorrect secure address {ma})",), + ) + })?; rb = rb.append(Address::new_with_string(LOCAL, &*local)) } diff --git a/implementations/rust/ockam/ockam_api/tests/common/session.rs b/implementations/rust/ockam/ockam_api/tests/common/session.rs index aa9d6ba0844..86e2723d5be 100644 --- a/implementations/rust/ockam/ockam_api/tests/common/session.rs +++ b/implementations/rust/ockam/ockam_api/tests/common/session.rs @@ -51,8 +51,11 @@ impl Worker for MockEchoer { } } - ctx.send(msg.return_route(), NeutralMessage::from(msg.into_payload())) - .await?; + ctx.send( + msg.return_route().clone(), + NeutralMessage::from(msg.into_payload()), + ) + .await?; info!("Echo message back"); Ok(()) diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index d1b0ab9d1e6..b86a303bfc8 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -223,12 +223,12 @@ fn portal_node_goes_down_reconnect() { #[test] fn portal_low_bandwidth_connection_keep_working_for_60s() { // in this test we use two nodes, connected through a passthrough server - // which limits the bandwidth to 64kb per second + // which limits the bandwidth to 170kb per second // // ┌────────┐ ┌───────────┐ ┌────────┐ // │ Node └─────► TCP └────────► Node │ // │ 1 ◄─────┐Passthrough◄────────┐ 2 │ - // └────┬───┘ │ 64KB/s │ └────▲───┘ + // └────┬───┘ │ 170KB/s │ └────▲───┘ // │ └───────────┘ │ // │ ┌───────────┐ │ // │ Portal │ TCP │ Outlet │ @@ -270,8 +270,8 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { let passthrough_server_handle = start_passthrough_server( &second_node_listen_address.to_string(), - Disruption::LimitBandwidth(64 * 1024), - Disruption::LimitBandwidth(64 * 1024), + Disruption::LimitBandwidth(170 * 1024), + Disruption::LimitBandwidth(170 * 1024), ) .await; diff --git a/implementations/rust/ockam/ockam_command/src/secure_channel/listener/create.rs b/implementations/rust/ockam/ockam_command/src/secure_channel/listener/create.rs index d8e24f71192..09441c62611 100644 --- a/implementations/rust/ockam/ockam_command/src/secure_channel/listener/create.rs +++ b/implementations/rust/ockam/ockam_command/src/secure_channel/listener/create.rs @@ -101,7 +101,7 @@ pub async fn create_listener( addr: Address, authorized_identifiers: Option>, identity: Option, - mut base_route: Route, + base_route: Route, ) -> miette::Result<()> { let resp: Vec = ctx .send_and_receive( diff --git a/implementations/rust/ockam/ockam_core/src/env/from_string.rs b/implementations/rust/ockam/ockam_core/src/env/from_string.rs index 16185a9dc49..2c7f2bc0a22 100644 --- a/implementations/rust/ockam/ockam_core/src/env/from_string.rs +++ b/implementations/rust/ockam/ockam_core/src/env/from_string.rs @@ -117,11 +117,7 @@ pub fn parse_duration(arg: &str) -> Result { Regex::new(r"(?P[0-9]+)(?Pd|h|m|s|ms)?$").unwrap() }) .captures(arg) - .ok_or(Error::new( - Origin::Api, - Kind::Serialization, - "Invalid duration.", - ))?; + .ok_or_else(|| Error::new(Origin::Api, Kind::Serialization, "Invalid duration."))?; let time = needles["numeric_duration"] .parse::() .map_err(|_| Error::new(Origin::Api, Kind::Serialization, "Invalid duration."))?; diff --git a/implementations/rust/ockam/ockam_core/src/lib.rs b/implementations/rust/ockam/ockam_core/src/lib.rs index c4bc11e4c6f..a72fec734a6 100644 --- a/implementations/rust/ockam/ockam_core/src/lib.rs +++ b/implementations/rust/ockam/ockam_core/src/lib.rs @@ -125,13 +125,13 @@ where } /// Produces Ok(false) to avoid an ambiguous reading from using the unadorned value in auth code. -#[inline] +#[inline(always)] pub fn deny() -> Result { Ok(false) } /// Produces Ok(true) to avoid an ambiguous reading from using the unadorned value in auth code. -#[inline] +#[inline(always)] pub fn allow() -> Result { Ok(true) } diff --git a/implementations/rust/ockam/ockam_core/src/message.rs b/implementations/rust/ockam/ockam_core/src/message.rs index ee53ff71cb7..ad08f0800e6 100644 --- a/implementations/rust/ockam/ockam_core/src/message.rs +++ b/implementations/rust/ockam/ockam_core/src/message.rs @@ -226,18 +226,18 @@ impl Routed { /// Return a copy of the onward route for the wrapped message. #[inline] - pub fn onward_route(&self) -> Route { + pub fn onward_route(&self) -> &Route { self.local_msg.onward_route() } /// Return a copy of the full return route for the wrapped message. #[inline] - pub fn return_route(&self) -> Route { + pub fn return_route(&self) -> &Route { self.local_msg.return_route() } /// Return a copy of the sender address for the wrapped message. #[inline] - pub fn sender(&self) -> Result
{ + pub fn sender(&self) -> Result<&Address> { self.local_msg.return_route().recipient() } diff --git a/implementations/rust/ockam/ockam_core/src/routing/message/local_message.rs b/implementations/rust/ockam/ockam_core/src/routing/message/local_message.rs index f5269edf72e..d387a74c589 100644 --- a/implementations/rust/ockam/ockam_core/src/routing/message/local_message.rs +++ b/implementations/rust/ockam/ockam_core/src/routing/message/local_message.rs @@ -57,13 +57,8 @@ pub struct LocalMessage { } impl LocalMessage { - /// Return the message onward route - pub fn onward_route(&self) -> Route { - self.onward_route.clone() - } - /// Return a reference to the message onward route - pub fn onward_route_ref(&self) -> &Route { + pub fn onward_route(&self) -> &Route { &self.onward_route } @@ -85,7 +80,7 @@ impl LocalMessage { /// Prepend an address on the onward route pub fn push_front_onward_route(mut self, address: &Address) -> Self { - self.onward_route.modify().prepend(address.clone()); + self.onward_route = self.onward_route.modify().prepend(address.clone()).into(); self } @@ -98,7 +93,11 @@ impl LocalMessage { /// Prepend a route to the onward route pub fn prepend_front_onward_route(mut self, route: &Route) -> Self { - self.onward_route.modify().prepend_route(route.clone()); + self.onward_route = self + .onward_route + .modify() + .prepend_route(route.clone()) + .into(); self } @@ -109,12 +108,7 @@ impl LocalMessage { } /// Return the message return route - pub fn return_route(&self) -> Route { - self.return_route.clone() - } - - /// Return a reference to the message return route - pub fn return_route_ref(&self) -> &Route { + pub fn return_route(&self) -> &Route { &self.return_route } @@ -126,13 +120,17 @@ impl LocalMessage { /// Prepend an address to the return route pub fn push_front_return_route(mut self, address: &Address) -> Self { - self.return_route.modify().prepend(address.clone()); + self.return_route = self.return_route.modify().prepend(address.clone()).into(); self } /// Prepend a route to the return route pub fn prepend_front_return_route(mut self, route: &Route) -> Self { - self.return_route.modify().prepend_route(route.clone()); + self.return_route = self + .return_route + .modify() + .prepend_route(route.clone()) + .into(); self } diff --git a/implementations/rust/ockam/ockam_core/src/routing/message/relay_message.rs b/implementations/rust/ockam/ockam_core/src/routing/message/relay_message.rs index 537e165439d..4c26a4d2e58 100644 --- a/implementations/rust/ockam/ockam_core/src/routing/message/relay_message.rs +++ b/implementations/rust/ockam/ockam_core/src/routing/message/relay_message.rs @@ -38,12 +38,12 @@ impl RelayMessage { /// Onward route pub fn onward_route(&self) -> &Route { - self.local_msg.onward_route_ref() + self.local_msg.onward_route() } /// Return route pub fn return_route(&self) -> &Route { - self.local_msg.return_route_ref() + self.local_msg.return_route() } /// Payload diff --git a/implementations/rust/ockam/ockam_core/src/routing/route.rs b/implementations/rust/ockam/ockam_core/src/routing/route.rs index 88840b5bca5..a4b8a90ce0d 100644 --- a/implementations/rust/ockam/ockam_core/src/routing/route.rs +++ b/implementations/rust/ockam/ockam_core/src/routing/route.rs @@ -30,7 +30,7 @@ impl Route { /// ``` /// #[allow(clippy::new_ret_no_self)] - pub fn new() -> RouteBuilder<'static> { + pub fn new() -> RouteBuilder { RouteBuilder::new() } @@ -104,11 +104,8 @@ impl Route { /// .into(); /// ``` /// - pub fn modify(&mut self) -> RouteBuilder { - RouteBuilder { - inner: self.inner.clone(), - write_back: Some(self), - } + pub fn modify(self) -> RouteBuilder { + RouteBuilder { inner: self.inner } } /// Return the next `Address` and remove it from this route. @@ -132,7 +129,12 @@ impl Route { /// #[track_caller] pub fn step(&mut self) -> Result
{ - Ok(self.inner.pop_front().ok_or(RouteError::IncompleteRoute)?) + // to avoid the error being allocated when not needed + #[allow(clippy::unnecessary_lazy_evaluations)] + Ok(self + .inner + .pop_front() + .ok_or_else(|| RouteError::IncompleteRoute)?) } /// Return the next `Address` from this route without removing it. @@ -156,7 +158,12 @@ impl Route { /// #[track_caller] pub fn next(&self) -> Result<&Address> { - Ok(self.inner.front().ok_or(RouteError::IncompleteRoute)?) + // to avoid the error being allocated when not needed + #[allow(clippy::unnecessary_lazy_evaluations)] + Ok(self + .inner + .front() + .ok_or_else(|| RouteError::IncompleteRoute)?) } /// Return the final recipient address. @@ -169,7 +176,7 @@ impl Route { /// let route: Route = route!["1#alice", "bob"]; /// /// // "0#bob" - /// let final_hop: Address = route.recipient()?; + /// let final_hop: &Address = route.recipient()?; /// /// // ["1#alice", "0#bob"] /// route @@ -178,14 +185,13 @@ impl Route { /// # } /// ``` #[track_caller] - pub fn recipient(&self) -> Result
{ - // `TODO` For consistency we should return a - // Result<&Address> instead of an Address.clone(). + pub fn recipient(&self) -> Result<&Address> { + // to avoid the error being allocated when not needed + #[allow(clippy::unnecessary_lazy_evaluations)] Ok(self .inner .back() - .cloned() - .ok_or(RouteError::IncompleteRoute)?) + .ok_or_else(|| RouteError::IncompleteRoute)?) } /// Iterate over all addresses of this route. @@ -286,10 +292,10 @@ impl From for Vec
{ } /// Convert a `RouteBuilder` into a `Route`. -impl From> for Route { - fn from(RouteBuilder { ref inner, .. }: RouteBuilder) -> Self { +impl From for Route { + fn from(builder: RouteBuilder) -> Self { Self { - inner: inner.clone(), + inner: builder.inner, } } } @@ -304,23 +310,21 @@ impl> From for Route { } /// A utility type for building and manipulating routes. -pub struct RouteBuilder<'r> { +pub struct RouteBuilder { inner: VecDeque
, - write_back: Option<&'r mut Route>, } -impl Default for RouteBuilder<'_> { +impl Default for RouteBuilder { fn default() -> Self { Self::new() } } -impl RouteBuilder<'_> { +impl RouteBuilder { #[doc(hidden)] pub fn new() -> Self { Self { inner: VecDeque::new(), - write_back: None, } } @@ -489,15 +493,21 @@ impl RouteBuilder<'_> { self.inner.pop_back(); self } -} -impl Drop for RouteBuilder<'_> { - fn drop(&mut self) { - if self.write_back.is_some() { - **self.write_back.as_mut().unwrap() = Route { - inner: self.inner.clone(), - }; - } + /// Builds the route. + /// Same as `into()`, but without the need to specify the type. + /// + /// # Examples + /// + /// ``` + /// # use ockam_core::{route, Route, RouteBuilder}; + /// let route = Route::new() + /// .append("1#alice") + /// .append("bob") + /// .build(); + /// ``` + pub fn build(self) -> Route { + Route { inner: self.inner } } } @@ -554,7 +564,7 @@ mod tests { let mut route = route![address, "b"]; assert_eq!(route.next().unwrap(), &Address::from_string("0#a")); assert_eq!(route.next().unwrap(), &Address::from_string("0#a")); - assert_eq!(route.recipient().unwrap(), Address::from_string("0#b")); + assert_eq!(route.recipient().unwrap(), &Address::from_string("0#b")); assert_eq!(route.step().unwrap(), Address::from_string("0#a")); assert_eq!(route.step().unwrap(), Address::from_string("0#b")); } @@ -563,7 +573,10 @@ mod tests { fn test_route_create() { let addresses = vec!["node-1", "node-2"]; let route = Route::create(addresses); - assert_eq!(route.recipient().unwrap(), Address::from_string("0#node-2")); + assert_eq!( + route.recipient().unwrap(), + &Address::from_string("0#node-2") + ); } #[test] @@ -576,7 +589,10 @@ mod tests { let s = " node-1 =>node-2=> node-3 "; let mut route = Route::parse(s).unwrap(); assert_eq!(route.next().unwrap(), &Address::from_string("0#node-1")); - assert_eq!(route.recipient().unwrap(), Address::from_string("0#node-3")); + assert_eq!( + route.recipient().unwrap(), + &Address::from_string("0#node-3") + ); let _ = route.step(); assert_eq!(route.next().unwrap(), &Address::from_string("0#node-2")); } @@ -603,10 +619,10 @@ mod tests { #[test] fn test_route_prepend_route() { - let mut r1 = route!["a", "b", "c"]; + let r1 = route!["a", "b", "c"]; let r2 = route!["1", "2", "3"]; - r1.modify().prepend_route(r2); + let r1: Route = r1.modify().prepend_route(r2).into(); assert_eq!(r1, route!["1", "2", "3", "a", "b", "c"]); } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs index 683fca8eb04..d37ce85fbac 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs @@ -75,10 +75,11 @@ impl DecryptorHandler { &self.addresses.decryptor_remote ); - let return_route = msg.return_route(); + let msg = msg.into_local_message(); + let return_route = msg.return_route; // Decode raw payload binary - let mut request = DecryptionRequest::decode(msg.payload())?; + let mut request = DecryptionRequest::decode(&msg.payload)?; // Decrypt the binary let decrypted_payload = self.decryptor.decrypt(request.0.as_mut_slice()).await; @@ -98,7 +99,7 @@ impl DecryptorHandler { async fn handle_payload( &mut self, ctx: &mut Context, - mut msg: PlaintextPayloadMessage<'_>, + msg: PlaintextPayloadMessage<'_>, nonce: Nonce, encrypted_msg_return_route: Route, ) -> Result<()> { @@ -107,15 +108,18 @@ impl DecryptorHandler { // Only overwrite if we know that's the latest address if remote_route.last_nonce < nonce { let their_decryptor_address = remote_route.route.recipient()?; - remote_route.route = route![encrypted_msg_return_route, their_decryptor_address]; + remote_route.route = + route![encrypted_msg_return_route, their_decryptor_address.clone()]; remote_route.last_nonce = nonce; } } // Add encryptor hop in the return_route (instead of our address) - msg.return_route + let return_route = msg + .return_route .modify() - .prepend(self.addresses.encryptor.clone()); + .prepend(self.addresses.encryptor.clone()) + .into(); // Mark message LocalInfo with IdentitySecureChannelLocalInfo, // replacing any pre-existing entries @@ -124,7 +128,7 @@ impl DecryptorHandler { let msg = LocalMessage::new() .with_onward_route(msg.onward_route) - .with_return_route(msg.return_route) + .with_return_route(return_route) .with_payload(msg.payload.to_vec()) .with_local_info(local_info); @@ -193,10 +197,11 @@ impl DecryptorHandler { &self.addresses.decryptor_remote ); - let encrypted_msg_return_route = msg.return_route(); + let msg = msg.into_local_message(); + let encrypted_msg_return_route = msg.return_route; // Decode raw payload binary - let mut payload = msg.into_payload(); + let mut payload = msg.payload; // Decrypt the binary let (decrypted_payload, nonce) = self.decryptor.decrypt(payload.as_mut_slice()).await?; @@ -259,6 +264,8 @@ impl Decryptor { None }; + let rekey_key; + let rekeying = self.nonce_tracker.is_some(); let key = if rekeying { // get the key corresponding to the current nonce and @@ -266,10 +273,11 @@ impl Decryptor { if let Some(key) = self.key_tracker.get_key(nonce)? { key } else { - Encryptor::rekey(&self.vault, &self.key_tracker.current_key).await? + rekey_key = Encryptor::rekey(&self.vault, &self.key_tracker.current_key).await?; + &rekey_key } } else { - self.key_tracker.current_key.clone() + &self.key_tracker.current_key }; // to improve protection against connection disruption attacks, we want to validate the @@ -277,7 +285,7 @@ impl Decryptor { let result = self .vault .aead_decrypt( - &key, + key, &mut payload[NOISE_NONCE_LEN..], &nonce.to_aes_gcm_nonce(), &[], @@ -287,7 +295,7 @@ impl Decryptor { match result { Ok(result) => { self.nonce_tracker = nonce_tracker; - if let Some(key_to_delete) = self.key_tracker.update_key(key)? { + if let Some(key_to_delete) = self.key_tracker.update_key(&key.clone())? { self.vault.delete_aead_secret_key(key_to_delete).await?; } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index 8d194f5151f..b958fdac970 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -127,10 +127,11 @@ impl EncryptorWorker { self.role, &self.addresses.encryptor ); - let return_route = msg.return_route(); + let msg = msg.into_local_message(); + let return_route = msg.return_route; // Decode raw payload binary - let request = EncryptionRequest::decode(msg.payload())?; + let request = EncryptionRequest::decode(&msg.payload)?; let mut should_stop = false; let len = NOISE_NONCE_LEN + request.0.len() + AES_GCM_TAGSIZE; @@ -178,13 +179,14 @@ impl EncryptorWorker { self.role, &self.addresses.encryptor ); - let mut onward_route = msg.onward_route(); - let return_route = msg.return_route(); + let msg = msg.into_local_message(); + let mut onward_route = msg.onward_route; + let return_route = msg.return_route; // Remove our address let _ = onward_route.step(); - let payload = CowBytes::from(msg.into_payload()); + let payload = CowBytes::from(msg.payload); let msg = PlaintextPayloadMessage { onward_route, return_route, diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index b3398a0af70..40c8b3383a2 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -264,8 +264,9 @@ impl HandshakeWorker { context: &mut Context, message: Routed, ) -> Result<()> { - let return_route = message.return_route(); - let payload = message.into_payload(); + let message = message.into_local_message(); + let return_route = message.return_route; + let payload = message.payload; if let SendMessage(send_message) = self .state_machine diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs index 225348d630b..d0c48339e65 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs @@ -40,7 +40,7 @@ impl KeyTracker { /// - if it the previous nonce but is not set /// - we reached the maximum number of rekeyings #[instrument(skip_all)] - pub(crate) fn get_key(&self, nonce: Nonce) -> Result> { + pub(crate) fn get_key(&self, nonce: Nonce) -> Result> { trace!( "The current number of rekeys is {}, the rekey interval is {}", self.number_of_rekeys, @@ -62,7 +62,7 @@ impl KeyTracker { let nonce_age = nonce.value() - current_interval_start; // if the nonce falls in the current interval return the current key if nonce_age < self.renewal_interval { - Ok(Some(self.current_key.clone())) + Ok(Some(&self.current_key)) } // if the nonce falls in the next interval // otherwise indicate that we need to create a new key @@ -76,7 +76,7 @@ impl KeyTracker { } // else return the previous key (if there is one) if the nonce is not too old } else if current_interval_start - nonce.value() <= self.renewal_interval { - if let Some(previous) = self.previous_key.clone() { + if let Some(previous) = &self.previous_key { Ok(Some(previous)) } else { warn!("There should be a previous key for this nonce: {}", nonce); @@ -92,17 +92,16 @@ impl KeyTracker { #[instrument(skip_all)] pub(crate) fn update_key( &mut self, - decryption_key: AeadSecretKeyHandle, + decryption_key: &AeadSecretKeyHandle, ) -> Result> { let mut key_to_delete = None; // if the key used for the decryption is not the current key nor the previous key // this means that a rekeying happened - if decryption_key != self.current_key && Some(decryption_key.clone()) != self.previous_key { - if let Some(previous) = self.previous_key.clone() { - key_to_delete = Some(previous) - } + if decryption_key != &self.current_key && Some(decryption_key) != self.previous_key.as_ref() + { + key_to_delete = self.previous_key.clone(); self.previous_key.replace(self.current_key.clone()); - self.current_key = decryption_key; + self.current_key = decryption_key.clone(); if u64::MAX - self.number_of_rekeys * self.renewal_interval < self.renewal_interval { self.max_rekeys_reached = true; } else { @@ -125,9 +124,9 @@ mod tests { let handle = AeadSecretKeyHandle(Aes256GcmSecretKeyHandle(HandleToSecret::new(handle))); let key_tracker = KeyTracker::new(handle.clone(), 10); - assert_eq!(key_tracker.get_key(0.into()).unwrap(), Some(handle.clone())); - assert_eq!(key_tracker.get_key(5.into()).unwrap(), Some(handle.clone())); - assert_eq!(key_tracker.get_key(9.into()).unwrap(), Some(handle)); + assert_eq!(key_tracker.get_key(0.into()).unwrap(), Some(&handle)); + assert_eq!(key_tracker.get_key(5.into()).unwrap(), Some(&handle)); + assert_eq!(key_tracker.get_key(9.into()).unwrap(), Some(&handle)); assert_eq!( key_tracker.get_key(10.into()).unwrap(), None, @@ -178,21 +177,18 @@ mod tests { ); assert_eq!( key_tracker.get_key(40.into()).unwrap(), - Some(previous_handle.clone()) + Some(&previous_handle) ); assert_eq!( key_tracker.get_key(45.into()).unwrap(), - Some(previous_handle.clone()) + Some(&previous_handle) ); assert_eq!( key_tracker.get_key(49.into()).unwrap(), - Some(previous_handle) - ); - assert_eq!( - key_tracker.get_key(50.into()).unwrap(), - Some(handle.clone()) + Some(&previous_handle) ); - assert_eq!(key_tracker.get_key(59.into()).unwrap(), Some(handle)); + assert_eq!(key_tracker.get_key(50.into()).unwrap(), Some(&handle)); + assert_eq!(key_tracker.get_key(59.into()).unwrap(), Some(&handle)); assert_eq!( key_tracker.get_key(60.into()).unwrap(), None, @@ -247,13 +243,10 @@ mod tests { renewal_interval: 10, }; - assert_eq!(key_tracker.update_key(handle.clone()).unwrap(), None); - assert_eq!( - key_tracker.update_key(previous_handle.clone()).unwrap(), - None - ); + assert_eq!(key_tracker.update_key(&handle).unwrap(), None); + assert_eq!(key_tracker.update_key(&previous_handle).unwrap(), None); assert_eq!( - key_tracker.update_key(new_handle.clone()).unwrap(), + key_tracker.update_key(&new_handle).unwrap(), Some(previous_handle), "the previous key id must be returned in order to be deleted", ); @@ -281,7 +274,7 @@ mod tests { }; // this brings us to the last interval - key_tracker.update_key(new_handle).unwrap(); + key_tracker.update_key(&new_handle).unwrap(); assert!( !key_tracker.max_rekeys_reached, "the maximum number of rekeys is not yet reached" @@ -291,7 +284,7 @@ mod tests { let new_handle2 = b"new_handle2".to_vec(); let new_handle2 = AeadSecretKeyHandle(Aes256GcmSecretKeyHandle(HandleToSecret::new(new_handle2))); - key_tracker.update_key(new_handle2).unwrap(); + key_tracker.update_key(&new_handle2).unwrap(); assert!( key_tracker.max_rekeys_reached, "the maximum number of rekeys is reached now" diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/common.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/common.rs index 83e9c2070b7..5171ff0acc4 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/common.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/common.rs @@ -90,7 +90,7 @@ impl SecureChannel { let old_route = remote_route.clone(); let their_decryptor_address = old_route.route.recipient()?; - let new_route = route![new_route, their_decryptor_address]; + let new_route = route![new_route, their_decryptor_address.clone()]; remote_route.route = new_route; diff --git a/implementations/rust/ockam/ockam_identity/tests/channel.rs b/implementations/rust/ockam/ockam_identity/tests/channel.rs index 2f405bb3a09..bcf55d3112d 100644 --- a/implementations/rust/ockam/ockam_identity/tests/channel.rs +++ b/implementations/rust/ockam/ockam_identity/tests/channel.rs @@ -63,7 +63,7 @@ async fn test_channel(ctx: &mut Context) -> Result<()> { let local_info = SecureChannelLocalInfo::find_info(msg.local_message())?; assert_eq!(Identifier::from(local_info.their_identifier()), alice); - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Bob!", msg.into_body()?); ctx.flow_controls() @@ -315,7 +315,7 @@ async fn test_channel_send_multiple_messages_both_directions(ctx: &mut Context) .await?; let message = child_ctx.receive::().await?; - let return_route = message.return_route(); + let return_route = message.return_route().clone(); assert_eq!(payload, message.into_body()?); child_ctx @@ -383,7 +383,7 @@ async fn test_channel_registry(ctx: &mut Context) -> Result<()> { .await?; let msg = bob_ctx.receive::().await?; - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Alice!", msg.into_body()?); @@ -445,7 +445,7 @@ async fn test_channel_api(ctx: &mut Context) -> Result<()> { .await?; let msg = bob_ctx.receive::().await?; - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Alice!", msg.into_body()?); @@ -572,7 +572,7 @@ async fn test_tunneled_secure_channel_works(ctx: &mut Context) -> Result<()> { ) .await?; let msg = child_ctx.receive::().await?; - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Bob!", msg.into_body()?); ctx.flow_controls() @@ -666,7 +666,7 @@ async fn test_double_tunneled_secure_channel_works(ctx: &mut Context) -> Result< ) .await?; let msg = child_ctx.receive::().await?; - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Bob!", msg.into_body()?); ctx.flow_controls() @@ -712,7 +712,7 @@ async fn test_many_times_tunneled_secure_channel_works(ctx: &mut Context) -> Res .await?; let mut route = route![i.to_string()]; if let Some(last_channel) = channels.last() { - route.modify().prepend(last_channel.clone()); + route = route.modify().prepend(last_channel.clone()).into(); } let options = SecureChannelOptions::new().with_trust_policy(alice_trust_policy.clone()); @@ -742,7 +742,7 @@ async fn test_many_times_tunneled_secure_channel_works(ctx: &mut Context) -> Res ) .await?; let msg = child_ctx.receive::().await?; - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); assert_eq!("Hello, Bob!", msg.into_body()?); ctx.flow_controls() diff --git a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs index 64e25b9835b..f4e69584ea8 100644 --- a/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs +++ b/implementations/rust/ockam/ockam_identity/tests/credentials_refresh.rs @@ -39,7 +39,8 @@ impl Worker for CredentialIssuer { return Ok(()); } - let subject = SecureChannelLocalInfo::find_info(msg.local_message())? + let msg = msg.into_local_message(); + let subject = SecureChannelLocalInfo::find_info(&msg)? .their_identifier() .into(); let credential = self @@ -60,7 +61,7 @@ impl Worker for CredentialIssuer { self.call_counter.fetch_add(1, Ordering::Relaxed); ctx.sleep(self.delay).await; - ctx.send(msg.return_route(), response).await?; + ctx.send(msg.return_route, response).await?; Ok(()) } diff --git a/implementations/rust/ockam/ockam_identity/tests/update_route.rs b/implementations/rust/ockam/ockam_identity/tests/update_route.rs index 0beb78e2b36..d74aabbd36c 100644 --- a/implementations/rust/ockam/ockam_identity/tests/update_route.rs +++ b/implementations/rust/ockam/ockam_identity/tests/update_route.rs @@ -40,10 +40,10 @@ async fn test_update_decryptor_route(ctx: &mut Context) -> Result<()> { ) .await?; - let msg = child_ctx.receive::().await?; + let msg = child_ctx.receive::().await?.into_local_message(); child_ctx - .send(msg.return_route(), "Hello, Alice!".to_string()) + .send(msg.return_route, "Hello, Alice!".to_string()) .await?; let msg = child_ctx.receive::().await?; @@ -59,10 +59,10 @@ async fn test_update_decryptor_route(ctx: &mut Context) -> Result<()> { ) .await?; - let msg = child_ctx.receive::().await?; + let msg = child_ctx.receive::().await?.into_local_message(); child_ctx - .send(msg.return_route(), "Hello, Alice!".to_string()) + .send(msg.return_route, "Hello, Alice!".to_string()) .await?; let msg = child_ctx.receive::().await?; @@ -132,10 +132,10 @@ async fn test_update_decryptor_route_tcp(ctx: &mut Context) -> Result<()> { ) .await?; - let msg = child_ctx.receive::().await?; + let msg = child_ctx.receive::().await?.into_local_message(); child_ctx - .send(msg.return_route(), "Hello, Alice!".to_string()) + .send(msg.return_route, "Hello, Alice!".to_string()) .await?; let msg = child_ctx.receive::().await?; @@ -153,10 +153,10 @@ async fn test_update_decryptor_route_tcp(ctx: &mut Context) -> Result<()> { ) .await?; - let msg = child_ctx.receive::().await?; + let msg = child_ctx.receive::().await?.into_local_message(); child_ctx - .send(msg.return_route(), "Hello, Alice!".to_string()) + .send(msg.return_route, "Hello, Alice!".to_string()) .await?; let msg = child_ctx.receive::().await?; diff --git a/implementations/rust/ockam/ockam_node/src/context/send_message.rs b/implementations/rust/ockam/ockam_node/src/context/send_message.rs index cf1c76e46d9..b31e7ddacc3 100644 --- a/implementations/rust/ockam/ockam_node/src/context/send_message.rs +++ b/implementations/rust/ockam/ockam_node/src/context/send_message.rs @@ -338,7 +338,7 @@ impl Context { // First resolve the next hop in the route let (reply_tx, mut reply_rx) = small_channel(); - let addr = match local_msg.onward_route_ref().next() { + let addr = match local_msg.onward_route().next() { Ok(next) => next.clone(), Err(err) => { // TODO: communicate bad routes to calling function diff --git a/implementations/rust/ockam/ockam_node/src/workers/echoer.rs b/implementations/rust/ockam/ockam_node/src/workers/echoer.rs index 1c63711e173..b8e3c0faa1c 100644 --- a/implementations/rust/ockam/ockam_node/src/workers/echoer.rs +++ b/implementations/rust/ockam/ockam_node/src/workers/echoer.rs @@ -18,6 +18,6 @@ impl Worker for Echoer { debug!("Address: {}, Received: {:?}", ctx.address(), msg); // Echo the message body back on its return_route. - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } diff --git a/implementations/rust/ockam/ockam_node/tests/tests.rs b/implementations/rust/ockam/ockam_node/tests/tests.rs index 575d3e7aee9..735169337ed 100644 --- a/implementations/rust/ockam/ockam_node/tests/tests.rs +++ b/implementations/rust/ockam/ockam_node/tests/tests.rs @@ -96,7 +96,7 @@ impl Worker for SimpleWorker { ctx: &mut Self::Context, msg: Routed, ) -> Result<()> { - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } @@ -380,7 +380,7 @@ impl Processor for MessagingProcessor { async fn process(&mut self, ctx: &mut Self::Context) -> Result { let msg = ctx.receive::().await.unwrap(); - let route = msg.return_route(); + let route = msg.return_route().clone(); let body = msg.into_body()?; match body.as_str() { @@ -568,7 +568,7 @@ impl Worker for SendReceiveWorker { type Message = Any; async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let msg = SendReceiveRequest::decode(msg.payload())?; match msg { @@ -628,7 +628,7 @@ impl Worker for DummyWorker { ctx: &mut Self::Context, msg: Routed, ) -> Result<()> { - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } diff --git a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs index 682e35dadea..78a2c0b3ef5 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs @@ -80,7 +80,7 @@ impl BleRouter { } async fn handle_route(&mut self, ctx: &Context, msg: LocalMessage) -> Result<()> { - debug!("Ble route request: {:?}", msg.onward_route_ref()); + debug!("Ble route request: {:?}", msg.onward_route()); // Get the next hop let onward = msg.next_on_onward_route()?; @@ -119,7 +119,7 @@ impl Worker for BleRouter { if msg_addr == self.main_addr { let msg = LocalMessage::decode(msg.payload())?; - trace!("handle_message route: {:?}", msg.onward_route_ref()); + trace!("handle_message route: {:?}", msg.onward_route()); self.handle_route(ctx, msg).await?; } else if msg_addr == self.api_addr { let msg = BleRouterMessage::decode(msg.payload())?; diff --git a/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs index b5cb48b34be..8c8fe2bc998 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs @@ -104,8 +104,8 @@ where msg = msg.push_front_return_route(&self.peer_addr); // Some verbose logging we may want to remove - debug!("Message onward route: {}", msg.onward_route_ref()); - debug!("Message return route: {}", msg.return_route_ref()); + debug!("Message onward route: {}", msg.onward_route()); + debug!("Message return route: {}", msg.return_route()); // Forward the message to the final destination worker, // which consumes the TransportMessage and yields the diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs index 62ae1877f16..cb44928531b 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs @@ -104,7 +104,7 @@ impl Worker for PortalOutletInterceptor { let worker_address = PortalInterceptorWorker::create_outlet_interceptor( context, - message.onward_route(), + message.onward_route().clone(), flow_control_id, self.spawner_flow_control_id.clone(), self.incoming_access_control.clone(), @@ -131,8 +131,8 @@ impl Worker for PortalOutletInterceptor { trace!( "forwarding message: onward={:?}; return={:?}; worker={:?}", - &message.onward_route_ref(), - &message.return_route_ref(), + &message.onward_route(), + &message.return_route(), worker_address ); context.forward(message).await?; @@ -204,7 +204,7 @@ impl Worker for PortalInletInterceptor { .find_flow_control_with_producer_address(&next_hop) .map(|x| x.flow_control_id().clone()); - let inlet_responder_address = message.return_route_ref().next()?.clone(); + let inlet_responder_address = message.return_route().next()?.clone(); let worker_address = PortalInterceptorWorker::create_inlet_interceptor( context, @@ -220,8 +220,8 @@ impl Worker for PortalInletInterceptor { trace!( "forwarding message: onward={:?}; return={:?}; worker={:?}", - &message.onward_route_ref(), - &message.return_route_ref(), + &message.onward_route(), + &message.return_route(), worker_address ); @@ -287,6 +287,7 @@ impl Worker for PortalInterceptorWorker { } } PortalMessage::Disconnect => { + let return_route = return_route.clone(); self.forward(context, routed_message).await?; // the first one to receive disconnect and to swap the atomic will stop both workers @@ -316,7 +317,8 @@ impl Worker for PortalInterceptorWorker { fixed_onward_route, routed_message.return_route() ); - self.fixed_onward_route = Some(routed_message.return_route()); + self.fixed_onward_route = + Some(routed_message.into_local_message().return_route); } } } @@ -507,14 +509,14 @@ impl PortalInterceptorWorker { let mut local_message = routed_message.into_local_message(); tracing::trace!( "before: onwards={:?}; return={:?};", - local_message.onward_route_ref(), - local_message.return_route_ref() + local_message.onward_route(), + local_message.return_route() ); local_message = if let Some(fixed_onward_route) = &self.fixed_onward_route { tracing::trace!( "replacing onward_route {:?} with {:?}", - local_message.onward_route_ref(), + local_message.onward_route(), fixed_onward_route ); local_message @@ -526,7 +528,7 @@ impl PortalInterceptorWorker { // we can omit the previous return route. tracing::trace!( "replacing return_route {:?} with {:?}", - local_message.return_route_ref(), + local_message.return_route(), self.other_worker_address ); local_message.set_return_route(route![self.other_worker_address.clone()]) @@ -534,8 +536,8 @@ impl PortalInterceptorWorker { tracing::trace!( "after: onwards={:?}; return={:?};", - local_message.onward_route_ref(), - local_message.return_route_ref(), + local_message.onward_route(), + local_message.return_route(), ); context.forward(local_message).await } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs index aded5a09abe..52d2666dabe 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs @@ -80,9 +80,10 @@ impl Worker for TcpOutletListenWorker { let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) .map(|l| l.their_identifier()) .ok(); - let return_route = msg.return_route(); let src_addr = msg.src_addr(); - let body = msg.into_body()?.into_vec(); + let msg = msg.into_local_message(); + let return_route = msg.return_route; + let body = msg.payload; let msg = PortalMessage::decode(&body)?; if !matches!(msg, PortalMessage::Ping) { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs index 6c2ba0fb5b0..603d8844d4d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs @@ -108,7 +108,7 @@ pub enum PortalInternalMessage { } /// Maximum allowed size for a payload -pub const MAX_PAYLOAD_SIZE: usize = 48 * 1024; +pub const MAX_PAYLOAD_SIZE: usize = 128 * 1024; #[cfg(test)] mod test { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index c077ebb4b3d..ef4c9f5049a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -458,8 +458,10 @@ impl Worker for TcpPortalWorker { // Remove our own address from the route so the other end // knows what to do with the incoming message + + let msg = msg.into_local_message(); let state = self.clone_state(); - let mut onward_route = msg.onward_route(); + let mut onward_route = msg.onward_route; let recipient = onward_route.step()?; if onward_route.next().is_ok() { return Err(TransportError::UnknownRoute)?; @@ -467,7 +469,7 @@ impl Worker for TcpPortalWorker { let remote_packet = recipient != self.addresses.sender_internal; if remote_packet { - let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) + let their_identifier = SecureChannelLocalInfo::find_info_from_list(&msg.local_info) .map(|l| l.their_identifier()) .ok(); @@ -481,8 +483,8 @@ impl Worker for TcpPortalWorker { } } - let return_route = msg.return_route(); - let payload = msg.into_payload(); + let return_route = msg.return_route; + let payload = msg.payload; match state { State::ReceivePong => { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs index 8d43a9b7a68..48ee5ad70d1 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs @@ -175,8 +175,9 @@ impl Worker for RemoteWorker { let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) .map(|l| l.their_identifier()) .ok(); - let return_route = msg.return_route(); - let payload = msg.into_payload(); + let msg = msg.into_local_message(); + let return_route = msg.return_route; + let payload = msg.payload; let msg: OckamPortalPacket = minicbor::decode(&payload) .map_err(|e| TransportError::InvalidOckamPortalPacket(e.to_string()))?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index 9c0e29dcb9b..9909ff6549d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -213,7 +213,7 @@ impl TcpInlet { fn build_new_full_route(new_route: Route, old_route: &Route) -> Result { let their_outlet_address = old_route.recipient()?; - Ok(route![new_route, their_outlet_address]) + Ok(route![new_route, their_outlet_address.clone()]) } /// Update the route to the outlet node. diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs index 7cc0db1a0a8..13b8e969319 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs @@ -28,6 +28,7 @@ use tracing::{info, instrument, trace}; /// the node message system. pub(crate) struct TcpRecvProcessor { registry: TcpRegistry, + incoming_buffer: Vec, read_half: OwnedReadHalf, socket_address: SocketAddr, addresses: Addresses, @@ -47,6 +48,7 @@ impl TcpRecvProcessor { ) -> Self { Self { registry, + incoming_buffer: Vec::new(), read_half, socket_address, addresses, @@ -210,10 +212,12 @@ impl Processor for TcpRecvProcessor { trace!("Received message header for {} bytes", len); // Allocate a buffer of that size - let mut buf = vec![0; len_usize]; + self.incoming_buffer.clear(); + self.incoming_buffer.reserve(len_usize); + self.incoming_buffer.resize(len_usize, 0); // Then read into the buffer - match self.read_half.read_exact(&mut buf).await { + match self.read_half.read_exact(&mut self.incoming_buffer).await { Ok(_) => {} Err(e) => { self.notify_sender_stream_dropped(ctx, e).await?; @@ -222,7 +226,7 @@ impl Processor for TcpRecvProcessor { } // Deserialize the message now - let transport_message: TcpTransportMessage = match minicbor::decode(&buf) { + let transport_message: TcpTransportMessage = match minicbor::decode(&self.incoming_buffer) { Ok(msg) => msg, Err(e) => { self.notify_sender_stream_dropped(ctx, e).await?; @@ -240,8 +244,8 @@ impl Processor for TcpRecvProcessor { // reply routing can be properly resolved let local_message = local_message.push_front_return_route(self.addresses.sender_address()); - trace!("Message onward route: {}", local_message.onward_route_ref()); - trace!("Message return route: {}", local_message.return_route_ref()); + trace!("Message onward route: {}", local_message.onward_route()); + trace!("Message return route: {}", local_message.return_route()); // Forward the message to the next hop in the route ctx.forward_from_address(local_message, self.addresses.receiver_address().clone()) diff --git a/implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/listener.rs b/implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/listener.rs index edfd99c37ce..84791d2cb8d 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/listener.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/listener.rs @@ -132,7 +132,7 @@ impl Worker for UdpPunctureNegotiationListener { ) -> Result<()> { info!("Received a UDP puncture request"); - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let msg = msg.into_body()?; let child_ctx = ctx diff --git a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs index a45603a54fa..a49cd0bf8ac 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs @@ -145,10 +145,10 @@ impl UdpPunctureReceiverWorker { async fn handle_peer( &mut self, ctx: &mut Context, - msg: Routed, + payload: Vec, return_route: &Route, ) -> Result<()> { - let msg = PunctureMessage::decode(msg.payload())?; + let msg = PunctureMessage::decode(&payload)?; trace!("Puncture remote message: {:?}", msg); // Record contact with peer, but only for pong and payload message. @@ -174,7 +174,7 @@ impl UdpPunctureReceiverWorker { } PunctureMessage::Payload { onward_route, - mut return_route, + return_route, payload, } => { trace!("Received Payload from peer. Will forward to local entity"); @@ -292,9 +292,9 @@ impl Worker for UdpPunctureReceiverWorker { ) -> Result<()> { let addr = msg.msg_addr(); if &addr == self.addresses.remote_address() { - let return_route = msg.return_route(); - - self.handle_peer(ctx, msg, &return_route).await?; + let msg = msg.into_local_message(); + let return_route = msg.return_route; + self.handle_peer(ctx, msg.payload, &return_route).await?; } else if &addr == self.addresses.heartbeat_address() { self.handle_heartbeat(ctx).await?; } else { diff --git a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/sender.rs b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/sender.rs index 76e564016a4..b17e88cb56f 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/sender.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/sender.rs @@ -29,14 +29,16 @@ impl UdpPunctureSenderWorker { .clone() .ok_or(PunctureError::PunctureNotOpen)?; - let onward_route = msg.onward_route().modify().pop_front().into(); - let return_route = msg.return_route(); + let msg = msg.into_local_message(); + + let onward_route = msg.onward_route.modify().pop_front().into(); + let return_route = msg.return_route; // Wrap payload let wrapped_payload = PunctureMessage::Payload { onward_route, return_route, - payload: msg.into_payload(), + payload: msg.payload, }; let msg = LocalMessage::new() diff --git a/implementations/rust/ockam/ockam_transport_udp/src/puncture/rendezvous_service/rendezvous.rs b/implementations/rust/ockam/ockam_transport_udp/src/puncture/rendezvous_service/rendezvous.rs index 4ccba3fd92d..403cb4a2038 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/puncture/rendezvous_service/rendezvous.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/puncture/rendezvous_service/rendezvous.rs @@ -85,9 +85,9 @@ impl Worker for RendezvousServiceWorker { debug!( "Received message: {:?} from {}", msg, - Self::parse_route(&msg.return_route()) + Self::parse_route(msg.return_route()) ); - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); match msg.into_body()? { RendezvousRequest::Ping => { ctx.send(return_route, RendezvousResponse::Pong).await?; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs index 01168c41ff9..490e92e207d 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs @@ -203,8 +203,8 @@ impl Processor for UdpReceiverProcessor { local_message = local_message.set_return_route(return_route.into()); - trace!(onward_route = %local_message.onward_route_ref(), - return_route = %local_message.return_route_ref(), + trace!(onward_route = %local_message.onward_route(), + return_route = %local_message.return_route(), "Forwarding UDP message"); ctx.forward(local_message).await?; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs index 6eec193097e..ad696e2e5ee 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs @@ -62,7 +62,7 @@ impl Worker for UdpSenderWorker { // Parse message and remove our address from its routing let mut msg = msg.into_local_message(); msg = msg.pop_front_onward_route()?; - trace!("Sending message to {:?}", msg.onward_route_ref()); + trace!("Sending message to {:?}", msg.onward_route()); let peer = if let Some(peer) = &self.peer { *peer diff --git a/implementations/rust/ockam/ockam_transport_udp/tests/tests.rs b/implementations/rust/ockam/ockam_transport_udp/tests/tests.rs index d0bfa5e4fd5..f2c2815edae 100644 --- a/implementations/rust/ockam/ockam_transport_udp/tests/tests.rs +++ b/implementations/rust/ockam/ockam_transport_udp/tests/tests.rs @@ -479,6 +479,6 @@ impl Worker for Echoer { } debug!("Replying back to {}", &msg.return_route()); - ctx.send(msg.return_route(), msg.into_body()?).await + ctx.send(msg.return_route().clone(), msg.into_body()?).await } } diff --git a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs index 96b7f503680..dc01b3a38d6 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs @@ -264,7 +264,7 @@ impl Worker for UdsRouter { } async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); if msg_addr == self.main_addr { diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs index ace759ccd3a..bee1fd79e97 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs @@ -92,8 +92,8 @@ impl Processor for UdsRecvProcessor { // reply routing can be properly resolved msg = msg.push_front_return_route(&self.peer_addr); - trace!("Message onward route: {}", msg.onward_route_ref()); - trace!("Message return route: {}", msg.return_route_ref()); + trace!("Message onward route: {}", msg.onward_route()); + trace!("Message return route: {}", msg.return_route()); // Forward the message to the next hop in the route ctx.forward(msg).await?; diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs index ac1d88de82e..bf80994161f 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs @@ -122,7 +122,7 @@ impl Worker for WebSocketRouter { } async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { - let return_route = msg.return_route(); + let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); if msg_addr == self.main_addr { @@ -152,10 +152,10 @@ impl Worker for WebSocketRouter { impl WebSocketRouter { async fn handle_route(&mut self, ctx: &Context, msg: LocalMessage) -> Result<()> { - trace!("WS route request: {:?}", msg.onward_route_ref().next()); + trace!("WS route request: {:?}", msg.onward_route().next()); // Get the next hop - let onward = msg.onward_route_ref().next()?; + let onward = msg.onward_route().next()?; let next; // Look up the connection worker responsible diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs index 6ab3283ca39..5c7e1b06d6e 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs @@ -92,8 +92,8 @@ where msg = msg.push_front_return_route(&self.peer_addr); // Some verbose logging we may want to remove - trace!("Message onward route: {}", msg.onward_route_ref()); - trace!("Message return route: {}", msg.return_route_ref()); + trace!("Message onward route: {}", msg.onward_route()); + trace!("Message return route: {}", msg.return_route()); // Forward the message to the next hop in the route ctx.forward(msg).await?; diff --git a/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs b/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs index eaf5d540909..0cce4e24ef5 100644 --- a/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs +++ b/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs @@ -187,11 +187,13 @@ impl SoftwareVaultForSecureChannels { return Ok(secret.clone()); } + // to avoid always creating an `Error` instance + #[allow(clippy::unnecessary_lazy_evaluations)] Ok(self .secrets_repository .get_x25519_secret(handle) .await? - .ok_or(VaultError::KeyNotFound)?) + .ok_or_else(|| VaultError::KeyNotFound)?) } async fn get_buffer_secret(&self, handle: &SecretBufferHandle) -> Result { diff --git a/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/vault_for_signing.rs b/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/vault_for_signing.rs index 14b9ec2e479..68ef37dfb39 100644 --- a/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/vault_for_signing.rs +++ b/implementations/rust/ockam/ockam_vault/src/software/vault_for_signing/vault_for_signing.rs @@ -218,11 +218,13 @@ impl SoftwareVaultForSigning { &self, signing_secret_key_handle: &SigningSecretKeyHandle, ) -> Result { + // to avoid always creating an `Error` instance + #[allow(clippy::unnecessary_lazy_evaluations)] let stored_secret = self .secrets .get_signing_secret(signing_secret_key_handle) .await? - .ok_or(VaultError::KeyNotFound)?; + .ok_or_else(|| VaultError::KeyNotFound)?; Ok(stored_secret) } diff --git a/tools/profile/README.md b/tools/profile/README.md index 7118015cc65..7ffd0886bbf 100644 --- a/tools/profile/README.md +++ b/tools/profile/README.md @@ -1,22 +1,25 @@ # Profiling -This directory contains tools for profiling ockam. +This directory contains scripts for profiling ockam. +Each script simulates a portal in conjunction with a speed test called `iperf3`. -Two scenarios for performance profiling: -- `portal.perf` - local portal, within one node -- `portal_two_nodes.perf` - two nodes, one inlet and outlet -- `relay_port.perf` - one node, one inlet and outlet passing through a relay +The scenarios are: +- `portal` - local portal, within one node +- `portal_two_nodes` - two nodes, one inlet and outlet +- `portal_relay` - one node, one inlet and outlet passing through the project relay -And one scenario for heap profiling: -- `portal.valgrind.dhat` - local portal, within one node +Each comes with different variants: +- `baseline` - no profiling, useful for a quick benchmark +- `cpu` - profile CPU usage +- `allocations` - profile memory allocations ## Running the performance tests -To run the performance tests, simply run `tools/profile/SCRIPT` from the ockam -git root. +To run the performance tests, simply run `tools/profile/.` from the ockam +git root. The script uses the ports 5500 and 8200, and expects an environment without +any other node (otherwise the script might get stuck waiting for a stopped node). ## OS Compatibility -The performance scripts are currently compatible only with Linux since they use `perf`. -On MacOS, a similar approach should be doable with `dtrace`, but is not yet implemented. - -Heap profiling with valgrind is compatible with both Linux and MacOS. +CPU profiling is supported on Linux and MacOS. +Allocation profiling should work on both MacOS and Linux, but MacOS requires the +binary to be signed with an extra capability. diff --git a/tools/profile/portal.allocations b/tools/profile/portal.allocations new file mode 100755 index 00000000000..2097fde5334 --- /dev/null +++ b/tools/profile/portal.allocations @@ -0,0 +1,62 @@ +#!/bin/bash + +if ! [ -x "$(command -v iperf3)" ]; then + echo 'Error: iperf3 is not installed.' >&2 + exit 1 +fi + +if [ "$(uname)" == "Darwin" ]; then + if ! [ -x "$(command -v xctrace)" ]; then + echo 'Error: xctrace is not installed.' >&2 + exit 1 + fi +else + if ! [ -x "$(command -v valgrind)" ]; then + echo 'Error: valgrind is not installed.' >&2 + exit 1 + fi +fi + + +set -e + +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command -F ockam_vault/aws-lc + OCKAM=target/profiling/ockam +fi + +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true +export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + +if [ "$(uname)" == "Darwin" ]; then + rm -rf /tmp/ockam.trace/ + xctrace record --template 'Allocations' --output /tmp/ockam.trace --launch -- "${OCKAM}" node create portal +else + valgrind --tool=dhat --trace-children=yes --dhat-out-file=/tmp/ockam.valgrind.dhat -- "${OCKAM}" node create portal +fi + + +sleep 1 +"${OCKAM}" tcp-outlet create --to 5500 --at portal +"${OCKAM}" tcp-inlet create --from 8200 --to /secure/api/service/outlet --at portal + +iperf3 --server --port 5500 --one-off & +iperf3_server_pid=$! + +sleep 0.3 # wait for server to start +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 + +kill ${iperf3_server_pid} +"${OCKAM}" node delete portal -y + +if [ "$(uname)" == "Darwin" ]; then + echo "Waiting for xctrace to finish writing /tmp/ockam.trace..." + wait + echo "You can use XCode Instruments to open /tmp/ockam.trace" +else + echo "Waiting for valgrind to finish writing /tmp/ockam.valgrind.dhat..." + wait + + echo "To read the outcome, open the ``dh_view.html`` with a broweser and load the file /tmp/ockam.valgrind.dhat" +fi diff --git a/tools/profile/portal_baseline b/tools/profile/portal.baseline similarity index 100% rename from tools/profile/portal_baseline rename to tools/profile/portal.baseline diff --git a/tools/profile/portal.perf b/tools/profile/portal.cpu similarity index 100% rename from tools/profile/portal.perf rename to tools/profile/portal.cpu diff --git a/tools/profile/portal.valgrind.dhat b/tools/profile/portal.valgrind.dhat deleted file mode 100755 index aa641135165..00000000000 --- a/tools/profile/portal.valgrind.dhat +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash - -if ! [ -x "$(command -v iperf3)" ]; then - echo 'Error: iperf3 is not installed.' >&2 - exit 1 -fi - -if ! [ -x "$(command -v valgrind)" ]; then - echo 'Error: valgrind is not installed.' >&2 - exit 1 -fi - -set -e - -if [ -z "${OCKAM}" ]; then - RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command -F ockam_vault/aws-lc - OCKAM=target/profiling/ockam -fi - -"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true -export OCKAM_LOG_LEVEL=info -valgrind --tool=dhat --trace-children=yes --dhat-out-file=/tmp/ockam.valgrind.dhat -- "${OCKAM}" node create portal - -sleep 1 -"${OCKAM}" tcp-outlet create --to 5000 --at portal -"${OCKAM}" tcp-inlet create --from 8000 --to /secure/api/service/outlet --at portal - -iperf3 --server --port 5000 --one-off & -iperf3_server_pid=$! - -sleep 0.3 # wait for server to start -iperf3 --zerocopy --client 127.0.0.1 --port 8000 --time 60 - -kill ${iperf3_server_pid} -"${OCKAM}" node delete portal -y - -echo "Waiting for valgrind to finish writing /tmp/ockam.valgrind.dhat..." -wait - -echo "To read the outcome, open the ``dh_view.html`` with a broweser and load the file /tmp/ockam.valgrind.dhat" diff --git a/tools/profile/portal_relay.baseline b/tools/profile/portal_relay.baseline new file mode 100755 index 00000000000..1fe268b838a --- /dev/null +++ b/tools/profile/portal_relay.baseline @@ -0,0 +1,42 @@ +#!/bin/bash + +if ! [ -x "$(command -v iperf3)" ]; then + echo 'Error: iperf3 is not installed.' >&2 + exit 1 +fi + +set -e + +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command + OCKAM=target/profiling/ockam +fi + +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true +export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + +"${OCKAM}" node create portal -f & + +sleep 1 +"${OCKAM}" tcp-outlet create --to 5500 --at portal +"${OCKAM}" relay create --to portal +"${OCKAM}" tcp-inlet create --from 8200 --to /project/default/service/forward_to_default/secure/api/service/outlet --at portal + +iperf3 --server --port 5500 --one-off & +iperf3_server_pid=$! + +sleep 0.3 # wait for server to start +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 + +kill ${iperf3_server_pid} +"${OCKAM}" node delete portal -y + +echo "Waiting for perf to finish writing /tmp/ockam.perf..." +wait ${perf_pid} + +echo "Converting perf file to firefox profiler format, could take up to few minutes..." +perf script -F +pid --input /tmp/ockam.perf > /tmp/ockam.perf.firefox + +echo "You can use firefox web profiler to open /tmp/ockam.perf.firefox file." +echo "https://profiler.firefox.com/" diff --git a/tools/profile/relay_portal.perf b/tools/profile/portal_relay.cpu similarity index 82% rename from tools/profile/relay_portal.perf rename to tools/profile/portal_relay.cpu index db150e899b6..5b8e8b03038 100755 --- a/tools/profile/relay_portal.perf +++ b/tools/profile/portal_relay.cpu @@ -19,19 +19,21 @@ fi "${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + perf record --call-graph dwarf -F 99 --output /tmp/ockam.perf -- "${OCKAM}" node create portal -f & perf_pid=$! sleep 1 -"${OCKAM}" tcp-outlet create --to 5000 --at portal +"${OCKAM}" tcp-outlet create --to 5500 --at portal "${OCKAM}" relay create --to portal -"${OCKAM}" tcp-inlet create --from 8000 --to /project/default/service/forward_to_default/secure/api/service/outlet --at portal +"${OCKAM}" tcp-inlet create --from 8200 --to /project/default/service/forward_to_default/secure/api/service/outlet --at portal -iperf3 --server --port 5000 --one-off & +iperf3 --server --port 5500 --one-off & iperf3_server_pid=$! sleep 0.3 # wait for server to start -iperf3 --zerocopy --client 127.0.0.1 --port 8000 --time 60 +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 kill ${iperf3_server_pid} "${OCKAM}" node delete portal -y diff --git a/tools/profile/portal_two_nodes.baseline b/tools/profile/portal_two_nodes.baseline new file mode 100755 index 00000000000..699fe75d603 --- /dev/null +++ b/tools/profile/portal_two_nodes.baseline @@ -0,0 +1,35 @@ +#!/bin/bash + +if ! [ -x "$(command -v iperf3)" ]; then + echo 'Error: iperf3 is not installed.' >&2 + exit 1 +fi + +set -e + +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command -F ockam_vault/aws-lc + OCKAM=target/profiling/ockam +fi + +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true +export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + +"${OCKAM}" node create inlet -f & +"${OCKAM}" node create outlet -f & + +sleep 1 +"${OCKAM}" tcp-outlet create --to 5500 --at outlet +"${OCKAM}" tcp-inlet create --from 8200 --to /node/outlet/secure/api/service/outlet --at inlet + +iperf3 --server --port 5500 --one-off & +iperf3_server_pid=$! + +sleep 0.3 # wait for server to start +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 + +kill ${iperf3_server_pid} +"${OCKAM}" node delete inlet -y +"${OCKAM}" node delete outlet -y + diff --git a/tools/profile/portal_two_nodes.perf b/tools/profile/portal_two_nodes.cpu similarity index 84% rename from tools/profile/portal_two_nodes.perf rename to tools/profile/portal_two_nodes.cpu index 5cdff235018..91e0050a1ca 100755 --- a/tools/profile/portal_two_nodes.perf +++ b/tools/profile/portal_two_nodes.cpu @@ -19,18 +19,20 @@ fi "${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + perf record --call-graph dwarf -F 99 --output /tmp/ockam.inlet.perf -- "${OCKAM}" node create inlet -f & perf record --call-graph dwarf -F 99 --output /tmp/ockam.outlet.perf -- "${OCKAM}" node create outlet -f & sleep 1 -"${OCKAM}" tcp-outlet create --to 5000 --at outlet -"${OCKAM}" tcp-inlet create --from 8000 --to /node/outlet/secure/api/service/outlet --at inlet +"${OCKAM}" tcp-outlet create --to 5500 --at outlet +"${OCKAM}" tcp-inlet create --from 8200 --to /node/outlet/secure/api/service/outlet --at inlet -iperf3 --server --port 5000 --one-off & +iperf3 --server --port 5500 --one-off & iperf3_server_pid=$! sleep 0.3 # wait for server to start -iperf3 --zerocopy --client 127.0.0.1 --port 8000 --time 60 +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 kill ${iperf3_server_pid} "${OCKAM}" node delete inlet -y diff --git a/tools/stress-test/src/portal_simulator.rs b/tools/stress-test/src/portal_simulator.rs index 7194610dd9d..93ea4aa0c30 100644 --- a/tools/stress-test/src/portal_simulator.rs +++ b/tools/stress-test/src/portal_simulator.rs @@ -13,6 +13,8 @@ use ockam_multiaddr::MultiAddr; use crate::config::Throughput; +pub const MAX_PAYLOAD_SIZE: usize = 128 * 1024; + pub struct PortalStats { pub messages_out_of_order: Arc, pub bytes_received: Arc, @@ -97,7 +99,7 @@ impl Processor for PortalSimulatorSender { while bytes_left > 0 { let next_message_number = self.messages_sent.fetch_add(1, Ordering::Relaxed); - let payload_size = std::cmp::min(bytes_left, 48 * 1024); + let payload_size = std::cmp::min(bytes_left, MAX_PAYLOAD_SIZE); let mut message = Vec::with_capacity(8 + 8 + payload_size); message.extend_from_slice(&next_message_number.to_le_bytes());