From 6aa2b308187c9cc580849e6b6d4c2668bb3525d4 Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Thu, 21 Nov 2024 18:56:44 +0100 Subject: [PATCH] feat(rust): add `UDP` support to nodes and multiaddr. refactor multiaddr --- ...bute-based-authentication-control-plane.rs | 11 +- ...tribute-based-authentication-edge-plane.rs | 11 +- implementations/rust/ockam/ockam/src/lib.rs | 4 +- .../ockam/ockam_api/src/cli_state/trust.rs | 18 +- .../ockam_api/src/cloud/project/project.rs | 23 +- .../ockam_api/src/cloud/secure_clients.rs | 59 ++-- .../rust/ockam/ockam_api/src/lib.rs | 2 + .../src/multiaddr_resolver/local_resolver.rs | 61 ++++ .../ockam_api/src/multiaddr_resolver/mod.rs | 28 ++ .../src/multiaddr_resolver/remote_resolver.rs | 241 ++++++++++++++ .../reverse_local_converter.rs | 31 ++ .../transport_route_resolver.rs | 186 +++++++++++ .../ockam_api/src/nodes/connection/mod.rs | 60 +++- .../src/nodes/connection/plain_tcp.rs | 30 +- .../src/nodes/connection/plain_udp.rs | 69 ++++ .../ockam_api/src/nodes/connection/project.rs | 38 ++- .../ockam_api/src/nodes/connection/secure.rs | 7 +- .../ockam_api/src/nodes/models/portal.rs | 7 +- .../ockam/ockam_api/src/nodes/models/relay.rs | 11 +- .../src/nodes/models/secure_channel.rs | 14 +- .../src/nodes/service/flow_controls.rs | 7 +- .../src/nodes/service/in_memory_node.rs | 2 +- .../ockam_api/src/nodes/service/manager.rs | 88 +++-- .../src/nodes/service/node_services.rs | 6 +- .../src/nodes/service/secure_channel.rs | 7 +- .../src/nodes/service/tcp_outlets.rs | 18 +- .../ockam/ockam_api/src/test_utils/mod.rs | 3 +- .../rust/ockam/ockam_api/src/util.rs | 315 +----------------- .../rust/ockam/ockam_app_lib/src/state/mod.rs | 2 +- .../ockam/ockam_command/src/node/create.rs | 13 + .../ockam_command/src/node/create/config.rs | 3 + .../src/node/create/foreground.rs | 21 +- .../rust/ockam/ockam_command/src/node/util.rs | 9 +- .../ockam/ockam_command/src/project/util.rs | 6 +- .../src/run/parser/resource/node.rs | 5 + .../src/secure_channel/create.rs | 4 +- .../src/secure_channel/delete.rs | 9 +- .../ockam_command/src/secure_channel/list.rs | 11 +- .../rust/ockam/ockam_multiaddr/src/codec.rs | 19 +- .../rust/ockam/ockam_multiaddr/src/lib.rs | 35 -- .../rust/ockam/ockam_multiaddr/src/proto.rs | 45 +++ .../ockam/ockam_multiaddr/src/registry.rs | 3 +- .../workers/remote_worker.rs | 6 +- .../ockam_transport_tcp/src/transport/mod.rs | 2 +- .../ockam_transport_udp/src/workers/sender.rs | 1 + tools/stress-test/src/main.rs | 2 +- 46 files changed, 1016 insertions(+), 537 deletions(-) create mode 100644 implementations/rust/ockam/ockam_api/src/multiaddr_resolver/local_resolver.rs create mode 100644 implementations/rust/ockam/ockam_api/src/multiaddr_resolver/mod.rs create mode 100644 implementations/rust/ockam/ockam_api/src/multiaddr_resolver/remote_resolver.rs create mode 100644 implementations/rust/ockam/ockam_api/src/multiaddr_resolver/reverse_local_converter.rs create mode 100644 implementations/rust/ockam/ockam_api/src/multiaddr_resolver/transport_route_resolver.rs create mode 100644 implementations/rust/ockam/ockam_api/src/nodes/connection/plain_udp.rs diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index bddca089016..29889b212ec 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -13,7 +13,7 @@ use ockam::{node, Context, Result}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; -use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route}; +use ockam_api::{RemoteMultiaddrResolver, TransportRouteResolver}; use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; @@ -75,7 +75,9 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime authority_node.present_token(node.context(), token).await.unwrap(); let project = import_project(project_information_path, node.identities()).await?; - let project_authority_route = multiaddr_to_transport_route(&project.authority_route()).unwrap(); // FIXME: Handle error + let project_authority_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(&project.authority_route())?; // Create a credential retriever that will be used to obtain credentials let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new( @@ -117,7 +119,10 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime // 5. create a relay on the Ockam orchestrator - let tcp_project_route = multiaddr_to_route(&project.route(), &tcp).await.unwrap(); // FIXME: Handle error + let tcp_project_route = RemoteMultiaddrResolver::default() + .with_tcp(tcp.clone()) + .resolve(&project.route()) + .await?; let project_options = SecureChannelOptions::new() .with_credential_retriever_creator(credential_retriever)? .with_authority(project.authority_identifier()) diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs index 86bf872fc70..c94b2ea4829 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs @@ -9,7 +9,7 @@ use ockam::{route, Context, Result}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; use ockam_api::nodes::NodeManager; -use ockam_api::{multiaddr_to_route, multiaddr_to_transport_route}; +use ockam_api::{RemoteMultiaddrResolver, TransportRouteResolver}; use ockam_core::compat::sync::Arc; use ockam_core::AsyncTryClone; use ockam_multiaddr::MultiAddr; @@ -73,7 +73,9 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime let project = import_project(project_information_path, node.identities()).await?; - let project_authority_route = multiaddr_to_transport_route(&project.route()).unwrap(); // FIXME: Handle error + let project_authority_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(&project.route())?; // Create a credential retriever that will be used to obtain credentials let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new( @@ -105,7 +107,10 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime // 4. create a tcp inlet with the above policy - let tcp_project_route = multiaddr_to_route(&project.route(), &tcp).await.unwrap(); // FIXME: Handle error + let tcp_project_route = RemoteMultiaddrResolver::default() + .with_tcp(tcp.clone()) + .resolve(&project.route()) + .await?; let project_options = SecureChannelOptions::new() .with_credential_retriever_creator(credential_retriever)? .with_authority(project.authority_identifier()) diff --git a/implementations/rust/ockam/ockam/src/lib.rs b/implementations/rust/ockam/ockam/src/lib.rs index 7c34dffb9c6..06ba9522c37 100644 --- a/implementations/rust/ockam/ockam/src/lib.rs +++ b/implementations/rust/ockam/ockam/src/lib.rs @@ -94,8 +94,8 @@ pub mod tcp { /// UDP transport pub mod udp { pub use ockam_transport_udp::{ - RendezvousClient, RendezvousService, UdpBindArguments, UdpBindOptions, UdpPuncture, - UdpPunctureNegotiation, UdpPunctureNegotiationListener, + RendezvousClient, RendezvousService, UdpBind, UdpBindArguments, UdpBindOptions, + UdpPuncture, UdpPunctureNegotiation, UdpPunctureNegotiationListener, UdpPunctureNegotiationListenerOptions, UdpTransport, UdpTransportExtension, MAX_MESSAGE_SIZE, UDP, }; diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs index 7227a68d737..9deac10d249 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/trust.rs @@ -3,7 +3,7 @@ use crate::nodes::service::{ CredentialScope, NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions, }; use crate::nodes::NodeManager; -use crate::{multiaddr_to_transport_route, ApiError, CliState}; +use crate::{ApiError, CliState, TransportRouteResolver}; use ockam::identity::models::ChangeHistory; use ockam::identity::{IdentitiesVerification, RemoteCredentialRetrieverInfo}; use ockam_core::errcode::{Kind, Origin}; @@ -46,12 +46,14 @@ impl CliState { } }; - let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or_else(|| { + let authority_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(authority_multiaddr) + .map_err(|err| { Error::new( Origin::Api, Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), + format!("Invalid authority route. Err: {}", &err), ) })?; let info = RemoteCredentialRetrieverInfo::create_for_project_member( @@ -116,12 +118,14 @@ impl CliState { .authority_identifier() .ok_or_else(|| ApiError::core("no authority identifier"))?; let authority_multiaddr = project.authority_multiaddr()?; - let authority_route = - multiaddr_to_transport_route(authority_multiaddr).ok_or_else(|| { + let authority_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(authority_multiaddr) + .map_err(|err| { Error::new( Origin::Api, Kind::NotFound, - format!("Invalid authority route: {}", &authority_multiaddr), + format!("Invalid authority route. Err: {}", &err), ) })?; diff --git a/implementations/rust/ockam/ockam_api/src/cloud/project/project.rs b/implementations/rust/ockam/ockam_api/src/cloud/project/project.rs index f6cc9779323..118a130d066 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/project/project.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/project/project.rs @@ -10,6 +10,7 @@ use crate::error::ApiError; use crate::output::Output; use crate::terminal::fmt; +use crate::TransportRouteResolver; use ockam::identity::{Identifier, Identity, Vault}; use ockam_core::compat::collections::HashSet; use ockam_core::errcode::{Kind, Origin}; @@ -66,9 +67,9 @@ impl Project { // return the host and port of the project node. // Ex: if access_route is "/dnsaddr/node.dnsaddr.com/tcp/4000/service/api", // then this will return the string "node.dnsaddr.com:4000". - let socket_addr = multiaddr - .to_socket_addr() - .map_err(|e| ApiError::core(e.to_string()))?; + let socket_addr = TransportRouteResolver::default() + .allow_tcp() + .socket_address(&multiaddr)?; project_socket_addr = Some(socket_addr.clone()); egress_allow_list.insert(socket_addr); project_multiaddr = Some(multiaddr); @@ -92,9 +93,9 @@ impl Project { Some(authority_access_route) => { let multiaddr = MultiAddr::from_str(authority_access_route) .map_err(|e| ApiError::core(e.to_string()))?; - let socket_addr = multiaddr - .to_socket_addr() - .map_err(|e| ApiError::core(e.to_string()))?; + let socket_addr = TransportRouteResolver::default() + .allow_tcp() + .socket_address(&multiaddr)?; authority_socket_addr = Some(socket_addr.clone()); egress_allow_list.insert(socket_addr); authority_multiaddr = Some(multiaddr) @@ -260,7 +261,10 @@ impl Output for Project { fmt::PADDING, fmt::INDENTATION, self.project_multiaddr() - .map(|m| m.to_socket_addr().unwrap_or("N/A".to_string())) + .map(|m| TransportRouteResolver::default() + .allow_tcp() + .socket_address(m) + .unwrap_or("N/A".to_string())) .unwrap_or("N/A".to_string()) )?; writeln!( @@ -301,7 +305,10 @@ impl Output for Project { fmt::PADDING, fmt::INDENTATION, self.authority_multiaddr() - .map(|m| m.to_socket_addr().unwrap_or("N/A".to_string())) + .map(|m| TransportRouteResolver::default() + .allow_tcp() + .socket_address(m) + .unwrap_or("N/A".to_string())) .unwrap_or("N/A".to_string()) )?; writeln!( diff --git a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs index f186d43ae0a..38f10c6fb0d 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/secure_clients.rs @@ -8,13 +8,13 @@ use ockam::identity::{ use ockam::tcp::TcpTransport; use ockam_core::compat::sync::Arc; use ockam_core::env::{get_env, get_env_with_default, FromString}; -use ockam_core::{Result, Route}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Error, Result, Route}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; -use crate::error::ApiError; -use crate::multiaddr_to_transport_route; use crate::nodes::NodeManager; +use crate::TransportRouteResolver; pub const OCKAM_CONTROLLER_ADDR: &str = "OCKAM_CONTROLLER_ADDR"; pub const DEFAULT_CONTROLLER_ADDRESS: &str = "/dnsaddr/orchestrator.ockam.io/tcp/6252/service/api"; @@ -138,11 +138,16 @@ impl NodeManager { caller_identifier: &Identifier, credential_retriever_creator: Option>, ) -> Result { - let authority_route = multiaddr_to_transport_route(authority_route).ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: multiaddr={authority_route}" - )) - })?; + let authority_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(authority_route) + .map_err(|err| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid authority route. Err: {}", &err), + ) + })?; Ok(AuthorityNodeClient { secure_client: SecureClient::new( @@ -167,11 +172,16 @@ impl NodeManager { project_multiaddr: &MultiAddr, caller_identifier: &Identifier, ) -> Result { - let project_route = multiaddr_to_transport_route(project_multiaddr).ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: multiaddr={project_multiaddr}" - )) - })?; + let project_route = TransportRouteResolver::default() + .allow_tcp() + .resolve(project_multiaddr) + .map_err(|err| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid project node route. Err: {}", &err), + ) + })?; Ok(ProjectNodeClient { secure_client: SecureClient::new( @@ -194,11 +204,9 @@ impl NodeManager { multiaddr: &MultiAddr, caller_identifier: &Identifier, ) -> Result { - let route = multiaddr_to_transport_route(multiaddr).ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: multiaddr={multiaddr}" - )) - })?; + let route = TransportRouteResolver::default() + .allow_tcp() + .resolve(multiaddr)?; Ok(GenericSecureClient { secure_client: SecureClient::new( @@ -233,11 +241,16 @@ impl NodeManager { pub async fn controller_route() -> Result { let multiaddr = Self::controller_multiaddr(); - multiaddr_to_transport_route(&multiaddr).ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: multiaddr={multiaddr}" - )) - }) + TransportRouteResolver::default() + .allow_tcp() + .resolve(&multiaddr) + .map_err(|err| { + Error::new( + Origin::Api, + Kind::NotFound, + format!("Invalid controller route. Err: {}", &err), + ) + }) } } diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index eac52566c41..55adead0306 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -44,6 +44,7 @@ pub mod logs; mod schema; mod date; +mod multiaddr_resolver; mod rendezvous_healthcheck; pub mod test_utils; mod ui; @@ -52,6 +53,7 @@ mod util; pub use cli_state::CliState; pub use date::UtcDateTime; pub use error::*; +pub use multiaddr_resolver::*; pub use nodes::service::default_address::*; pub use rendezvous_healthcheck::*; pub use session::connection_status::ConnectionStatus; diff --git a/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/local_resolver.rs b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/local_resolver.rs new file mode 100644 index 00000000000..2f3985c804b --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/local_resolver.rs @@ -0,0 +1,61 @@ +use crate::multiaddr_resolver::invalid_multiaddr_error; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Address, Error, Result, Route, LOCAL}; +use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Node, Secure, Service, Worker}; +use ockam_multiaddr::{MultiAddr, Protocol}; + +pub struct LocalMultiaddrResolver {} + +impl LocalMultiaddrResolver { + /// Try to convert a local multi-address to an Ockam route. + pub fn resolve(ma: &MultiAddr) -> Result { + let mut rb = Route::new(); + for p in ma.iter() { + match p.code() { + // Only hops that are directly translated to existing workers are allowed here + Worker::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)) + } + Service::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)) + } + Secure::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)) + } + + Node::CODE => { + return Err(Error::new( + Origin::Api, + Kind::Invalid, + "unexpected code: node. clean_multiaddr should have been called", + )); + } + + code @ (Ip4::CODE | Ip6::CODE | DnsAddr::CODE) => { + return Err(Error::new( + Origin::Api, + Kind::Invalid, + format!( + "unexpected code: {code}. The address must be a local address {ma}" + ), + )); + } + + _ => { + return Err(invalid_multiaddr_error(ma)); + } + } + } + + Ok(rb.into()) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/mod.rs b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/mod.rs new file mode 100644 index 00000000000..da971a511a3 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/mod.rs @@ -0,0 +1,28 @@ +mod local_resolver; +mod remote_resolver; +mod reverse_local_converter; +mod transport_route_resolver; + +pub use local_resolver::*; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::Error; +use ockam_multiaddr::MultiAddr; +pub use remote_resolver::*; +pub use reverse_local_converter::*; +pub use transport_route_resolver::*; + +fn invalid_multiaddr_error(ma: &MultiAddr) -> Error { + Error::new( + Origin::Api, + Kind::Misuse, + format!("Invalid multiaddr {}", ma), + ) +} + +fn multiple_transport_hops_error(ma: &MultiAddr) -> Error { + Error::new( + Origin::Api, + Kind::Unsupported, + format!("Only one hop is allowed in a multiaddr. Multiaddr={}", ma), + ) +} diff --git a/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/remote_resolver.rs b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/remote_resolver.rs new file mode 100644 index 00000000000..95cb62a0b8f --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/remote_resolver.rs @@ -0,0 +1,241 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +use crate::multiaddr_resolver::{invalid_multiaddr_error, multiple_transport_hops_error}; +use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport}; +use ockam::udp::{UdpBind, UdpBindArguments, UdpBindOptions, UdpTransport}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::flow_control::FlowControlId; +use ockam_core::{Address, Error, Result, Route, LOCAL}; +use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Secure, Service, Tcp, Udp, Worker}; +use ockam_multiaddr::{MultiAddr, ProtoIter, Protocol}; + +pub enum RemoteMultiaddrResolverConnection { + Tcp(TcpConnection), + Udp(UdpBind), +} + +impl RemoteMultiaddrResolverConnection { + fn flow_control_id(&self) -> &FlowControlId { + match self { + RemoteMultiaddrResolverConnection::Tcp(c) => c.flow_control_id(), + RemoteMultiaddrResolverConnection::Udp(b) => b.flow_control_id(), + } + } + + fn sender_address(&self) -> &Address { + match self { + RemoteMultiaddrResolverConnection::Tcp(t) => t.sender_address(), + RemoteMultiaddrResolverConnection::Udp(b) => b.sender_address(), + } + } +} + +pub struct RemoteMultiaddrResolverResult { + pub flow_control_id: Option, + pub route: Route, + pub connection: Option, +} + +#[derive(Default, Clone, Debug)] +pub struct RemoteMultiaddrResolver { + tcp: Option, + udp: Option, + udp_bind_address: Option, +} + +impl RemoteMultiaddrResolver { + pub fn new(tcp: Option, udp: Option) -> Self { + Self { + tcp, + udp, + udp_bind_address: None, + } + } + + pub fn with_tcp(&mut self, tcp: TcpTransport) -> &mut Self { + self.tcp = Some(tcp); + self + } + + pub fn with_udp(&mut self, udp: UdpTransport, bind_address: Option) -> &mut Self { + self.udp = Some(udp); + self.udp_bind_address = bind_address; + self + } +} + +fn unsupported_protocol_error(ma: &MultiAddr) -> Error { + Error::new( + Origin::Api, + Kind::Unsupported, + format!("Unsupported multiaddr protocol: {}", ma), + ) +} + +impl RemoteMultiaddrResolver { + pub async fn resolve(&self, ma: &MultiAddr) -> Result { + let mut rb = Route::new(); + let mut it = ma.iter(); + + let mut flow_control_id = None; + let mut connection_res = None; + + // Only one transport hop is allowed + let mut transport_hop_resolved = false; + + while let Some(p) = it.next() { + let peer = match p.code() { + Ip4::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + + let ip4 = p.cast::().ok_or_else(|| invalid_multiaddr_error(ma))?; + + (*ip4).to_string() + } + Ip6::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + + let ip6 = p.cast::().ok_or_else(|| invalid_multiaddr_error(ma))?; + + (*ip6).to_string() + } + DnsAddr::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + + let host = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + + (*host).to_string() + } + Worker::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)); + continue; + } + Service::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)); + continue; + } + Secure::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + rb = rb.append(Address::new_with_string(LOCAL, &*local)); + continue; + } + _ => { + return Err(unsupported_protocol_error(ma)); + } + }; + + let connection = self.connect(ma, &mut it, &peer).await?; + transport_hop_resolved = true; + flow_control_id = Some(connection.flow_control_id().clone()); + rb = rb.append(connection.sender_address().clone()); + connection_res = Some(connection); + } + + Ok(RemoteMultiaddrResolverResult { + flow_control_id, + connection: connection_res, + route: rb.into(), + }) + } + + async fn connect_tcp( + &self, + tcp: &TcpTransport, + ma: &MultiAddr, + peer: String, + ) -> Result { + tcp.connect(peer, TcpConnectionOptions::new()) + .await + .map_err(|err| { + Error::new( + Origin::Api, + Kind::Io, + format!( + "Couldn't make TCP connection while resolving multiaddr: {}. Err: {}", + ma, err + ), + ) + }) + } + + async fn connect_udp(&self, udp: &UdpTransport, ma: &MultiAddr, peer: &str) -> Result { + let bind_address = self + .udp_bind_address + .unwrap_or_else(|| SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))); + + let arguments = UdpBindArguments::new() + .with_bind_socket_address(bind_address) + .with_peer_address(peer) + .await?; + + udp.bind(arguments, UdpBindOptions::new()) + .await + .map_err(|err| { + Error::new( + Origin::Api, + Kind::Io, + format!( + "Couldn't make UDP connection while resolving multiaddr: {}. Err: {}", + ma, err + ), + ) + }) + } + + async fn connect( + &self, + ma: &MultiAddr, + it: &mut ProtoIter<'_>, + peer: &str, + ) -> Result { + let next = it.next().ok_or_else(|| invalid_multiaddr_error(ma))?; + + if let Some(port) = next.cast::() { + let tcp = self.tcp.as_ref().ok_or_else(|| { + Error::new( + Origin::Api, + Kind::Unsupported, + format!("TCP hops are not allowed. Multiaddr={}", ma), + ) + })?; + + let peer = format!("{}:{}", peer, *port); + let connection = self.connect_tcp(tcp, ma, peer).await?; + + return Ok(RemoteMultiaddrResolverConnection::Tcp(connection)); + } + + if let Some(port) = next.cast::() { + let udp = self.udp.as_ref().ok_or_else(|| { + Error::new( + Origin::Api, + Kind::Unsupported, + format!("UDP hops are not allowed. Multiaddr={}", ma), + ) + })?; + + let peer = format!("{}:{}", peer, *port); + let connection = self.connect_udp(udp, ma, &peer).await?; + + return Ok(RemoteMultiaddrResolverConnection::Udp(connection)); + } + + Err(unsupported_protocol_error(ma)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/reverse_local_converter.rs b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/reverse_local_converter.rs new file mode 100644 index 00000000000..3b3127c03e0 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/reverse_local_converter.rs @@ -0,0 +1,31 @@ +use ockam_core::{Address, Result, Route, LOCAL}; +use ockam_multiaddr::proto::Service; +use ockam_multiaddr::MultiAddr; + +use crate::error::ApiError; + +pub struct ReverseLocalConverter; + +impl ReverseLocalConverter { + /// Try to convert an Ockam Route into a MultiAddr. + pub fn convert_route(r: &Route) -> Result { + let mut ma = MultiAddr::default(); + for a in r.iter() { + ma.try_extend(&Self::convert_address(a)?)? + } + Ok(ma) + } + + /// Try to convert an Ockam Address to a MultiAddr. + pub fn convert_address(a: &Address) -> Result { + let mut ma = MultiAddr::default(); + match a.transport_type() { + LOCAL => ma.push_back(Service::new(a.address()))?, + other => { + error!(target: "ockam_api", transport = %other, "unsupported transport type"); + return Err(ApiError::core(format!("unknown transport type: {other}"))); + } + } + Ok(ma) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/transport_route_resolver.rs b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/transport_route_resolver.rs new file mode 100644 index 00000000000..672694d85af --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/multiaddr_resolver/transport_route_resolver.rs @@ -0,0 +1,186 @@ +use std::net::{SocketAddrV4, SocketAddrV6}; + +use crate::multiaddr_resolver::{invalid_multiaddr_error, multiple_transport_hops_error}; +use ockam::tcp::TCP; +use ockam::udp::UDP; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Address, Error, Result, Route, TransportType, LOCAL}; +use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Secure, Service, Tcp, Udp, Worker}; +use ockam_multiaddr::{MultiAddr, ProtoIter, ProtoValue, Protocol}; + +#[derive(Default, Debug, Clone)] +pub struct TransportRouteResolver { + allow_tcp: bool, + allow_udp: bool, +} + +impl TransportRouteResolver { + pub fn new(allow_tcp: bool, allow_udp: bool) -> Self { + Self { + allow_tcp, + allow_udp, + } + } + + pub fn allow_tcp(&mut self) -> &mut Self { + self.allow_tcp = true; + self + } + + pub fn allow_udp(&mut self) -> &mut Self { + self.allow_udp = true; + self + } +} + +impl TransportRouteResolver { + /// Resolve all the multiaddresses which represent transport addresses + /// For example /tcp/127.0.0.1/port/4000 is transformed to the Address (TCP, "127.0.0.1:4000") + /// The creation of a TCP worker and the substitution of that transport address to a worker address + /// is done later with `context.resolve_transport_route(route)` + pub fn resolve(&self, ma: &MultiAddr) -> Result { + let mut route = Route::new(); + let mut it = ma.iter(); + + // Only one transport hop is allowed + let mut transport_hop_resolved = false; + + while let Some(p) = it.next() { + match p.code() { + Ip4::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + let ip4 = p.cast::().ok_or_else(|| invalid_multiaddr_error(ma))?; + let (transport_type, port) = self.parse_port_it(ma, &mut it)?; + let socket_addr = SocketAddrV4::new(*ip4, port); + route = route.append(Address::new_with_string( + transport_type, + socket_addr.to_string(), + )); + transport_hop_resolved = true; + } + Ip6::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + let ip6 = p.cast::().ok_or_else(|| invalid_multiaddr_error(ma))?; + let (transport_type, port) = self.parse_port_it(ma, &mut it)?; + let socket_addr = SocketAddrV6::new(*ip6, port, 0, 0); + route = route.append(Address::new_with_string( + transport_type, + socket_addr.to_string(), + )); + transport_hop_resolved = true; + } + DnsAddr::CODE => { + if transport_hop_resolved { + return Err(multiple_transport_hops_error(ma)); + } + let host = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + let (transport_type, port) = self.parse_port_it(ma, &mut it)?; + let addr = format!("{}:{}", &*host, port); + route = route.append(Address::new_with_string(transport_type, addr)); + transport_hop_resolved = true; + } + Worker::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + route = route.append(Address::new_with_string(LOCAL, &*local)) + } + Service::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + route = route.append(Address::new_with_string(LOCAL, &*local)) + } + Secure::CODE => { + let local = p + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + route = route.append(Address::new_with_string(LOCAL, &*local)) + } + _ => { + return Err(Error::new( + Origin::Api, + Kind::Misuse, + format!("Unsupported multiaddr protocol: {}", ma), + )); + } + } + } + + Ok(route.into()) + } + /// If the input MultiAddr is "/dnsaddr/localhost/tcp/4000/service/api", + /// then this will return string format of the SocketAddr: "127.0.0.1:4000". + pub fn socket_address(&self, ma: &MultiAddr) -> Result { + let mut it = ma.iter(); + + let first = it.next().ok_or_else(|| invalid_multiaddr_error(ma))?; + let second = it.next().ok_or_else(|| invalid_multiaddr_error(ma))?; + + match first.code() { + Ip4::CODE => { + let ip4 = first + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + let (_transport_type, port) = self.parse_port(ma, &second)?; + Ok(SocketAddrV4::new(*ip4, port).to_string()) + } + Ip6::CODE => { + let ip6 = first + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + let (_transport_type, port) = self.parse_port(ma, &second)?; + Ok(SocketAddrV6::new(*ip6, port, 0, 0).to_string()) + } + DnsAddr::CODE => { + let host = first + .cast::() + .ok_or_else(|| invalid_multiaddr_error(ma))?; + let (_transport_type, port) = self.parse_port(ma, &second)?; + Ok(format!("{}:{}", &*host, port)) + } + _ => Err(invalid_multiaddr_error(ma)), + } + } + + fn parse_port_it(&self, ma: &MultiAddr, it: &mut ProtoIter) -> Result<(TransportType, u16)> { + let next = it.next().ok_or_else(|| invalid_multiaddr_error(ma))?; + + self.parse_port(ma, &next) + } + + fn parse_port(&self, ma: &MultiAddr, next: &ProtoValue) -> Result<(TransportType, u16)> { + if let Some(port) = next.cast::() { + if !self.allow_tcp { + return Err(Error::new( + Origin::Api, + Kind::Unsupported, + format!("TCP hops are not allowed. Multiaddr={}", ma), + )); + } + + return Ok((TCP, port.0)); + } + + if let Some(port) = next.cast::() { + if !self.allow_udp { + return Err(Error::new( + Origin::Api, + Kind::Unsupported, + format!("UDP hops are not allowed. Multiaddr={}", ma), + )); + } + + return Ok((UDP, port.0)); + } + + // Should not happen + Err(invalid_multiaddr_error(ma)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs index 3d97e591ea5..e3560216968 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs @@ -1,4 +1,5 @@ mod plain_tcp; +mod plain_udp; mod project; mod secure; @@ -12,10 +13,12 @@ use ockam_multiaddr::{Match, MultiAddr, Protocol}; use ockam_node::Context; use crate::error::ApiError; -use crate::local_multiaddr_to_route; use crate::nodes::service::default_address::DefaultAddress; use crate::nodes::NodeManager; +use crate::LocalMultiaddrResolver; +use ockam::udp::UdpBind; pub(crate) use plain_tcp::PlainTcpInstantiator; +pub(crate) use plain_udp::PlainUdpInstantiator; pub(crate) use project::ProjectInstantiator; pub(crate) use secure::SecureChannelInstantiator; use std::fmt::{Debug, Formatter}; @@ -36,6 +39,8 @@ pub struct Connection { pub(crate) secure_channel_encryptors: Vec
, /// A TCP worker address if used when instantiating the connection pub(crate) tcp_connection: Option, + /// A UDP worker address if used when instantiating the connection + pub(crate) udp_bind: Option, /// If a flow control was created flow_control_id: Option, } @@ -62,7 +67,7 @@ impl Connection { } pub fn route(&self) -> Result { - local_multiaddr_to_route(&self.normalized_addr).map_err(|_| { + LocalMultiaddrResolver::resolve(&self.normalized_addr).map_err(|_| { ApiError::core(format!( "Couldn't convert MultiAddr to route: normalized_addr={}", self.normalized_addr @@ -105,6 +110,30 @@ impl Connection { } } + if let Some(udp_bind) = self.udp_bind.as_ref() { + let address = udp_bind.sender_address().clone(); + if let Err(error) = node_manager + .udp_transport + .as_ref() + .ok_or_else(|| { + ockam_core::Error::new(Origin::Node, Kind::Internal, "UDP transport is missing") + })? + .unbind(address.clone()) + .await + { + match error.code().kind { + Kind::NotFound => { + debug!("cannot find and disconnect udp worker `{udp_bind}`"); + } + _ => Err(ockam_core::Error::new( + Origin::Node, + Kind::Internal, + format!("Failed to remove inlet with alias {address}. {}", error), + ))?, + } + } + } + Ok(()) } } @@ -134,6 +163,7 @@ pub(crate) struct ConnectionBuilder { pub(crate) flow_control_id: Option, pub(crate) secure_channel_encryptors: Vec
, pub(crate) tcp_connection: Option, + pub(crate) udp_bind: Option, } impl Debug for ConnectionBuilder { @@ -163,6 +193,8 @@ pub struct Changes { pub secure_channel_encryptors: Vec
, /// Optional, to keep track of tcp worker when created for the connection pub tcp_connection: Option, + /// Optional, to keep track of tcp worker when created for the connection + pub udp_bind: Option, } /// Takes in a [`MultiAddr`] and instantiate it, can be implemented for any protocol. @@ -197,6 +229,7 @@ impl ConnectionBuilder { secure_channel_encryptors: vec![], flow_control_id: None, tcp_connection: None, + udp_bind: None, } } @@ -207,6 +240,7 @@ impl ConnectionBuilder { original_addr: self.original_multiaddr, secure_channel_encryptors: self.secure_channel_encryptors, tcp_connection: self.tcp_connection, + udp_bind: self.udp_bind, flow_control_id: self.flow_control_id, } } @@ -255,12 +289,23 @@ impl ConnectionBuilder { return Err(ockam_core::Error::new( Origin::Transport, Kind::Unsupported, - "multiple tcp connections created in a `MultiAddr`", + "multiple transport connections created in a `MultiAddr`", )); } self.tcp_connection = changes.tcp_connection; } + if changes.udp_bind.is_some() { + if self.udp_bind.is_some() { + return Err(ockam_core::Error::new( + Origin::Transport, + Kind::Unsupported, + "multiple transport connections created in a `MultiAddr`", + )); + } + self.udp_bind = changes.udp_bind; + } + if changes.flow_control_id.is_some() { self.flow_control_id = changes.flow_control_id; } @@ -273,14 +318,7 @@ impl ConnectionBuilder { .recalculate_transport_route(ctx, self.current_multiaddr.clone(), true) .await?; - Ok(Self { - original_multiaddr: self.original_multiaddr, - transport_route: self.transport_route, - secure_channel_encryptors: self.secure_channel_encryptors, - current_multiaddr: self.current_multiaddr, - flow_control_id: self.flow_control_id, - tcp_connection: self.tcp_connection, - }) + Ok(self) } /// Calculate a 'transport route' from the [`MultiAddr`] diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs index faaa3eb4199..079e5286947 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs @@ -1,6 +1,6 @@ use crate::error::ApiError; use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator}; -use crate::{multiaddr_to_route, route_to_multiaddr}; +use crate::{RemoteMultiaddrResolver, RemoteMultiaddrResolverConnection, ReverseLocalConverter}; use crate::nodes::NodeManager; use ockam_core::{async_trait, Error, Route}; @@ -36,35 +36,35 @@ impl Instantiator for PlainTcpInstantiator { ) -> Result { let (before, tcp_piece, after) = extracted; - let mut tcp = multiaddr_to_route(&tcp_piece, &node_manager.tcp_transport) - .await - .ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: tcp_piece={tcp_piece}" - )) - })?; + let mut tcp = RemoteMultiaddrResolver::default() + .with_tcp(node_manager.tcp_transport.clone()) + .resolve(&tcp_piece) + .await?; - let multiaddr = route_to_multiaddr(&tcp.route).ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert route to MultiAddr: tcp_route={}", - &tcp.route - )) - })?; + let multiaddr = ReverseLocalConverter::convert_route(&tcp.route)?; let current_multiaddr = ConnectionBuilder::combine(before, multiaddr, after)?; // since we only pass the piece regarding tcp // tcp_connection should exist let tcp_connection = tcp - .tcp_connection + .connection .take() .ok_or_else(|| ApiError::core("TCP connection should be set"))?; + let tcp_connection = match tcp_connection { + RemoteMultiaddrResolverConnection::Tcp(tcp_connection) => tcp_connection, + RemoteMultiaddrResolverConnection::Udp(_) => { + return Err(ApiError::core("TCP connection should be set")); + } + }; + Ok(Changes { current_multiaddr, flow_control_id: tcp.flow_control_id, secure_channel_encryptors: vec![], tcp_connection: Some(tcp_connection), + udp_bind: None, }) } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_udp.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_udp.rs new file mode 100644 index 00000000000..3741f4f2f4f --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/plain_udp.rs @@ -0,0 +1,69 @@ +use crate::error::ApiError; +use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator}; +use crate::{RemoteMultiaddrResolver, RemoteMultiaddrResolverConnection, ReverseLocalConverter}; + +use crate::nodes::NodeManager; +use ockam_core::{async_trait, Error, Route}; +use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Udp}; +use ockam_multiaddr::{Match, MultiAddr, Protocol}; +use ockam_node::Context; + +/// Creates the tcp connection. +pub(crate) struct PlainUdpInstantiator {} + +impl PlainUdpInstantiator { + pub(crate) fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl Instantiator for PlainUdpInstantiator { + fn matches(&self) -> Vec { + vec![ + // matches any tcp address followed by a tcp protocol + Match::any([DnsAddr::CODE, Ip4::CODE, Ip6::CODE]), + Udp::CODE.into(), + ] + } + + async fn instantiate( + &self, + _ctx: &Context, + node_manager: &NodeManager, + _transport_route: Route, + extracted: (MultiAddr, MultiAddr, MultiAddr), + ) -> Result { + let (before, udp_piece, after) = extracted; + + let mut udp = RemoteMultiaddrResolver::new(None, node_manager.udp_transport.clone()) + .resolve(&udp_piece) + .await?; + + let multiaddr = ReverseLocalConverter::convert_route(&udp.route)?; + + let current_multiaddr = ConnectionBuilder::combine(before, multiaddr, after)?; + + // since we only pass the piece regarding udp + // udp_bind should exist + let udp_bind = udp + .connection + .take() + .ok_or_else(|| ApiError::core("UDP connection should be set"))?; + + let udp_bind = match udp_bind { + RemoteMultiaddrResolverConnection::Tcp(_) => { + return Err(ApiError::core("UDP connection should be set")); + } + RemoteMultiaddrResolverConnection::Udp(udp_bind) => udp_bind, + }; + + Ok(Changes { + current_multiaddr, + flow_control_id: udp.flow_control_id, + secure_channel_encryptors: vec![], + tcp_connection: None, + udp_bind: Some(udp_bind), + }) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs index 154d6cba092..375bea40dbc 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs @@ -1,7 +1,7 @@ use crate::error::ApiError; use crate::nodes::connection::{Changes, Instantiator}; use crate::nodes::NodeManager; -use crate::{multiaddr_to_route, try_address_to_multiaddr}; +use crate::{RemoteMultiaddrResolver, RemoteMultiaddrResolverConnection, ReverseLocalConverter}; use ockam_core::{async_trait, Error, Route}; use ockam_multiaddr::proto::Project; @@ -54,19 +54,24 @@ impl Instantiator for ProjectInstantiator { node_manager.resolve_project(&project).await?; debug!(addr = %project_multiaddr, "creating secure channel"); - let tcp = multiaddr_to_route(&project_multiaddr, &node_manager.tcp_transport) - .await - .ok_or_else(|| { - ApiError::core(format!( - "Couldn't convert MultiAddr to route: project_multiaddr={project_multiaddr}" - )) - })?; + let transport_res = RemoteMultiaddrResolver::new( + Some(node_manager.tcp_transport.clone()), + None, // We can't connect to the project node via UDP atm + ) + .resolve(&project_multiaddr) + .await + .map_err(|err| { + ApiError::core(format!( + "Couldn't instantiate project multiaddr. Err: {}", + err + )) + })?; debug!("create a secure channel to the project {project_identifier}"); let sc = node_manager .create_secure_channel_internal( ctx, - tcp.route, + transport_res.route, &self.identifier.clone(), Some(vec![project_identifier]), None, @@ -77,14 +82,25 @@ impl Instantiator for ProjectInstantiator { // when creating a secure channel we want the route to pass through that // ignoring previous steps, since they will be implicit - let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap(); + let mut current_multiaddr = ReverseLocalConverter::convert_address(sc.encryptor_address())?; current_multiaddr.try_extend(after.iter())?; + let tcp_connection = transport_res + .connection + .map(|connection| match connection { + RemoteMultiaddrResolverConnection::Tcp(tcp_connection) => Ok(tcp_connection), + RemoteMultiaddrResolverConnection::Udp(_) => Err(ApiError::core( + "UDP connection can't be used to Project node", + )), + }) + .transpose()?; + Ok(Changes { flow_control_id: Some(sc.flow_control_id().clone()), current_multiaddr, secure_channel_encryptors: vec![sc.encryptor_address().clone()], - tcp_connection: tcp.tcp_connection, + tcp_connection, + udp_bind: None, }) } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs index 7903c177764..f4e0bfa9072 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::nodes::connection::{Changes, Instantiator}; use crate::nodes::NodeManager; -use crate::{local_multiaddr_to_route, try_address_to_multiaddr}; +use crate::{LocalMultiaddrResolver, ReverseLocalConverter}; use crate::nodes::service::SecureChannelType; use ockam::identity::Identifier; @@ -47,7 +47,7 @@ impl Instantiator for SecureChannelInstantiator { ) -> Result { let (_before, secure_piece, after) = extracted; debug!(%secure_piece, %transport_route, "creating secure channel"); - let route = local_multiaddr_to_route(&secure_piece)?; + let route = LocalMultiaddrResolver::resolve(&secure_piece)?; let sc_ctx = ctx.async_try_clone().await?; let sc = node_manager @@ -66,7 +66,7 @@ impl Instantiator for SecureChannelInstantiator { // when creating a secure channel we want the route to pass through that // ignoring previous steps, since they will be implicit - let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap(); + let mut current_multiaddr = ReverseLocalConverter::convert_address(sc.encryptor_address())?; current_multiaddr.try_extend(after.iter())?; Ok(Changes { @@ -74,6 +74,7 @@ impl Instantiator for SecureChannelInstantiator { flow_control_id: Some(sc.flow_control_id().clone()), secure_channel_encryptors: vec![sc.encryptor_address().clone()], tcp_connection: None, + udp_bind: None, }) } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs index 3ecf882fccf..71d2f8d7880 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs @@ -18,7 +18,7 @@ use crate::error::ApiError; use crate::output::Output; use crate::session::connection_status::ConnectionStatus; use crate::terminal::fmt; -use crate::{route_to_multiaddr, try_address_to_multiaddr}; +use crate::ReverseLocalConverter; /// Request body to create an inlet #[derive(Clone, Debug, Encode, Decode, CborLen)] @@ -249,7 +249,7 @@ impl Display for InletStatus { .outlet_route .as_ref() .and_then(Route::parse) - .and_then(|r| route_to_multiaddr(&r)) + .and_then(|r| ReverseLocalConverter::convert_route(&r).ok()) { writeln!( f, @@ -310,8 +310,7 @@ impl OutletStatus { } pub fn worker_route(&self) -> Result { - try_address_to_multiaddr(&self.worker_addr) - .map_err(|_| ApiError::core("Invalid Worker Address")) + ReverseLocalConverter::convert_address(&self.worker_addr) } pub fn worker_name(&self) -> Result { diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs index 4fe608826d6..8dfc7c90eb6 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs @@ -8,11 +8,10 @@ use ockam_core::flow_control::FlowControlId; use ockam_multiaddr::MultiAddr; use crate::colors::color_primary; -use crate::error::ApiError; use crate::output::Output; use crate::session::replacer::ReplacerOutputKind; use crate::session::session::Session; -use crate::{route_to_multiaddr, ConnectionStatus}; +use crate::{ConnectionStatus, ReverseLocalConverter}; #[derive(Debug, Clone, Encode, Decode, CborLen)] #[rustfmt::skip] @@ -177,9 +176,7 @@ impl RelayInfo { pub fn remote_address_ma(&self) -> Result, ockam_core::Error> { if let Some(addr) = &self.remote_address { - route_to_multiaddr(&route![addr.to_string()]) - .ok_or_else(|| ApiError::core("Invalid Remote Address")) - .map(Some) + ReverseLocalConverter::convert_route(&route![addr.to_string()]).map(Some) } else { Ok(None) } @@ -187,9 +184,7 @@ impl RelayInfo { pub fn worker_address_ma(&self) -> Result, ockam_core::Error> { if let Some(addr) = &self.worker_address { - route_to_multiaddr(&route![addr.to_string()]) - .ok_or_else(|| ApiError::core("Invalid Worker Address")) - .map(Some) + ReverseLocalConverter::convert_route(&route![addr.to_string()]).map(Some) } else { Ok(None) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/secure_channel.rs index 2cc197318b5..7dc2ea6067c 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/secure_channel.rs @@ -12,11 +12,9 @@ use ockam_core::{route, Address, Result}; use ockam_multiaddr::MultiAddr; use crate::colors::color_primary; -use crate::error::ApiError; use crate::nodes::registry::SecureChannelInfo; use crate::output::Output; -use crate::{route_to_multiaddr, try_route_to_multiaddr}; - +use crate::ReverseLocalConverter; //Requests /// Request body when instructing a node to create a Secure Channel @@ -136,14 +134,14 @@ impl CreateSecureChannelResponse { } pub fn multiaddr(&self) -> Result { - route_to_multiaddr(&route![self.addr.to_string()]) - .ok_or_else(|| ApiError::core(format!("Invalid route: {}", self.addr))) + ReverseLocalConverter::convert_route(&route![self.addr.to_string()]) } } impl Output for CreateSecureChannelResponse { fn item(&self) -> crate::Result { - let addr = try_route_to_multiaddr(&route![self.addr.to_string()])?.to_string(); + let addr = + ReverseLocalConverter::convert_route(&route![self.addr.to_string()])?.to_string(); Ok(addr) } } @@ -189,7 +187,7 @@ impl Output for SecureChannelListener { fn item(&self) -> crate::Result { let addr = { let channel_route = route![self.address().clone()]; - let channel_multiaddr = try_route_to_multiaddr(&channel_route)?; + let channel_multiaddr = ReverseLocalConverter::convert_route(&channel_route)?; channel_multiaddr.to_string() }; Ok(format!("Listener at {}", color_primary(addr))) @@ -248,7 +246,7 @@ impl Output for ShowSecureChannelResponse { format!( "\n Secure Channel:\n{} {}\n{} {}\n{} {}", " • At: ".light_magenta(), - try_route_to_multiaddr(&route![addr.to_string()])? + ReverseLocalConverter::convert_route(&route![addr.to_string()])? .to_string() .light_yellow(), " • To: ".light_magenta(), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/flow_controls.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/flow_controls.rs index 247e73f4c48..c934375ac60 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/flow_controls.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/flow_controls.rs @@ -6,11 +6,10 @@ use ockam_multiaddr::MultiAddr; use ockam_node::Context; use std::fmt::Display; -use crate::local_multiaddr_to_route; +use super::NodeManagerWorker; use crate::nodes::models::flow_controls::AddConsumer; use crate::nodes::NodeManager; - -use super::NodeManagerWorker; +use crate::LocalMultiaddrResolver; impl NodeManagerWorker { pub(super) async fn add_consumer( @@ -40,7 +39,7 @@ impl NodeManager { consumer: &MultiAddr, flow_control_id: &FlowControlId, ) -> Result> { - let mut route = local_multiaddr_to_route(consumer)?; + let mut route = LocalMultiaddrResolver::resolve(consumer)?; let address = match route.step().ok() { Some(a) => a, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs index a626603f779..1848d7603bf 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs @@ -167,7 +167,7 @@ impl InMemoryNode { status_endpoint_port, false, ), - NodeManagerTransportOptions::new(tcp_listener.flow_control_id().clone(), tcp, None), + NodeManagerTransportOptions::new_tcp(tcp_listener.flow_control_id().clone(), tcp), trust_options, ) .await diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs index fbe0d18bbe4..b7e7b4ebef2 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -1,7 +1,7 @@ use crate::cloud::project::Project; use crate::cloud::{AuthorityNodeClient, ControllerClient, CredentialsEnabled, ProjectNodeClient}; use crate::nodes::connection::{ - Connection, ConnectionBuilder, PlainTcpInstantiator, ProjectInstantiator, + Connection, ConnectionBuilder, PlainTcpInstantiator, PlainUdpInstantiator, ProjectInstantiator, SecureChannelInstantiator, }; use crate::nodes::models::portal::OutletStatus; @@ -51,7 +51,7 @@ pub struct NodeManager { pub(crate) cli_state: CliState, pub(super) node_name: String, pub(super) node_identifier: Identifier, - pub(crate) api_transport_flow_control_id: FlowControlId, + pub(crate) api_transport_flow_control_ids: Vec, pub(crate) tcp_transport: TcpTransport, pub(crate) udp_transport: Option, pub(crate) secure_channels: Arc, @@ -100,7 +100,7 @@ impl NodeManager { NodeManagerCredentialRetrieverOptions::Remote { info, scope } => { Some(Arc::new(RemoteCredentialRetrieverCreator::new( ctx.async_try_clone().await?, - Arc::new(transport_options.tcp_transport.clone()), + Arc::new(transport_options.tcp.transport.clone()), secure_channels.clone(), info.clone(), scope, @@ -125,7 +125,7 @@ impl NodeManager { NodeManagerCredentialRetrieverOptions::Remote { info, scope } => { Some(Arc::new(RemoteCredentialRetrieverCreator::new( ctx.async_try_clone().await?, - Arc::new(transport_options.tcp_transport.clone()), + Arc::new(transport_options.tcp.transport.clone()), secure_channels.clone(), info.clone(), scope, @@ -142,13 +142,19 @@ impl NodeManager { _account_admin: None, }; + let mut api_transport_flow_control_ids = vec![transport_options.tcp.flow_control_id]; + + if let Some(udp) = &transport_options.udp { + api_transport_flow_control_ids.push(udp.flow_control_id.clone()); + } + let mut s = Self { cli_state, node_name, node_identifier, - api_transport_flow_control_id: transport_options.api_transport_flow_control_id, - tcp_transport: transport_options.tcp_transport, - udp_transport: transport_options.udp_transport.clone(), + api_transport_flow_control_ids, + tcp_transport: transport_options.tcp.transport, + udp_transport: transport_options.udp.map(|u| u.transport), secure_channels, api_sc_listener: None, credential_retriever_creators, @@ -168,7 +174,7 @@ impl NodeManager { .map_err(|e| ApiError::core(e.to_string()))?; } - if let Some(udp_transport) = transport_options.udp_transport.as_ref() { + if let Some(udp) = &s.udp_transport { let rendezvous_route = route![ DefaultAddress::get_rendezvous_server_address(), DefaultAddress::RENDEZVOUS_SERVICE @@ -180,7 +186,7 @@ impl NodeManager { UdpPunctureNegotiationListener::create( ctx, DefaultAddress::UDP_PUNCTURE_NEGOTIATION_LISTENER, - udp_transport, + udp, rendezvous_route, options, ) @@ -205,11 +211,13 @@ impl NodeManager { async fn initialize_default_services( &self, ctx: &Context, - api_flow_control_id: &FlowControlId, + api_flow_control_ids: &[FlowControlId], ) -> ockam_core::Result { // Start services - ctx.flow_controls() - .add_consumer(DefaultAddress::UPPERCASE_SERVICE, api_flow_control_id); + for api_flow_control_id in api_flow_control_ids { + ctx.flow_controls() + .add_consumer(DefaultAddress::UPPERCASE_SERVICE, api_flow_control_id); + } self.start_uppercase_service_impl(ctx, DefaultAddress::UPPERCASE_SERVICE.into()) .await?; @@ -223,12 +231,16 @@ impl NodeManager { ) .await?; - let options = RelayServiceOptions::new() + let mut options = RelayServiceOptions::new() .alias(DefaultAddress::STATIC_RELAY_SERVICE) - .service_as_consumer(api_flow_control_id) - .relay_as_consumer(api_flow_control_id) .prefix("forward_to_"); + for api_flow_control_id in api_flow_control_ids { + options = options + .service_as_consumer(api_flow_control_id) + .relay_as_consumer(api_flow_control_id); + } + let options = if let Some(authority) = &self.project_authority { let policy_access_control = self .policy_access_control( @@ -264,19 +276,19 @@ impl NodeManager { ctx: &Context, start_default_services: bool, ) -> ockam_core::Result<()> { - let api_flow_control_id = self.api_transport_flow_control_id.clone(); - if start_default_services { self.api_sc_listener = Some( - self.initialize_default_services(ctx, &api_flow_control_id) + self.initialize_default_services(ctx, &self.api_transport_flow_control_ids) .await?, ); } // Always start the echoer service as ockam_api::Session assumes it will be // started unconditionally on every node. It's used for liveliness checks. - ctx.flow_controls() - .add_consumer(DefaultAddress::ECHO_SERVICE, &api_flow_control_id); + for api_flow_control_id in &self.api_transport_flow_control_ids { + ctx.flow_controls() + .add_consumer(DefaultAddress::ECHO_SERVICE, api_flow_control_id); + } self.start_echoer_service(ctx, DefaultAddress::ECHO_SERVICE.into()) .await?; @@ -316,6 +328,8 @@ impl NodeManager { .await? .instantiate(ctx, self, PlainTcpInstantiator::new()) .await? + .instantiate(ctx, self, PlainUdpInstantiator::new()) + .await? .instantiate( ctx, self, @@ -657,23 +671,39 @@ pub struct ApiTransport { pub flow_control_id: FlowControlId, } +#[derive(Debug)] +pub struct NodeManagerTransport { + flow_control_id: FlowControlId, + transport: T, +} + +impl NodeManagerTransport { + pub fn new(flow_control_id: FlowControlId, transport: T) -> Self { + Self { + flow_control_id, + transport, + } + } +} + #[derive(Debug)] pub struct NodeManagerTransportOptions { - api_transport_flow_control_id: FlowControlId, - tcp_transport: TcpTransport, - udp_transport: Option, + tcp: NodeManagerTransport, + udp: Option>, } impl NodeManagerTransportOptions { pub fn new( - api_transport_flow_control_id: FlowControlId, - tcp_transport: TcpTransport, - udp_transport: Option, + tcp: NodeManagerTransport, + udp: Option>, ) -> Self { + Self { tcp, udp } + } + + pub fn new_tcp(flow_control_id: FlowControlId, transport: TcpTransport) -> Self { Self { - api_transport_flow_control_id, - tcp_transport, - udp_transport, + tcp: NodeManagerTransport::new(flow_control_id, transport), + udp: None, } } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs index 4c1d6fc81f1..6aa5895d041 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs @@ -240,8 +240,10 @@ impl NodeManager { ))); } - ctx.flow_controls() - .add_consumer(addr.clone(), &self.api_transport_flow_control_id); + for api_transport_flow_control_id in &self.api_transport_flow_control_ids { + ctx.flow_controls() + .add_consumer(addr.clone(), api_transport_flow_control_id); + } ctx.start_worker(addr.clone(), Hop).await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index 916fc710ce6..0854c58b16a 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -352,8 +352,11 @@ impl NodeManager { let vault = self.cli_state.make_vault(named_vault).await?; let secure_channels = self.build_secure_channels(vault).await?; - let options = - SecureChannelListenerOptions::new().as_consumer(&self.api_transport_flow_control_id); + let mut options = SecureChannelListenerOptions::new(); + + for api_flow_control_id in &self.api_transport_flow_control_ids { + options = options.as_consumer(api_flow_control_id); + } let options = match authorized_identifiers { Some(ids) => options.with_trust_policy(TrustMultiIdentifiersPolicy::new(ids)), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs index f3f56d44ed1..822ae25a424 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs @@ -139,14 +139,14 @@ impl NodeManager { }; let options = { - let options = TcpOutletOptions::new() + let mut options = TcpOutletOptions::new() .with_incoming_access_control(incoming_ac) .with_outgoing_access_control(outgoing_ac) .with_tls(tls); - let options = if self.project_authority().is_none() { - options.as_consumer(&self.api_transport_flow_control_id) - } else { - options + if self.project_authority().is_none() { + for api_transport_flow_control_id in &self.api_transport_flow_control_ids { + options = options.as_consumer(api_transport_flow_control_id) + } }; if reachable_from_default_secure_channel { // Accept messages from the default secure channel listener @@ -154,13 +154,11 @@ impl NodeManager { .flow_controls() .get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into()) { - options.as_consumer(&flow_control_id) - } else { - options + options = options.as_consumer(&flow_control_id) } - } else { - options } + + options }; let res = if privileged { diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index eb866a05fe0..b4c5b6f0c79 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -96,10 +96,9 @@ pub async fn start_manager_for_tests( let node_manager = InMemoryNode::new( context, NodeManagerGeneralOptions::new(cli_state.clone(), node_name, true, None, false), - NodeManagerTransportOptions::new( + NodeManagerTransportOptions::new_tcp( tcp_listener.flow_control_id().clone(), tcp.async_try_clone().await?, - None, ), trust_options.unwrap_or_else(|| { NodeManagerTrustOptions::new( diff --git a/implementations/rust/ockam/ockam_api/src/util.rs b/implementations/rust/ockam/ockam_api/src/util.rs index 40957cec719..6441dbbdaea 100644 --- a/implementations/rust/ockam/ockam_api/src/util.rs +++ b/implementations/rust/ockam/ockam_api/src/util.rs @@ -1,11 +1,6 @@ -use std::net::{SocketAddrV4, SocketAddrV6}; - use miette::miette; -use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport, TCP}; -use ockam_core::errcode::{Kind, Origin}; -use ockam_core::flow_control::FlowControlId; -use ockam_core::{Address, Error, Result, Route, TransportType, LOCAL}; +use ockam_core::Result; use ockam_multiaddr::proto::{ DnsAddr, Ip4, Ip6, Node, Project, Secure, Service, Space, Tcp, Worker, }; @@ -13,314 +8,6 @@ use ockam_multiaddr::{Code, MultiAddr, Protocol}; use crate::error::ApiError; -/// Try to convert a multi-address to an Ockam route. -pub fn local_multiaddr_to_route(ma: &MultiAddr) -> Result { - let mut rb = Route::new(); - for p in ma.iter() { - match p.code() { - // Only hops that are directly translated to existing workers are allowed here - Worker::CODE => { - let local = p.cast::().ok_or_else(|| { - Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect worker address {ma})",), - ) - })?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - Service::CODE => { - let local = p.cast::().ok_or_else(|| { - Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect service address {ma})",), - ) - })?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - Secure::CODE => { - let local = p.cast::().ok_or_else(|| { - Error::new( - Origin::Api, - Kind::Invalid, - format!("incorrect secure address {ma})",), - ) - })?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - - Node::CODE => { - return Err(Error::new( - Origin::Api, - Kind::Invalid, - "unexpected code: node. clean_multiaddr should have been called", - )); - } - - code @ (Ip4::CODE | Ip6::CODE | DnsAddr::CODE) => { - return Err(Error::new( - Origin::Api, - Kind::Invalid, - format!("unexpected code: {code}. The address must be a local address {ma}"), - )); - } - - other => { - error!(target: "ockam_api", code = %other, "unsupported protocol"); - return Err(Error::new( - Origin::Api, - Kind::Invalid, - format!("unsupported protocol {other}"), - )); - } - } - } - - Ok(rb.into()) -} - -pub struct MultiAddrToRouteResult { - pub flow_control_id: Option, - pub route: Route, - pub tcp_connection: Option, -} - -pub async fn multiaddr_to_route( - ma: &MultiAddr, - tcp: &TcpTransport, -) -> Option { - let mut rb = Route::new(); - let mut it = ma.iter().peekable(); - - let mut flow_control_id = None; - let mut number_of_tcp_hops = 0; - let mut tcp_connection = None; - - while let Some(p) = it.next() { - match p.code() { - Ip4::CODE => { - if number_of_tcp_hops >= 1 { - return None; // Only 1 TCP hop is allowed - } - - let ip4 = p.cast::()?; - let port = it.next()?.cast::()?; - let socket_addr = SocketAddrV4::new(*ip4, *port); - - let options = TcpConnectionOptions::new(); - flow_control_id = Some(options.flow_control_id().clone()); - - let connection = match tcp.connect(socket_addr.to_string(), options).await { - Ok(c) => c, - Err(error) => { - error!(%error, %socket_addr, "Couldn't connect to Ip4 address"); - return None; - } - }; - - number_of_tcp_hops += 1; - rb = rb.append(connection.sender_address().clone()); - - tcp_connection = Some(connection); - } - Ip6::CODE => { - if number_of_tcp_hops >= 1 { - return None; // Only 1 TCP hop is allowed - } - - let ip6 = p.cast::()?; - let port = it.next()?.cast::()?; - let socket_addr = SocketAddrV6::new(*ip6, *port, 0, 0); - - let options = TcpConnectionOptions::new(); - flow_control_id = Some(options.flow_control_id().clone()); - - let connection = match tcp.connect(socket_addr.to_string(), options).await { - Ok(c) => c, - Err(error) => { - error!(%error, %socket_addr, "Couldn't connect to Ip6 address"); - return None; - } - }; - - number_of_tcp_hops += 1; - rb = rb.append(connection.sender_address().clone()); - - tcp_connection = Some(connection); - } - DnsAddr::CODE => { - if number_of_tcp_hops >= 1 { - return None; // Only 1 TCP hop is allowed - } - - let host = p.cast::()?; - if let Some(p) = it.peek() { - if p.code() == Tcp::CODE { - let port = p.cast::()?; - - let options = TcpConnectionOptions::new(); - flow_control_id = Some(options.flow_control_id().clone()); - let peer = format!("{}:{}", &*host, *port); - - let connection = match tcp.connect(&peer, options).await { - Ok(c) => c, - Err(error) => { - error!(%error, %peer, "Couldn't connect to DNS address"); - return None; - } - }; - - number_of_tcp_hops += 1; - rb = rb.append(connection.sender_address().clone()); - - tcp_connection = Some(connection); - - let _ = it.next(); - - continue; - } - } - } - Worker::CODE => { - let local = p.cast::()?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - Service::CODE => { - let local = p.cast::()?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - Secure::CODE => { - let local = p.cast::()?; - rb = rb.append(Address::new_with_string(LOCAL, &*local)) - } - other => { - error!(target: "ockam_api", code = %other, "unsupported protocol"); - return None; - } - } - } - - Some(MultiAddrToRouteResult { - flow_control_id, - tcp_connection, - route: rb.into(), - }) -} - -/// Resolve all the multiaddresses which represent transport addresses -/// For example /tcp/127.0.0.1/port/4000 is transformed to the Address (TCP, "127.0.0.1:4000") -/// The creation of a TCP worker and the substitution of that transport address to a worker address -/// is done later with `context.resolve_transport_route(route)` -pub fn multiaddr_to_transport_route(ma: &MultiAddr) -> Option { - let mut route = Route::new(); - let mut it = ma.iter().peekable(); - - while let Some(p) = it.next() { - match p.code() { - Ip4::CODE => { - let ip4 = p.cast::()?; - let port = it.next()?.cast::()?; - let socket_addr = SocketAddrV4::new(*ip4, *port); - route = route.append(Address::new_with_string(TCP, socket_addr.to_string())) - } - Ip6::CODE => { - let ip6 = p.cast::()?; - let port = it.next()?.cast::()?; - let socket_addr = SocketAddrV6::new(*ip6, *port, 0, 0); - route = route.append(Address::new_with_string( - TransportType::new(1), - socket_addr.to_string(), - )) - } - DnsAddr::CODE => { - let host = p.cast::()?; - if let Some(p) = it.peek() { - if p.code() == Tcp::CODE { - let port = p.cast::()?; - let addr = format!("{}:{}", &*host, *port); - route = route.append(Address::new_with_string(TransportType::new(1), addr)); - let _ = it.next(); - continue; - } - } - } - Worker::CODE => { - let local = p.cast::()?; - route = route.append(Address::new_with_string(LOCAL, &*local)) - } - Service::CODE => { - let local = p.cast::()?; - route = route.append(Address::new_with_string(LOCAL, &*local)) - } - Secure::CODE => { - let local = p.cast::()?; - route = route.append(Address::new_with_string(LOCAL, &*local)) - } - other => { - error!(target: "ockam_api", code = %other, "unsupported protocol"); - return None; - } - } - } - Some(route.into()) -} - -/// Try to convert a multiaddr to an Ockam Address -pub fn multiaddr_to_addr(ma: &MultiAddr) -> Option
{ - let mut it = ma.iter().peekable(); - let p = it.next()?; - match p.code() { - Worker::CODE => { - let local = p.cast::()?; - Some(Address::new_with_string(LOCAL, &*local)) - } - Service::CODE => { - let local = p.cast::()?; - Some(Address::new_with_string(LOCAL, &*local)) - } - _ => None, - } -} - -pub fn try_multiaddr_to_addr(ma: &MultiAddr) -> Result { - multiaddr_to_addr(ma) - .ok_or_else(|| ApiError::core(format!("could not convert {ma} to address"))) -} - -/// Convert an Ockam Route into a MultiAddr. -pub fn route_to_multiaddr(r: &Route) -> Option { - try_route_to_multiaddr(r).ok() -} - -/// Try to convert an Ockam Route into a MultiAddr. -pub fn try_route_to_multiaddr(r: &Route) -> Result { - let mut ma = MultiAddr::default(); - for a in r.iter() { - ma.try_extend(&try_address_to_multiaddr(a)?)? - } - Ok(ma) -} - -/// Try to convert an Ockam Address to a MultiAddr. -pub fn try_address_to_multiaddr(a: &Address) -> Result { - let mut ma = MultiAddr::default(); - match a.transport_type() { - LOCAL => ma.push_back(Service::new(a.address()))?, - other => { - error!(target: "ockam_api", transport = %other, "unsupported transport type"); - return Err(ApiError::core(format!("unknown transport type: {other}"))); - } - } - Ok(ma) -} - -/// Try to convert an Ockam Address into a MultiAddr. -pub fn addr_to_multiaddr>(a: T) -> Option { - let r: Route = Route::from(a); - route_to_multiaddr(&r) -} - /// Tells whether the input MultiAddr references a local node or a remote node. /// /// This should be called before cleaning the MultiAddr. diff --git a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs index 90c7246e21f..1b3dfa3223b 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs @@ -725,7 +725,7 @@ pub(crate) async fn make_node_manager( None, true, ), - NodeManagerTransportOptions::new(listener.flow_control_id().clone(), tcp, None), + NodeManagerTransportOptions::new_tcp(listener.flow_control_id().clone(), tcp), trust_options, ) .await diff --git a/implementations/rust/ockam/ockam_command/src/node/create.rs b/implementations/rust/ockam/ockam_command/src/node/create.rs index 0a3d8ba1eea..381a7b90636 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create.rs @@ -71,6 +71,18 @@ pub struct CreateCommand { )] pub tcp_listener_address: String, + /// The address to bind the UDP listener to. UDP listener is not started unless --udp is passed. + /// Once the node is created, its services can be accessed via this address. + /// By default, it binds to 127.0.0.1:0 to assign a random free port. + #[arg( + display_order = 900, + long, + short, + id = "SOCKET_ADDRESS_UDP", + default_value = "127.0.0.1:0" + )] + pub udp_listener_address: String, + /// [DEPRECATED] Enable the HTTP server for the node that will listen to in a random free port. /// To specify a port, use `--status-endpoint-port` instead. #[arg( @@ -139,6 +151,7 @@ impl Default for CreateCommand { started_from_configuration: false, }, tcp_listener_address: node_manager_defaults.tcp_listener_address, + udp_listener_address: node_manager_defaults.udp_listener_address, http_server: false, no_status_endpoint: false, status_endpoint_port: None, diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index da6e5c2f61d..123b536e898 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -189,6 +189,9 @@ impl NodeConfig { if cmd.tcp_listener_address != default_cmd_args.tcp_listener_address { self.node.tcp_listener_address = Some(cmd.tcp_listener_address.clone().into()); } + if cmd.udp_listener_address != default_cmd_args.udp_listener_address { + self.node.udp_listener_address = Some(cmd.udp_listener_address.clone().into()); + } if cmd.no_status_endpoint != default_cmd_args.no_status_endpoint { self.node.no_status_endpoint = Some(cmd.no_status_endpoint.into()); } diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index a02b1919caa..54c7abcb061 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -10,10 +10,11 @@ use crate::secure_channel::listener::create as secure_channel_listener; use crate::util::foreground_args::wait_for_exit_signal; use crate::CommandGlobalOpts; use ockam::tcp::{TcpListenerOptions, TcpTransport}; -use ockam::udp::UdpTransport; +use ockam::udp::{UdpBindArguments, UdpBindOptions, UdpTransport}; use ockam::{Address, Context}; use ockam_api::colors::color_primary; use ockam_api::fmt_log; +use ockam_api::nodes::service::NodeManagerTransport; use ockam_api::nodes::{ service::{NodeManagerGeneralOptions, NodeManagerTransportOptions}, NodeManagerWorker, NODEMANAGER_ADDR, @@ -80,8 +81,17 @@ impl CreateCommand { .await?; debug!("node info persisted {node_info:?}"); - let udp_transport = if self.udp { - Some(UdpTransport::create(ctx).await.into_diagnostic()?) + let udp_options = if self.udp { + let udp = UdpTransport::create(ctx).await.into_diagnostic()?; + let options = UdpBindOptions::new(); + let flow_control_id = options.flow_control_id(); + udp.bind( + UdpBindArguments::new().with_bind_address(&self.udp_listener_address)?, + options, + ) + .await?; + + Some(NodeManagerTransport::new(flow_control_id, udp)) } else { None }; @@ -96,9 +106,8 @@ impl CreateCommand { true, ), NodeManagerTransportOptions::new( - tcp_listener.flow_control_id().clone(), - tcp, - udp_transport, + NodeManagerTransport::new(tcp_listener.flow_control_id().clone(), tcp), + udp_options, ), trust_options, ) diff --git a/implementations/rust/ockam/ockam_command/src/node/util.rs b/implementations/rust/ockam/ockam_command/src/node/util.rs index d3de45dee6e..e1637a384ab 100644 --- a/implementations/rust/ockam/ockam_command/src/node/util.rs +++ b/implementations/rust/ockam/ockam_command/src/node/util.rs @@ -17,6 +17,7 @@ use crate::{Command as CommandTrait, CommandGlobalOpts}; pub struct NodeManagerDefaults { pub node_name: String, pub tcp_listener_address: String, + pub udp_listener_address: String, pub trust_opts: TrustOpts, } @@ -25,6 +26,7 @@ impl Default for NodeManagerDefaults { Self { node_name: hex::encode(random::<[u8; 4]>()), tcp_listener_address: "127.0.0.1:0".to_string(), + udp_listener_address: "127.0.0.1:0".to_string(), trust_opts: TrustOpts::default(), } } @@ -57,7 +59,8 @@ pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette: skip_is_running_check, name, identity: identity_name, - tcp_listener_address: address, + tcp_listener_address, + udp_listener_address, no_status_endpoint, status_endpoint_port, udp, @@ -81,7 +84,9 @@ pub async fn spawn_node(opts: &CommandGlobalOpts, cmd: CreateCommand) -> miette: "node".to_string(), "create".to_string(), "--tcp-listener-address".to_string(), - address.to_string(), + tcp_listener_address.to_string(), + "--udp-listener-address".to_string(), + udp_listener_address.to_string(), "--foreground".to_string(), "--child-process".to_string(), ]; diff --git a/implementations/rust/ockam/ockam_command/src/project/util.rs b/implementations/rust/ockam/ockam_command/src/project/util.rs index 1894cc6125e..44561f4430b 100644 --- a/implementations/rust/ockam/ockam_command/src/project/util.rs +++ b/implementations/rust/ockam/ockam_command/src/project/util.rs @@ -11,10 +11,9 @@ use tracing::debug; use ockam_api::cloud::project::{Project, ProjectsOrchestratorApi}; use ockam_api::cloud::{CredentialsEnabled, ORCHESTRATOR_AWAIT_TIMEOUT}; use ockam_api::config::lookup::LookupMeta; -use ockam_api::error::ApiError; use ockam_api::nodes::service::relay::SecureChannelsCreation; use ockam_api::nodes::InMemoryNode; -use ockam_api::route_to_multiaddr; +use ockam_api::ReverseLocalConverter; use ockam_core::route; use ockam_multiaddr::{MultiAddr, Protocol}; use ockam_node::Context; @@ -87,8 +86,7 @@ pub async fn get_projects_secure_channels_from_config_lookup( timeout, ) .await?; - let address = route_to_multiaddr(&route![secure_channel.to_string()]) - .ok_or_else(|| ApiError::core(format!("Invalid route: {}", secure_channel)))?; + let address = ReverseLocalConverter::convert_route(&route![secure_channel.to_string()])?; debug!("secure channel created at {address}"); sc.push(address); } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs index 50775226e1f..0440579d850 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs @@ -36,6 +36,8 @@ pub struct Node { #[serde(alias = "opentelemetry-context")] pub opentelemetry_context: Option, pub udp: Option, + #[serde(alias = "udp-listener-address")] + pub udp_listener_address: Option, } impl Resource for Node { @@ -62,6 +64,9 @@ impl Resource for Node { if let Some(tcp_listener_address) = self.tcp_listener_address { args.insert("tcp-listener-address".into(), tcp_listener_address); } + if let Some(udp_listener_address) = self.udp_listener_address { + args.insert("udp-listener-address".into(), udp_listener_address); + } if let Some(no_status_endpoint) = self.no_status_endpoint { args.insert("no-status-endpoint".into(), no_status_endpoint); } diff --git a/implementations/rust/ockam/ockam_command/src/secure_channel/create.rs b/implementations/rust/ockam/ockam_command/src/secure_channel/create.rs index a90259c7ba7..9987a6582c1 100644 --- a/implementations/rust/ockam/ockam_command/src/secure_channel/create.rs +++ b/implementations/rust/ockam/ockam_command/src/secure_channel/create.rs @@ -14,7 +14,7 @@ use ockam_api::nodes::models::secure_channel::{ CreateSecureChannelRequest, CreateSecureChannelResponse, }; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{fmt_log, fmt_ok, route_to_multiaddr}; +use ockam_api::{fmt_log, fmt_ok, ReverseLocalConverter}; use ockam_core::api::Request; use ockam_multiaddr::MultiAddr; @@ -145,7 +145,7 @@ impl CreateCommand { let (secure_channel, _) = try_join!(create_secure_channel, progress_output)?; let route = &route![secure_channel.to_string()]; - let multi_addr = route_to_multiaddr(route).ok_or_else(|| { + let multi_addr = ReverseLocalConverter::convert_route(route).map_err(|_| { crate::Error::new( exitcode::PROTOCOL, miette!("Failed to convert route {route} to multi-address"), diff --git a/implementations/rust/ockam/ockam_command/src/secure_channel/delete.rs b/implementations/rust/ockam/ockam_command/src/secure_channel/delete.rs index cdc9b3f677b..ce7519a2c5b 100644 --- a/implementations/rust/ockam/ockam_command/src/secure_channel/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/secure_channel/delete.rs @@ -6,8 +6,9 @@ use serde_json::json; use ockam::{route, Context}; use ockam_api::address::extract_address_value; +use ockam_api::nodes::models::secure_channel::DeleteSecureChannelResponse; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{nodes::models::secure_channel::DeleteSecureChannelResponse, route_to_multiaddr}; +use ockam_api::ReverseLocalConverter; use ockam_core::{Address, AddressParseError}; use crate::util::async_cmd; @@ -59,8 +60,8 @@ impl DeleteCommand { match response.channel { Some(address) => { let route = &route![address]; - match route_to_multiaddr(route) { - Some(multiaddr) => { + match ReverseLocalConverter::convert_route(route) { + Ok(multiaddr) => { // if stdout is not interactive/tty write the secure channel address to it // in case some other program is trying to read it as piped input if !options.terminal.is_tty() { @@ -96,7 +97,7 @@ impl DeleteCommand { } } } - None => { + Err(_err) => { // if stderr is interactive/tty and we haven't been asked to be quiet // and output format is plain then write a plain info to stderr. if options.terminal.is_tty() diff --git a/implementations/rust/ockam/ockam_command/src/secure_channel/list.rs b/implementations/rust/ockam/ockam_command/src/secure_channel/list.rs index c1096e68cee..36aaa7249b8 100644 --- a/implementations/rust/ockam/ockam_command/src/secure_channel/list.rs +++ b/implementations/rust/ockam/ockam_command/src/secure_channel/list.rs @@ -10,12 +10,12 @@ use ockam::Context; use ockam_api::colors::OckamColor; use ockam_api::nodes::models::secure_channel::ShowSecureChannelResponse; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::route_to_multiaddr; use ockam_core::{route, Address, Result}; use crate::util::async_cmd; use crate::{docs, util::api, CommandGlobalOpts}; use ockam_api::output::Output; +use ockam_api::ReverseLocalConverter; const LONG_ABOUT: &str = include_str!("./static/list/long_about.txt"); const PREVIEW_TAG: &str = include_str!("../static/preview_tag.txt"); @@ -55,9 +55,8 @@ impl ListCommand { let from = node_name.to_string(); let at = { let channel_route = &route![channel_address]; - let channel_multiaddr = route_to_multiaddr(channel_route).ok_or(miette!( - "Failed to convert route {channel_route} to multi-address" - ))?; + let channel_multiaddr = ReverseLocalConverter::convert_route(channel_route) + .map_err(|_| miette!("Failed to convert route {channel_route} to multi-address"))?; channel_multiaddr.to_string() }; @@ -69,8 +68,8 @@ impl ListCommand { .split(" => ") .map(|p| { let r = route![p]; - route_to_multiaddr(&r) - .ok_or(miette!("Failed to convert route {r} to multi-address")) + ReverseLocalConverter::convert_route(&r) + .map_err(|_| miette!("Failed to convert route {r} to multi-address")) }) .collect::, _>>()? .iter() diff --git a/implementations/rust/ockam/ockam_multiaddr/src/codec.rs b/implementations/rust/ockam/ockam_multiaddr/src/codec.rs index 38c639e19e7..36b46d8358c 100644 --- a/implementations/rust/ockam/ockam_multiaddr/src/codec.rs +++ b/implementations/rust/ockam/ockam_multiaddr/src/codec.rs @@ -1,5 +1,5 @@ use super::{Buffer, Checked, Code, Codec, Protocol}; -use crate::proto::{DnsAddr, Node, Project, Secure, Service, Space, Tcp, Worker}; +use crate::proto::{DnsAddr, Node, Project, Secure, Service, Space, Tcp, Udp, Worker}; use crate::{Error, ProtoValue}; use core::fmt; use unsigned_varint::decode; @@ -49,6 +49,13 @@ impl Codec for StdCodec { let (x, y) = input.split_at(2); Ok((Checked(x), y)) } + Udp::CODE => { + if input.len() < 2 { + return Err(Error::required_bytes(Udp::CODE, 2)); + } + let (x, y) = input.split_at(2); + Ok((Checked(x), y)) + } c @ Worker::CODE | c @ DnsAddr::CODE | c @ Service::CODE @@ -75,6 +82,7 @@ impl Codec for StdCodec { #[cfg(feature = "std")] crate::proto::Ip6::CODE => crate::proto::Ip6::read_bytes(input).is_ok(), Tcp::CODE => Tcp::read_bytes(input).is_ok(), + Udp::CODE => Udp::read_bytes(input).is_ok(), DnsAddr::CODE => DnsAddr::read_bytes(input).is_ok(), Service::CODE => Service::read_bytes(input).is_ok(), Node::CODE => Node::read_bytes(input).is_ok(), @@ -93,6 +101,7 @@ impl Codec for StdCodec { #[cfg(feature = "std")] crate::proto::Ip6::CODE => crate::proto::Ip6::read_bytes(val.data())?.write_bytes(buf), Tcp::CODE => Tcp::read_bytes(val.data())?.write_bytes(buf), + Udp::CODE => Udp::read_bytes(val.data())?.write_bytes(buf), DnsAddr::CODE => DnsAddr::read_bytes(val.data())?.write_bytes(buf), Service::CODE => Service::read_bytes(val.data())?.write_bytes(buf), Node::CODE => Node::read_bytes(val.data())?.write_bytes(buf), @@ -129,6 +138,10 @@ impl Codec for StdCodec { Tcp::read_str(value)?.write_bytes(buf); Ok(()) } + Udp::PREFIX => { + Udp::read_str(value)?.write_bytes(buf); + Ok(()) + } DnsAddr::PREFIX => { DnsAddr::read_str(value)?.write_bytes(buf); Ok(()) @@ -182,6 +195,10 @@ impl Codec for StdCodec { Tcp::read_bytes(value)?.write_str(f)?; Ok(()) } + Udp::CODE => { + Udp::read_bytes(value)?.write_str(f)?; + Ok(()) + } DnsAddr::CODE => { DnsAddr::read_bytes(value)?.write_str(f)?; Ok(()) diff --git a/implementations/rust/ockam/ockam_multiaddr/src/lib.rs b/implementations/rust/ockam/ockam_multiaddr/src/lib.rs index cf9cc396231..f6834b7a7f2 100644 --- a/implementations/rust/ockam/ockam_multiaddr/src/lib.rs +++ b/implementations/rust/ockam/ockam_multiaddr/src/lib.rs @@ -24,10 +24,8 @@ use core::hash::{Hash, Hasher}; use core::ops::Deref; use core::str::FromStr; use once_cell::race::OnceBox; -use std::net::{SocketAddrV4, SocketAddrV6}; use tinyvec::{Array, ArrayVec, TinyVec}; -use crate::proto::{DnsAddr, Ip4, Ip6, Tcp}; pub use error::Error; use ockam_core::env::FromString; pub use registry::{Registry, RegistryBuilder}; @@ -592,39 +590,6 @@ impl MultiAddr { Ok(addr) } - - /// If the input MultiAddr is "/dnsaddr/localhost/tcp/4000/service/api", - /// then this will return string format of the SocketAddr: "127.0.0.1:4000". - pub fn to_socket_addr(&self) -> Result { - let mut it = self.iter().peekable(); - while let Some(p) = it.next() { - match p.code() { - Ip4::CODE => { - let ip4 = p.cast::().unwrap(); - let port = it.next().unwrap().cast::().unwrap(); - return Ok(SocketAddrV4::new(*ip4, *port).to_string()); - } - Ip6::CODE => { - let ip6 = p.cast::().unwrap(); - let port = it.next().unwrap().cast::().unwrap(); - return Ok(SocketAddrV6::new(*ip6, *port, 0, 0).to_string()); - } - DnsAddr::CODE => { - let host = p.cast::().unwrap(); - if let Some(p) = it.peek() { - if p.code() == Tcp::CODE { - let port = p.cast::().unwrap(); - return Ok(format!("{}:{}", &*host, *port)); - } - } - } - other => { - return Err(Error::invalid_proto(other)); - } - } - } - Err(Error::message("No socket address found")) - } } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/implementations/rust/ockam/ockam_multiaddr/src/proto.rs b/implementations/rust/ockam/ockam_multiaddr/src/proto.rs index 98eba3397ee..714b82d53e5 100644 --- a/implementations/rust/ockam/ockam_multiaddr/src/proto.rs +++ b/implementations/rust/ockam/ockam_multiaddr/src/proto.rs @@ -153,6 +153,51 @@ impl Protocol<'_> for Tcp { } } +/// A Udp port number. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Udp(pub u16); + +impl Udp { + pub fn new(v: u16) -> Self { + Udp(v) + } +} + +impl Deref for Udp { + type Target = u16; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Protocol<'_> for Udp { + const CODE: Code = Code::new(273); + const PREFIX: &'static str = "udp"; + + fn read_str(input: Checked<&str>) -> Result { + u16::from_str(&input).map(Udp).map_err(Error::message) + } + + fn read_bytes(input: Checked<&[u8]>) -> Result { + let mut b = [0; 2]; + b.copy_from_slice(&input); + Ok(Udp(u16::from_be_bytes(b))) + } + + fn write_str(&self, f: &mut fmt::Formatter) -> Result<(), Error> { + write!(f, "/{}/{}", Self::PREFIX, self.0)?; + Ok(()) + } + + fn write_bytes(&self, buf: &mut dyn Buffer) { + let mut b = encode::u32_buffer(); + let uvi = encode::u32(Self::CODE.into(), &mut b); + buf.extend_with(uvi); + buf.extend_with(&self.0.to_be_bytes()) + } +} + macro_rules! gen_str_proto { ($t:ident, $c:literal, $p:literal) => { #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/implementations/rust/ockam/ockam_multiaddr/src/registry.rs b/implementations/rust/ockam/ockam_multiaddr/src/registry.rs index 4c55c5728aa..776d145800f 100644 --- a/implementations/rust/ockam/ockam_multiaddr/src/registry.rs +++ b/implementations/rust/ockam/ockam_multiaddr/src/registry.rs @@ -1,6 +1,6 @@ use super::{Code, Codec, Protocol}; use crate::codec::StdCodec; -use crate::proto::{DnsAddr, Node, Project, Secure, Service, Space, Tcp, Worker}; +use crate::proto::{DnsAddr, Node, Project, Secure, Service, Space, Tcp, Udp, Worker}; use alloc::collections::btree_map::BTreeMap; use alloc::sync::Arc; use core::fmt; @@ -27,6 +27,7 @@ impl Default for Registry { let mut r = RegistryBuilder::new(); r.register(Worker::CODE, Worker::PREFIX, std_codec.clone()); r.register(Tcp::CODE, Tcp::PREFIX, std_codec.clone()); + r.register(Udp::CODE, Udp::PREFIX, std_codec.clone()); r.register(DnsAddr::CODE, DnsAddr::PREFIX, std_codec.clone()); #[allow(clippy::redundant_clone)] r.register(Service::CODE, Service::PREFIX, std_codec.clone()); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs index 48ee5ad70d1..c6a8dfb0120 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs @@ -33,7 +33,7 @@ pub struct RemoteWorker { mode: PortalMode, tcp_packet_writer: Box, - ebpf_support: TcpTransportEbpfSupport, + ebpf_support: Arc, } impl RemoteWorker { @@ -41,7 +41,7 @@ impl RemoteWorker { pub fn new_inlet( tcp_packet_writer: Box, inlet: Inlet, - ebpf_support: TcpTransportEbpfSupport, + ebpf_support: Arc, ) -> Self { Self { mode: PortalMode::Inlet { inlet }, @@ -54,7 +54,7 @@ impl RemoteWorker { pub fn new_outlet( tcp_packet_writer: Box, outlet: Outlet, - ebpf_support: TcpTransportEbpfSupport, + ebpf_support: Arc, ) -> Self { Self { mode: PortalMode::Outlet { outlet }, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs index d87485c8fd7..9db988c2393 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs @@ -60,7 +60,7 @@ pub struct TcpTransport { registry: TcpRegistry, #[cfg(privileged_portals_support)] - pub(crate) ebpf_support: crate::privileged_portal::TcpTransportEbpfSupport, + pub(crate) ebpf_support: Arc, } impl TcpTransport { diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs index ad696e2e5ee..2a8b9dfbda4 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs @@ -77,6 +77,7 @@ impl Worker for UdpSenderWorker { return Err(TransportError::UnknownRoute)?; } + // Avoid doing that each time resolve_peer(peer_addr.address().to_string()).await? }; diff --git a/tools/stress-test/src/main.rs b/tools/stress-test/src/main.rs index 135c98febf8..202cd2cbbf6 100644 --- a/tools/stress-test/src/main.rs +++ b/tools/stress-test/src/main.rs @@ -190,7 +190,7 @@ impl State { None, false, ), - NodeManagerTransportOptions::new(listener.flow_control_id().clone(), tcp, None), + NodeManagerTransportOptions::new_tcp(listener.flow_control_id().clone(), tcp), trust_options, ) .await?,