diff --git a/examples/rust/file_transfer/examples/receiver.rs b/examples/rust/file_transfer/examples/receiver.rs index f2b96f9bee2..d2089275f90 100644 --- a/examples/rust/file_transfer/examples/receiver.rs +++ b/examples/rust/file_transfer/examples/receiver.rs @@ -2,7 +2,7 @@ use file_transfer::FileData; use ockam::identity::SecureChannelListenerOptions; -use ockam::remote::RemoteForwarderOptions; +use ockam::remote::RemoteRelayOptions; use ockam::{ errcode::{Kind, Origin}, node, Context, Error, Result, Routed, TcpConnectionOptions, Worker, @@ -102,17 +102,15 @@ async fn main(ctx: Context) -> Result<()> { // // To allow Sender and others to initiate an end-to-end secure channel with this program // we connect with 1.node.ockam.network:4000 as a TCP client and ask the forwarding - // service on that node to create a forwarder for us. + // service on that node to create a relay for us. // // All messages that arrive at that forwarding address will be sent to this program // using the TCP connection we created as a client. let node_in_hub = tcp.connect("1.node.ockam.network:4000", tcp_options).await?; - let forwarder = node - .create_forwarder(node_in_hub, RemoteForwarderOptions::new()) - .await?; - println!("\n[✓] RemoteForwarder was created on the node at: 1.node.ockam.network:4000"); + let relay = node.create_relay(node_in_hub, RemoteRelayOptions::new()).await?; + println!("\n[✓] RemoteRelay was created on the node at: 1.node.ockam.network:4000"); println!("Forwarding address for Receiver is:"); - println!("{}", forwarder.remote_address()); + println!("{}", relay.remote_address()); // Start a worker, of type FileReception, at address "receiver". node.start_worker("receiver", FileReception::default()).await?; diff --git a/examples/rust/get_started/examples/04-routing-over-transport-two-hops-middle.rs b/examples/rust/get_started/examples/04-routing-over-transport-two-hops-middle.rs index 477a1defe7d..a2032990764 100644 --- a/examples/rust/get_started/examples/04-routing-over-transport-two-hops-middle.rs +++ b/examples/rust/get_started/examples/04-routing-over-transport-two-hops-middle.rs @@ -1,9 +1,9 @@ // This node creates a tcp connection to a node at 127.0.0.1:4000 -// Starts a forwarder worker to forward messages to 127.0.0.1:4000 +// Starts a relay worker to forward messages to 127.0.0.1:4000 // Starts a tcp listener at 127.0.0.1:3000 // It then runs forever waiting to route messages. -use hello_ockam::Forwarder; +use hello_ockam::Relay; use ockam::{node, Context, Result, TcpConnectionOptions, TcpListenerOptions, TcpTransportExtension}; #[ockam::node] @@ -17,14 +17,14 @@ async fn main(ctx: Context) -> Result<()> { // Create a TCP connection to the responder node. let connection_to_responder = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?; - // Create a Forwarder worker - node.start_worker("forward_to_responder", Forwarder(connection_to_responder.into())) + // Create a Relay worker + node.start_worker("forward_to_responder", Relay(connection_to_responder.into())) .await?; // Create a TCP listener and wait for incoming connections. let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?; - // Allow access to the Forwarder via TCP connections from the TCP listener + // Allow access to the Relay via TCP connections from the TCP listener node.flow_controls() .add_consumer("forward_to_responder", listener.flow_control_id()); diff --git a/examples/rust/get_started/examples/05-secure-channel-over-two-transport-hops-middle.rs b/examples/rust/get_started/examples/05-secure-channel-over-two-transport-hops-middle.rs index 87581f368a0..58375617131 100644 --- a/examples/rust/get_started/examples/05-secure-channel-over-two-transport-hops-middle.rs +++ b/examples/rust/get_started/examples/05-secure-channel-over-two-transport-hops-middle.rs @@ -1,9 +1,9 @@ // This node creates a tcp connection to a node at 127.0.0.1:4000 -// Starts a forwarder worker to forward messages to 127.0.0.1:4000 +// Starts a relay worker to forward messages to 127.0.0.1:4000 // Starts a tcp listener at 127.0.0.1:3000 // It then runs forever waiting to route messages. -use hello_ockam::Forwarder; +use hello_ockam::Relay; use ockam::{node, Context, Result, TcpConnectionOptions, TcpListenerOptions, TcpTransportExtension}; #[ockam::node] @@ -17,8 +17,8 @@ async fn main(ctx: Context) -> Result<()> { // Create a TCP connection to Bob. let connection_to_bob = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?; - // Start a Forwarder to forward messages to Bob using the TCP connection. - node.start_worker("forward_to_bob", Forwarder(connection_to_bob.into())) + // Start a Relay to forward messages to Bob using the TCP connection. + node.start_worker("forward_to_bob", Relay(connection_to_bob.into())) .await?; // Create a TCP listener and wait for incoming connections. diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index 3a9e26dd5f8..cd7935287e7 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -5,7 +5,7 @@ use ockam::identity::{ AuthorityService, RemoteCredentialsRetriever, RemoteCredentialsRetrieverInfo, SecureChannelListenerOptions, SecureChannelOptions, TrustContext, TrustMultiIdentifiersPolicy, }; -use ockam::remote::RemoteForwarderOptions; +use ockam::remote::RemoteRelayOptions; use ockam::{node, route, Context, Result, TcpOutletOptions}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::nodes::NodeManager; @@ -16,14 +16,14 @@ use std::sync::Arc; /// This node supports a "control" server on which several "edge" devices can connect /// -/// The connections go through the Ockam Orchestrator, via a Forwarder, and a secure channel +/// The connections go through the Ockam Orchestrator, via a Relay, and a secure channel /// can be established to forward messages to an outlet going to a local Python webserver. /// /// /// This example shows how to: /// /// - retrieve credentials from an authority -/// - create a Forwarder on the Ockam Orchestrator +/// - create a Relay on the Ockam Orchestrator /// - create a TCP outlet with some access control checking the authenticated attributes of the caller /// /// The node needs to be started with: @@ -118,7 +118,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime ) .await?; - // 5. create a forwarder on the Ockam orchestrator + // 5. create a relay on the Ockam orchestrator let tcp_project_route = multiaddr_to_route(&project.route(), &tcp).await.unwrap(); // FIXME: Handle error let project_options = @@ -140,11 +140,11 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime ) .await?; - // finally create a forwarder using the secure channel to the project - let forwarder = node - .create_static_forwarder(secure_channel_address, "control_plane1", RemoteForwarderOptions::new()) + // finally create a relay using the secure channel to the project + let relay = node + .create_static_relay(secure_channel_address, "control_plane1", RemoteRelayOptions::new()) .await?; - println!("forwarder is {forwarder:?}"); + println!("relay is {relay:?}"); // 6. create a secure channel listener which will allow the edge node to // start a secure channel when it is ready diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs index 684b18e27c9..c6d8686e8be 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-edge-plane.rs @@ -17,7 +17,7 @@ use ockam_transport_tcp::{TcpInletOptions, TcpTransportExtension}; /// This node supports an "edge" server which can connect to a "control" node /// in order to connect its TCP inlet to the "control" node TCP outlet /// -/// The connections go through the Ockam Orchestrator, via the control node Forwarder. +/// The connections go through the Ockam Orchestrator, via the control node Relay. /// /// This example shows how to: /// @@ -131,7 +131,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime ) .await?; - // 4.3 then create a secure channel to the control node (via its forwarder) + // 4.3 then create a secure channel to the control node (via its relay) let secure_channel_listener_route = route![secure_channel_address, "forward_to_control_plane1", "untrusted"]; let secure_channel_to_control = node .create_secure_channel( diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication.rs b/examples/rust/get_started/examples/11-attribute-based-authentication.rs index 8b3920ce933..7e0bcc768eb 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication.rs @@ -45,7 +45,7 @@ /// | +-------------------------------------+ /// | | | /// | | | create secure channel to control -/// | | | via the forwarder +/// | | | via the relay /// v v | /// +--------------+ +-------------------------------+-------+ /// | Authority | | | | @@ -67,10 +67,10 @@ /// - we create initially some secure channels to the Authority in order to retrieve credential /// based on a one-time token generated with `ockam project ticket --attribute component=` /// -/// - then the control node creates a forwarder on the Orchestrator in order to accept TCP traffic without +/// - then the control node creates a relay on the Orchestrator in order to accept TCP traffic without /// having to open a port to the internet. It also starts a channel listener ("untrusted", accept all incoming requests for now) /// -/// - on its side the edge node starts a secure channel via forwarder (named "forward_to_control_plane1"), to the "untrusted" listener +/// - on its side the edge node starts a secure channel via relay (named "forward_to_control_plane1"), to the "untrusted" listener /// with the secure channel address it creates an Inlet which will direct TCP traffic via the secure channel to get to the /// control node and then to the "outlet" worker to reach the Python webserver /// diff --git a/examples/rust/get_started/examples/bob.rs b/examples/rust/get_started/examples/bob.rs index 04d489d9a3e..5651c6fb051 100644 --- a/examples/rust/get_started/examples/bob.rs +++ b/examples/rust/get_started/examples/bob.rs @@ -1,5 +1,5 @@ use ockam::identity::SecureChannelListenerOptions; -use ockam::remote::RemoteForwarderOptions; +use ockam::remote::RemoteRelayOptions; use ockam::{node, Routed, TcpConnectionOptions, Worker}; use ockam::{Context, Result}; use ockam_transport_tcp::TcpTransportExtension; @@ -48,19 +48,17 @@ async fn main(ctx: Context) -> Result<()> { // // To allow Alice and others to initiate an end-to-end secure channel with this program // we connect with 1.node.ockam.network:4000 as a TCP client and ask the forwarding - // service on that node to create a forwarder for us. + // service on that node to create a relay for us. // // All messages that arrive at that forwarding address will be sent to this program // using the TCP connection we created as a client. let node_in_hub = tcp .connect("1.node.ockam.network:4000", TcpConnectionOptions::new()) .await?; - let forwarder = node - .create_forwarder(node_in_hub, RemoteForwarderOptions::new()) - .await?; - println!("\n[✓] RemoteForwarder was created on the node at: 1.node.ockam.network:4000"); + let relay = node.create_relay(node_in_hub, RemoteRelayOptions::new()).await?; + println!("\n[✓] RemoteRelay was created on the node at: 1.node.ockam.network:4000"); println!("Forwarding address for Bob is:"); - println!("{}", forwarder.remote_address()); + println!("{}", relay.remote_address()); // We won't call ctx.stop() here, this program will run until you stop it with Ctrl-C Ok(()) diff --git a/examples/rust/get_started/src/lib.rs b/examples/rust/get_started/src/lib.rs index a1f64511f0f..24fc6573e52 100644 --- a/examples/rust/get_started/src/lib.rs +++ b/examples/rust/get_started/src/lib.rs @@ -2,11 +2,11 @@ mod echoer; pub use echoer::*; -mod forwarder; mod hop; +mod relay; -pub use forwarder::*; pub use hop::*; +pub use relay::*; mod logger; mod project; diff --git a/examples/rust/get_started/src/forwarder.rs b/examples/rust/get_started/src/relay.rs similarity index 95% rename from examples/rust/get_started/src/forwarder.rs rename to examples/rust/get_started/src/relay.rs index a29b23a0a85..4c1f4e637cc 100644 --- a/examples/rust/get_started/src/forwarder.rs +++ b/examples/rust/get_started/src/relay.rs @@ -1,9 +1,9 @@ use ockam::{Address, Any, Context, LocalMessage, Result, Routed, Worker}; -pub struct Forwarder(pub Address); +pub struct Relay(pub Address); #[ockam::worker] -impl Worker for Forwarder { +impl Worker for Relay { type Context = Context; type Message = Any; diff --git a/examples/rust/tcp_inlet_and_outlet/examples/04-inlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/04-inlet.rs index 91510ec8b62..84dcba7ecb2 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/04-inlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/04-inlet.rs @@ -13,7 +13,7 @@ async fn main(ctx: Context) -> Result<()> { // TCP Transport Outlet. // // For this example, we know that the Outlet node is listening for Ockam Routing Messages - // through a Remote Forwarder at "1.node.ockam.network:4000" and its forwarder address + // through a Remote Relay at "1.node.ockam.network:4000" and its forwarder address // points to secure channel listener. let e = node.create_identity().await?; diff --git a/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs index 97fdf246349..bc3a3dd45a2 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs @@ -1,5 +1,5 @@ use ockam::identity::SecureChannelListenerOptions; -use ockam::remote::RemoteForwarderOptions; +use ockam::remote::RemoteRelayOptions; use ockam::{node, Context, Result, TcpConnectionOptions, TcpOutletOptions}; use ockam_transport_tcp::TcpTransportExtension; @@ -45,17 +45,15 @@ async fn main(ctx: Context) -> Result<()> { // To allow Inlet Node and others to initiate an end-to-end secure channel with this program // we connect with 1.node.ockam.network:4000 as a TCP client and ask the forwarding - // service on that node to create a forwarder for us. + // service on that node to create a relay for us. // // All messages that arrive at that forwarding address will be sent to this program // using the TCP connection we created as a client. let node_in_hub = tcp.connect("1.node.ockam.network:4000", tcp_options).await?; - let forwarder = node - .create_forwarder(node_in_hub, RemoteForwarderOptions::new()) - .await?; - println!("\n[✓] RemoteForwarder was created on the node at: 1.node.ockam.network:4000"); + let relay = node.create_relay(node_in_hub, RemoteRelayOptions::new()).await?; + println!("\n[✓] RemoteRelay was created on the node at: 1.node.ockam.network:4000"); println!("Forwarding address in Hub is:"); - println!("{}", forwarder.remote_address()); + println!("{}", relay.remote_address()); // We won't call ctx.stop() here, // so this program will keep running until you interrupt it with Ctrl-C. diff --git a/examples/rust/tcp_inlet_and_outlet/tests/tests.rs b/examples/rust/tcp_inlet_and_outlet/tests/tests.rs index 1a28be82e6e..5141b438988 100644 --- a/examples/rust/tcp_inlet_and_outlet/tests/tests.rs +++ b/examples/rust/tcp_inlet_and_outlet/tests/tests.rs @@ -83,7 +83,7 @@ fn run_04_inlet_outlet_seperate_processes_secure_channel_via_ockam_hub() -> Resu let port = rand::thread_rng().gen_range(10000..65535); // Spawn outlet, wait for it to start up, grab dynamic forwarding address let outlet = CmdBuilder::new("cargo run --example 04-outlet ockam.io:80").spawn()?; - outlet.match_stdout(r"(?i)RemoteForwarder was created on the node")?; + outlet.match_stdout(r"(?i)RemoteRelay was created on the node")?; let fwd_address = outlet.match_stdout(r"(?m)^FWD_(\w+)$")?.swap_remove(0).unwrap(); println!("Forwarding address: {fwd_address}"); diff --git a/implementations/rust/ockam/ockam/src/forwarding_service/mod.rs b/implementations/rust/ockam/ockam/src/forwarding_service/mod.rs deleted file mode 100644 index bac339928dd..00000000000 --- a/implementations/rust/ockam/ockam/src/forwarding_service/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod forwarder; -#[allow(clippy::module_inception)] -mod forwarding_service; -mod options; - -pub use forwarding_service::*; -pub use options::*; diff --git a/implementations/rust/ockam/ockam/src/lib.rs b/implementations/rust/ockam/ockam/src/lib.rs index cdf404fd233..90cb3dfd3e9 100644 --- a/implementations/rust/ockam/ockam/src/lib.rs +++ b/implementations/rust/ockam/ockam/src/lib.rs @@ -67,15 +67,15 @@ pub use ockam_node::{ mod delay; mod error; -mod forwarding_service; mod metadata; mod monotonic; +mod relay_service; mod system; mod unique; pub use error::OckamError; -pub use forwarding_service::{ForwardingService, ForwardingServiceOptions}; pub use metadata::OckamMessage; +pub use relay_service::{RelayService, RelayServiceOptions}; pub use system::{SystemBuilder, SystemHandler, WorkerSystem}; pub use unique::unique_with_prefix; diff --git a/implementations/rust/ockam/ockam/src/node.rs b/implementations/rust/ockam/ockam/src/node.rs index da34878b443..89265ea5ae1 100644 --- a/implementations/rust/ockam/ockam/src/node.rs +++ b/implementations/rust/ockam/ockam/src/node.rs @@ -17,7 +17,7 @@ use ockam_identity::{PurposeKeys, Vault, VaultStorage}; use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions}; use ockam_vault::KeyId; -use crate::remote::{RemoteForwarder, RemoteForwarderInfo, RemoteForwarderOptions}; +use crate::remote::{RemoteRelay, RemoteRelayInfo, RemoteRelayOptions}; use crate::stream::Stream; use crate::OckamError; @@ -81,23 +81,23 @@ impl Node { Stream::new(self.get_context()).await } - /// Create a new forwarder - pub async fn create_forwarder( + /// Create a new relay + pub async fn create_relay( &self, orchestrator_route: impl Into, - options: RemoteForwarderOptions, - ) -> Result { - RemoteForwarder::create(self.get_context(), orchestrator_route, options).await + options: RemoteRelayOptions, + ) -> Result { + RemoteRelay::create(self.get_context(), orchestrator_route, options).await } - /// Create a new static forwarder - pub async fn create_static_forwarder( + /// Create a new static relay + pub async fn create_static_relay( &self, orchestrator_route: impl Into, alias: impl Into, - options: RemoteForwarderOptions, - ) -> Result { - RemoteForwarder::create_static(self.get_context(), orchestrator_route, alias, options).await + options: RemoteRelayOptions, + ) -> Result { + RemoteRelay::create_static(self.get_context(), orchestrator_route, alias, options).await } /// Create an Identity diff --git a/implementations/rust/ockam/ockam/src/relay_service/mod.rs b/implementations/rust/ockam/ockam/src/relay_service/mod.rs new file mode 100644 index 00000000000..34abe0b60f1 --- /dev/null +++ b/implementations/rust/ockam/ockam/src/relay_service/mod.rs @@ -0,0 +1,7 @@ +mod options; +mod relay; +#[allow(clippy::module_inception)] +mod relay_service; + +pub use options::*; +pub use relay_service::*; diff --git a/implementations/rust/ockam/ockam/src/forwarding_service/options.rs b/implementations/rust/ockam/ockam/src/relay_service/options.rs similarity index 62% rename from implementations/rust/ockam/ockam/src/forwarding_service/options.rs rename to implementations/rust/ockam/ockam/src/relay_service/options.rs index ea2decf1fea..d18b601deab 100644 --- a/implementations/rust/ockam/ockam/src/forwarding_service/options.rs +++ b/implementations/rust/ockam/ockam/src/relay_service/options.rs @@ -4,34 +4,34 @@ use ockam_core::flow_control::{FlowControlId, FlowControls}; use ockam_core::{Address, AllowAll, IncomingAccessControl}; /// Trust Options for a Forwarding Service -pub struct ForwardingServiceOptions { +pub struct RelayServiceOptions { pub(super) service_incoming_access_control: Arc, - pub(super) forwarders_incoming_access_control: Arc, + pub(super) relays_incoming_access_control: Arc, pub(super) consumer_service: Vec, - pub(super) consumer_forwarder: Vec, + pub(super) consumer_relay: Vec, } -impl ForwardingServiceOptions { +impl RelayServiceOptions { /// Default constructor without Access Control pub fn new() -> Self { Self { service_incoming_access_control: Arc::new(AllowAll), - forwarders_incoming_access_control: Arc::new(AllowAll), + relays_incoming_access_control: Arc::new(AllowAll), consumer_service: vec![], - consumer_forwarder: vec![], + consumer_relay: vec![], } } - /// Mark that this Forwarding service is a Consumer for to the given [`FlowControlId`] + /// Mark that this Relay service is a Consumer for to the given [`FlowControlId`] pub fn service_as_consumer(mut self, id: &FlowControlId) -> Self { self.consumer_service.push(id.clone()); self } - /// Mark that spawned Forwarders are Consumers for to the given [`FlowControlId`] - pub fn forwarder_as_consumer(mut self, id: &FlowControlId) -> Self { - self.consumer_forwarder.push(id.clone()); + /// Mark that spawned Relays are Consumers for to the given [`FlowControlId`] + pub fn relay_as_consumer(mut self, id: &FlowControlId) -> Self { + self.consumer_relay.push(id.clone()); self } @@ -54,25 +54,25 @@ impl ForwardingServiceOptions { self } - /// Set spawned forwarders Incoming Access Control - pub fn with_forwarders_incoming_access_control_impl( + /// Set spawned relays Incoming Access Control + pub fn with_relays_incoming_access_control_impl( mut self, access_control: impl IncomingAccessControl, ) -> Self { - self.forwarders_incoming_access_control = Arc::new(access_control); + self.relays_incoming_access_control = Arc::new(access_control); self } - /// Set spawned forwarders Incoming Access Control - pub fn with_forwarders_incoming_access_control( + /// Set spawned relays Incoming Access Control + pub fn with_relays_incoming_access_control( mut self, access_control: Arc, ) -> Self { - self.forwarders_incoming_access_control = access_control; + self.relays_incoming_access_control = access_control; self } - pub(super) fn setup_flow_control_for_forwarding_service( + pub(super) fn setup_flow_control_for_relay_service( &self, flow_controls: &FlowControls, address: &Address, @@ -82,18 +82,18 @@ impl ForwardingServiceOptions { } } - pub(super) fn setup_flow_control_for_forwarder( + pub(super) fn setup_flow_control_for_relay( &self, flow_controls: &FlowControls, address: &Address, ) { - for id in &self.consumer_forwarder { + for id in &self.consumer_relay { flow_controls.add_consumer(address.clone(), id); } } } -impl Default for ForwardingServiceOptions { +impl Default for RelayServiceOptions { fn default() -> Self { Self::new() } diff --git a/implementations/rust/ockam/ockam/src/forwarding_service/forwarder.rs b/implementations/rust/ockam/ockam/src/relay_service/relay.rs similarity index 95% rename from implementations/rust/ockam/ockam/src/forwarding_service/forwarder.rs rename to implementations/rust/ockam/ockam/src/relay_service/relay.rs index 883d8032079..ff4e10c885c 100644 --- a/implementations/rust/ockam/ockam/src/forwarding_service/forwarder.rs +++ b/implementations/rust/ockam/ockam/src/relay_service/relay.rs @@ -8,7 +8,7 @@ use ockam_core::{ use ockam_node::WorkerBuilder; use tracing::info; -pub(super) struct Forwarder { +pub(super) struct Relay { forward_route: Route, // this option will be `None` after this worker is initialized, because // while initializing, the worker will send the payload contained in this @@ -16,7 +16,7 @@ pub(super) struct Forwarder { payload: Option>, } -impl Forwarder { +impl Relay { pub(super) async fn create( ctx: &Context, address: Address, @@ -35,12 +35,12 @@ impl Forwarder { Arc::new(AllowOnwardAddress(next_hop)) }; - let forwarder = Self { + let relay = Self { forward_route, payload: Some(registration_payload.clone()), }; - WorkerBuilder::new(forwarder) + WorkerBuilder::new(relay) .with_address(address) .with_incoming_access_control_arc(incoming_access_control) .with_outgoing_access_control_arc(outgoing_access_control) @@ -52,7 +52,7 @@ impl Forwarder { } #[crate::worker] -impl Worker for Forwarder { +impl Worker for Relay { type Context = Context; type Message = Any; diff --git a/implementations/rust/ockam/ockam/src/forwarding_service/forwarding_service.rs b/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs similarity index 72% rename from implementations/rust/ockam/ockam/src/forwarding_service/forwarding_service.rs rename to implementations/rust/ockam/ockam/src/relay_service/relay_service.rs index c701d7d6b01..9fb4ab2ed4c 100644 --- a/implementations/rust/ockam/ockam/src/forwarding_service/forwarding_service.rs +++ b/implementations/rust/ockam/ockam/src/relay_service/relay_service.rs @@ -1,5 +1,5 @@ -use crate::forwarding_service::forwarder::Forwarder; -use crate::{Context, ForwardingServiceOptions}; +use crate::relay_service::relay::Relay; +use crate::{Context, RelayServiceOptions}; use core::str::from_utf8; use ockam_core::compat::boxed::Box; use ockam_core::{Address, Any, DenyAll, Result, Routed, Worker}; @@ -8,23 +8,23 @@ use ockam_node::WorkerBuilder; /// Alias worker to register remote workers under local names. /// /// To talk with this worker, you can use the -/// [`RemoteForwarder`](crate::remote::RemoteForwarder) which is a +/// [`RemoteRelay`](crate::remote::RemoteRelay) which is a /// compatible client for this server. #[non_exhaustive] -pub struct ForwardingService { - options: ForwardingServiceOptions, +pub struct RelayService { + options: RelayServiceOptions, } -impl ForwardingService { +impl RelayService { /// Start a forwarding service pub async fn create( ctx: &Context, address: impl Into
, - options: ForwardingServiceOptions, + options: RelayServiceOptions, ) -> Result<()> { let address = address.into(); - options.setup_flow_control_for_forwarding_service(ctx.flow_controls(), &address); + options.setup_flow_control_for_relay_service(ctx.flow_controls(), &address); let service_incoming_access_control = options.service_incoming_access_control.clone(); @@ -42,7 +42,7 @@ impl ForwardingService { } #[crate::worker] -impl Worker for ForwardingService { +impl Worker for RelayService { type Context = Context; type Message = Any; @@ -54,7 +54,7 @@ impl Worker for ForwardingService { let forward_route = msg.return_route(); let payload = msg.into_transport_message().payload; - let random_address = Address::random_tagged("Forwarder.service"); + let random_address = Address::random_tagged("Relay.service"); // TODO: assume that the first byte is length, ignore it. // We have to improve this actually parse the payload. @@ -67,14 +67,14 @@ impl Worker for ForwardingService { }; self.options - .setup_flow_control_for_forwarder(ctx.flow_controls(), &address); + .setup_flow_control_for_relay(ctx.flow_controls(), &address); - Forwarder::create( + Relay::create( ctx, address, forward_route, payload, - self.options.forwarders_incoming_access_control.clone(), + self.options.relays_incoming_access_control.clone(), ) .await?; diff --git a/implementations/rust/ockam/ockam/src/remote/addresses.rs b/implementations/rust/ockam/ockam/src/remote/addresses.rs index 1054d993024..847e82dc960 100644 --- a/implementations/rust/ockam/ockam/src/remote/addresses.rs +++ b/implementations/rust/ockam/ockam/src/remote/addresses.rs @@ -1,4 +1,4 @@ -use crate::remote::lifecycle::ForwardType; +use crate::remote::lifecycle::RelayType; use ockam_core::Address; #[derive(Clone, Debug)] @@ -14,15 +14,14 @@ pub(super) struct Addresses { } impl Addresses { - pub(super) fn generate(ftype: ForwardType) -> Self { + pub(super) fn generate(ftype: RelayType) -> Self { let type_str = ftype.str(); - let main_remote = - Address::random_tagged(&format!("RemoteForwarder.{}.main_remote", type_str)); + let main_remote = Address::random_tagged(&format!("RemoteRelay.{}.main_remote", type_str)); let main_internal = - Address::random_tagged(&format!("RemoteForwarder.{}.main_internal", type_str)); - let heartbeat = Address::random_tagged(&format!("RemoteForwarder.{}.heartbeat", type_str)); + Address::random_tagged(&format!("RemoteRelay.{}.main_internal", type_str)); + let heartbeat = Address::random_tagged(&format!("RemoteRelay.{}.heartbeat", type_str)); let completion_callback = - Address::random_tagged(&format!("RemoteForwarder.{}.child", type_str)); + Address::random_tagged(&format!("RemoteRelay.{}.child", type_str)); Self { main_remote, diff --git a/implementations/rust/ockam/ockam/src/remote/info.rs b/implementations/rust/ockam/ockam/src/remote/info.rs index e64c3beb457..c39f3e1da09 100644 --- a/implementations/rust/ockam/ockam/src/remote/info.rs +++ b/implementations/rust/ockam/ockam/src/remote/info.rs @@ -6,14 +6,14 @@ use serde::{Deserialize, Serialize}; /// Information about a remotely forwarded worker. #[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Message)] -pub struct RemoteForwarderInfo { +pub struct RemoteRelayInfo { forwarding_route: Route, remote_address: String, worker_address: Address, flow_control_id: Option, } -impl RemoteForwarderInfo { +impl RemoteRelayInfo { /// Constructor pub fn new( forwarding_route: Route, diff --git a/implementations/rust/ockam/ockam/src/remote/lifecycle.rs b/implementations/rust/ockam/ockam/src/remote/lifecycle.rs index 3a4037a86fa..25d29e62a57 100644 --- a/implementations/rust/ockam/ockam/src/remote/lifecycle.rs +++ b/implementations/rust/ockam/ockam/src/remote/lifecycle.rs @@ -1,4 +1,4 @@ -use crate::remote::{Addresses, RemoteForwarder, RemoteForwarderInfo, RemoteForwarderOptions}; +use crate::remote::{Addresses, RemoteRelay, RemoteRelayInfo, RemoteRelayOptions}; use crate::Context; use core::time::Duration; use ockam_core::compat::sync::Arc; @@ -15,23 +15,23 @@ use ockam_node::{DelayedEvent, WorkerBuilder}; use tracing::debug; #[derive(Clone, Copy)] -pub(super) enum ForwardType { +pub(super) enum RelayType { Static, Ephemeral, StaticWithoutHeartbeats, } -impl ForwardType { +impl RelayType { pub fn str(&self) -> &'static str { match self { - ForwardType::Static => "static", - ForwardType::Ephemeral => "ephemeral", - ForwardType::StaticWithoutHeartbeats => "static_w/o_heartbeats", + RelayType::Static => "static", + RelayType::Ephemeral => "ephemeral", + RelayType::StaticWithoutHeartbeats => "static_w/o_heartbeats", } } } -impl RemoteForwarder { +impl RemoteRelay { fn mailboxes( addresses: Addresses, heartbeat_source_address: Option
, @@ -64,7 +64,7 @@ impl RemoteForwarder { } } -impl RemoteForwarder { +impl RemoteRelay { fn new( addresses: Addresses, registration_route: Route, @@ -84,14 +84,14 @@ impl RemoteForwarder { } } - /// Create and start static RemoteForwarder at predefined address with given Ockam Orchestrator route + /// Create and start static RemoteRelay at predefined address with given Ockam Orchestrator route pub async fn create_static( ctx: &Context, hub_route: impl Into, alias: impl Into, - options: RemoteForwarderOptions, - ) -> Result { - let addresses = Addresses::generate(ForwardType::Static); + options: RemoteRelayOptions, + ) -> Result { + let addresses = Addresses::generate(RelayType::Static); let mut child_ctx = ctx .new_detached_with_mailboxes(Mailboxes::main( @@ -111,7 +111,7 @@ impl RemoteForwarder { let outgoing_access_control = options.create_access_control(ctx.flow_controls(), flow_control_id.clone()); - let forwarder = Self::new( + let relay = Self::new( addresses.clone(), registration_route, alias.into(), @@ -120,32 +120,29 @@ impl RemoteForwarder { Duration::from_secs(5), ); - debug!( - "Starting static RemoteForwarder at {}", - &addresses.heartbeat - ); + debug!("Starting static RemoteRelay at {}", &addresses.heartbeat); let mailboxes = Self::mailboxes( addresses, Some(heartbeat_source_address), outgoing_access_control, ); - WorkerBuilder::new(forwarder) + WorkerBuilder::new(relay) .with_mailboxes(mailboxes) .start(ctx) .await?; - let resp = child_ctx.receive::().await?.body(); + let resp = child_ctx.receive::().await?.body(); Ok(resp) } - /// Create and start new ephemeral RemoteForwarder at random address with given Ockam Hub route + /// Create and start new ephemeral RemoteRelay at random address with given Ockam Hub route pub async fn create( ctx: &Context, hub_route: impl Into, - options: RemoteForwarderOptions, - ) -> Result { - let addresses = Addresses::generate(ForwardType::Ephemeral); + options: RemoteRelayOptions, + ) -> Result { + let addresses = Addresses::generate(RelayType::Ephemeral); let mut callback_ctx = ctx .new_detached_with_mailboxes(Mailboxes::main( @@ -162,7 +159,7 @@ impl RemoteForwarder { let outgoing_access_control = options.create_access_control(ctx.flow_controls(), flow_control_id.clone()); - let forwarder = Self::new( + let relay = Self::new( addresses.clone(), registration_route, "register".to_string(), @@ -172,31 +169,31 @@ impl RemoteForwarder { ); debug!( - "Starting ephemeral RemoteForwarder at {}", + "Starting ephemeral RemoteRelay at {}", &addresses.main_internal ); let mailboxes = Self::mailboxes(addresses, None, outgoing_access_control); - WorkerBuilder::new(forwarder) + WorkerBuilder::new(relay) .with_mailboxes(mailboxes) .start(ctx) .await?; - let resp = callback_ctx.receive::().await?.body(); + let resp = callback_ctx.receive::().await?.body(); Ok(resp) } - /// Create and start new static RemoteForwarder without heart beats - /// This is a temporary kind of RemoteForwarder that will only run on - /// rust nodes (hence the `forwarding_service` addr to create static forwarders). + /// Create and start new static RemoteRelay without heart beats + /// This is a temporary kind of RemoteRelay that will only run on + /// rust nodes (hence the `forwarding_service` addr to create static relays). /// We will use it while we don't have heartbeats implemented on rust nodes. pub async fn create_static_without_heartbeats( ctx: &Context, hub_route: impl Into, alias: impl Into, - options: RemoteForwarderOptions, - ) -> Result { - let addresses = Addresses::generate(ForwardType::StaticWithoutHeartbeats); + options: RemoteRelayOptions, + ) -> Result { + let addresses = Addresses::generate(RelayType::StaticWithoutHeartbeats); let mut callback_ctx = ctx .new_detached_with_mailboxes(Mailboxes::main( @@ -213,7 +210,7 @@ impl RemoteForwarder { let outgoing_access_control = options.create_access_control(ctx.flow_controls(), flow_control_id.clone()); - let forwarder = Self::new( + let relay = Self::new( addresses.clone(), registration_route, alias.into(), @@ -223,16 +220,16 @@ impl RemoteForwarder { ); debug!( - "Starting static RemoteForwarder without heartbeats at {}", + "Starting static RemoteRelay without heartbeats at {}", &addresses.main_internal ); let mailboxes = Self::mailboxes(addresses, None, outgoing_access_control); - WorkerBuilder::new(forwarder) + WorkerBuilder::new(relay) .with_mailboxes(mailboxes) .start(ctx) .await?; - let resp = callback_ctx.receive::().await?.body(); + let resp = callback_ctx.receive::().await?.body(); Ok(resp) } diff --git a/implementations/rust/ockam/ockam/src/remote/mod.rs b/implementations/rust/ockam/ockam/src/remote/mod.rs index 469431d618d..7a3c0da4ce5 100644 --- a/implementations/rust/ockam/ockam/src/remote/mod.rs +++ b/implementations/rust/ockam/ockam/src/remote/mod.rs @@ -1,4 +1,4 @@ -//! [`RemoteForwarder`] allows registering node within a Cloud Node with dynamic or static alias, +//! [`RemoteRelay`] allows registering node within a Cloud Node with dynamic or static alias, //! which allows other nodes forward messages to local workers on this node using that alias. mod addresses; @@ -18,14 +18,14 @@ use ockam_core::Route; use ockam_node::DelayedEvent; /// This Worker is responsible for registering on Ockam Orchestrator and forwarding messages to local Worker -pub struct RemoteForwarder { +pub struct RemoteRelay { /// Address used from other node addresses: Addresses, completion_msg_sent: bool, registration_route: Route, registration_payload: String, flow_control_id: Option, - // We only use Heartbeat for static RemoteForwarder + // We only use Heartbeat for static RemoteRelay heartbeat: Option>>, heartbeat_interval: Duration, } diff --git a/implementations/rust/ockam/ockam/src/remote/options.rs b/implementations/rust/ockam/ockam/src/remote/options.rs index c9626f011ec..e1fdd5962cc 100644 --- a/implementations/rust/ockam/ockam/src/remote/options.rs +++ b/implementations/rust/ockam/ockam/src/remote/options.rs @@ -3,16 +3,16 @@ use ockam_core::compat::sync::Arc; use ockam_core::flow_control::{FlowControlId, FlowControlOutgoingAccessControl, FlowControls}; use ockam_core::{Address, AllowAll, OutgoingAccessControl}; -/// Trust options for [`RemoteForwarder`](super::RemoteForwarder) -pub struct RemoteForwarderOptions {} +/// Trust options for [`RemoteRelay`](super::RemoteRelay) +pub struct RemoteRelayOptions {} -impl RemoteForwarderOptions { +impl RemoteRelayOptions { /// Usually [`FlowControlId`] should be shared with the Producer that was used to create this - /// forwarder (probably Secure Channel), since [`RemoteForwarder`](super::RemoteForwarder) + /// relay (probably Secure Channel), since [`RemoteRelay`](super::RemoteRelay) /// doesn't imply any new "trust" context, it's just a Message Routing helper. /// Therefore, workers that are allowed to receive messages from the corresponding /// Secure Channel should as well be allowed to receive messages - /// through the [`RemoteForwarder`](super::RemoteForwarder) through the same Secure Channel. + /// through the [`RemoteRelay`](super::RemoteRelay) through the same Secure Channel. #[allow(clippy::new_without_default)] pub fn new() -> Self { Self {} diff --git a/implementations/rust/ockam/ockam/src/remote/worker.rs b/implementations/rust/ockam/ockam/src/remote/worker.rs index f41f195a19a..3ee91abfaee 100644 --- a/implementations/rust/ockam/ockam/src/remote/worker.rs +++ b/implementations/rust/ockam/ockam/src/remote/worker.rs @@ -1,4 +1,4 @@ -use crate::remote::{RemoteForwarder, RemoteForwarderInfo}; +use crate::remote::{RemoteRelay, RemoteRelayInfo}; use crate::{Context, OckamError}; use ockam_core::compat::{ boxed::Box, @@ -9,12 +9,12 @@ use ockam_core::{Any, Decodable, Result, Routed, Worker}; use tracing::{debug, info}; #[crate::worker] -impl Worker for RemoteForwarder { +impl Worker for RemoteRelay { type Context = Context; type Message = Any; async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - debug!("RemoteForwarder registration..."); + debug!("RemoteRelay registration..."); ctx.send_from_address( self.registration_route.clone(), @@ -55,7 +55,7 @@ impl Worker for RemoteForwarder { match transport_message.onward_route.next() { Err(_) => { - debug!("RemoteForwarder received service message"); + debug!("RemoteRelay received service message"); let payload = Vec::::decode(&transport_message.payload) .map_err(|_| OckamError::InvalidHubResponse)?; @@ -67,7 +67,7 @@ impl Worker for RemoteForwarder { } if !self.completion_msg_sent { - info!("RemoteForwarder registered with route: {}", return_route); + info!("RemoteRelay registered with route: {}", return_route); let address = match return_route.recipient()?.to_string().strip_prefix("0#") { Some(addr) => addr.to_string(), @@ -76,7 +76,7 @@ impl Worker for RemoteForwarder { ctx.send_from_address( self.addresses.completion_callback.clone(), - RemoteForwarderInfo::new( + RemoteRelayInfo::new( return_route, address, self.addresses.main_remote.clone(), @@ -103,7 +103,7 @@ impl Worker for RemoteForwarder { } Ok(_) => { // Forwarding the message - debug!("RemoteForwarder received payload message"); + debug!("RemoteRelay received payload message"); // Send the message on its onward_route ctx.forward_from_address(message, self.addresses.main_internal.clone()) diff --git a/implementations/rust/ockam/ockam/tests/forwarder.rs b/implementations/rust/ockam/ockam/tests/relay.rs similarity index 78% rename from implementations/rust/ockam/ockam/tests/forwarder.rs rename to implementations/rust/ockam/ockam/tests/relay.rs index c4550d69789..9c7f2ba54c4 100644 --- a/implementations/rust/ockam/ockam/tests/forwarder.rs +++ b/implementations/rust/ockam/ockam/tests/relay.rs @@ -1,20 +1,20 @@ use ockam::identity::{secure_channels, SecureChannelListenerOptions, SecureChannelOptions}; -use ockam::remote::{RemoteForwarder, RemoteForwarderOptions}; +use ockam::remote::{RemoteRelay, RemoteRelayOptions}; use ockam::workers::Echoer; -use ockam::{ForwardingService, ForwardingServiceOptions}; +use ockam::{RelayService, RelayServiceOptions}; use ockam_core::{route, AllowAll, Result}; use ockam_node::{Context, MessageReceiveOptions}; use ockam_transport_tcp::{TcpConnectionOptions, TcpListenerOptions, TcpTransport}; use std::time::Duration; -// Node creates a Forwarding service and a Remote Forwarder, Echoer is reached through the Forwarder. No flow control +// Node creates a Relay service and a Remote Relay, Echoer is reached through the Relay. No flow control #[ockam_macros::test] async fn test1(ctx: &mut Context) -> Result<()> { - ForwardingService::create(ctx, "forwarding_service", ForwardingServiceOptions::new()).await?; + RelayService::create(ctx, "forwarding_service", RelayServiceOptions::new()).await?; ctx.start_worker("echoer", Echoer).await?; - let remote_info = RemoteForwarder::create(ctx, route![], RemoteForwarderOptions::new()).await?; + let remote_info = RemoteRelay::create(ctx, route![], RemoteRelayOptions::new()).await?; let resp = ctx .send_and_receive::( @@ -28,16 +28,16 @@ async fn test1(ctx: &mut Context) -> Result<()> { ctx.stop().await } -// Cloud: Hosts a Forwarding service and listens on a tcp port. No flow control -// Server: Connects to a Cloud using tcp and creates a dynamic Forwarder. Using flow control +// Cloud: Hosts a Relay service and listens on a tcp port. No flow control +// Server: Connects to a Cloud using tcp and creates a dynamic Relay. Using flow control // Client: Connects to a Cloud using tcp and reaches to the Server's Echoer. Using flow control #[ockam_macros::test] async fn test2(ctx: &mut Context) -> Result<()> { let tcp_listener_options = TcpListenerOptions::new(); - let options = ForwardingServiceOptions::new() + let options = RelayServiceOptions::new() .service_as_consumer(&tcp_listener_options.spawner_flow_control_id()) - .forwarder_as_consumer(&tcp_listener_options.spawner_flow_control_id()); - ForwardingService::create(ctx, "forwarding_service", options).await?; + .relay_as_consumer(&tcp_listener_options.spawner_flow_control_id()); + RelayService::create(ctx, "forwarding_service", options).await?; let cloud_tcp = TcpTransport::create(ctx).await?; let cloud_listener = cloud_tcp @@ -56,8 +56,7 @@ async fn test2(ctx: &mut Context) -> Result<()> { .await?; let remote_info = - RemoteForwarder::create(ctx, cloud_connection.clone(), RemoteForwarderOptions::new()) - .await?; + RemoteRelay::create(ctx, cloud_connection.clone(), RemoteRelayOptions::new()).await?; let client_tcp = TcpTransport::create(ctx).await?; let cloud_connection = client_tcp @@ -76,15 +75,15 @@ async fn test2(ctx: &mut Context) -> Result<()> { ctx.stop().await } -// Server: Connects to a Cloud using tcp and creates a dynamic Forwarder. Using flow control -// Cloud: Hosts a Forwarding service and sends replies to the Client with and without a flow control +// Server: Connects to a Cloud using tcp and creates a dynamic Relay. Using flow control +// Cloud: Hosts a Relay service and sends replies to the Client with and without a flow control #[ockam_macros::test] async fn test3(ctx: &mut Context) -> Result<()> { let tcp_listener_options = TcpListenerOptions::new(); - let options = ForwardingServiceOptions::new() + let options = RelayServiceOptions::new() .service_as_consumer(&tcp_listener_options.spawner_flow_control_id()) - .forwarder_as_consumer(&tcp_listener_options.spawner_flow_control_id()); - ForwardingService::create(ctx, "forwarding_service", options).await?; + .relay_as_consumer(&tcp_listener_options.spawner_flow_control_id()); + RelayService::create(ctx, "forwarding_service", options).await?; let cloud_tcp = TcpTransport::create(ctx).await?; let cloud_listener = cloud_tcp .listen("127.0.0.1:0", tcp_listener_options) @@ -99,8 +98,7 @@ async fn test3(ctx: &mut Context) -> Result<()> { .await?; let remote_info = - RemoteForwarder::create(ctx, cloud_connection.clone(), RemoteForwarderOptions::new()) - .await?; + RemoteRelay::create(ctx, cloud_connection.clone(), RemoteRelayOptions::new()).await?; let mut child_ctx = ctx.new_detached("ctx", AllowAll, AllowAll).await?; ctx.send( @@ -138,21 +136,21 @@ async fn test3(ctx: &mut Context) -> Result<()> { } // Cloud: -// - Hosts a Forwarding service +// - Hosts a Relay service // - Listens on a tcp port without a flow control // - Runs a secure channel listener // // Server: // - Connects to the Cloud using tcp with a flow control // - Creates a secure channel to the Cloud with a flow control -// - Creates a dynamic Forwarder. Using flow control +// - Creates a dynamic Relay. Using flow control // - Runs a Secure Channel listener with a flow control // - Runs an Echoer // // Client: // - Connects to a Cloud using tcp with a flow control // - Creates a secure channel to the Cloud with a flow control -// - Creates a tunneled secure channel to the server using Forwarder's address +// - Creates a tunneled secure channel to the server using Relay's address // - Reaches Server's Echoer using a flow control #[ockam_macros::test] async fn test4(ctx: &mut Context) -> Result<()> { @@ -161,10 +159,10 @@ async fn test4(ctx: &mut Context) -> Result<()> { let cloud_secure_channel_listener_options = SecureChannelListenerOptions::new() .as_consumer(&cloud_tcp_listener_options.spawner_flow_control_id()); - let options = ForwardingServiceOptions::new() + let options = RelayServiceOptions::new() .service_as_consumer(&cloud_secure_channel_listener_options.spawner_flow_control_id()) - .forwarder_as_consumer(&cloud_secure_channel_listener_options.spawner_flow_control_id()); - ForwardingService::create(ctx, "forwarding_service", options).await?; + .relay_as_consumer(&cloud_secure_channel_listener_options.spawner_flow_control_id()); + RelayService::create(ctx, "forwarding_service", options).await?; let secure_channels = secure_channels(); let identities_creation = secure_channels.identities().identities_creation(); @@ -216,12 +214,8 @@ async fn test4(ctx: &mut Context) -> Result<()> { ) .await?; - let remote_info = RemoteForwarder::create( - ctx, - cloud_server_channel.clone(), - RemoteForwarderOptions::new(), - ) - .await?; + let remote_info = + RemoteRelay::create(ctx, cloud_server_channel.clone(), RemoteRelayOptions::new()).await?; // Client let client_tcp = TcpTransport::create(ctx).await?; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs index ac731ea4008..75d7139652f 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs @@ -41,7 +41,7 @@ mod test { use crate::hop::Hop; use crate::kafka::protocol_aware::utils::{encode_request, encode_response}; - use crate::kafka::secure_channel_map::ForwarderCreator; + use crate::kafka::secure_channel_map::RelayCreator; use crate::kafka::{ ConsumerNodeAddr, KafkaInletController, KafkaPortalListener, KafkaSecureChannelControllerImpl, @@ -51,12 +51,12 @@ mod test { //TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name const TEST_KAFKA_API_VERSION: i16 = 12; - struct HopForwarderCreator {} + struct HopRelayCreator {} #[async_trait] - impl ForwarderCreator for HopForwarderCreator { - async fn create_forwarder(&self, context: &Context, alias: String) -> ockam::Result<()> { - trace!("creating mock forwarder for: {alias}"); + impl RelayCreator for HopRelayCreator { + async fn create_relay(&self, context: &Context, alias: String) -> ockam::Result<()> { + trace!("creating mock relay for: {alias}"); //replicating the same logic of the orchestrator by adding consumer__ context .start_worker(Address::from_string(format!("consumer__{alias}")), Hop) @@ -74,7 +74,7 @@ mod test { let secure_channel_controller = KafkaSecureChannelControllerImpl::new_extended( handler.secure_channels.clone(), ConsumerNodeAddr::Relay(MultiAddr::try_from("/service/api")?), - Some(HopForwarderCreator {}), + Some(HopRelayCreator {}), "test_trust_context_id".to_string(), ); @@ -133,7 +133,7 @@ mod test { .await?; //before produce a new key, the consumer has to issue a Fetch request - // so the sidecar can react by creating the forwarder for the partition 1 of 'my-topic' + // so the sidecar can react by creating the relay for the partition 1 of 'my-topic' { let mut consumer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await; handler @@ -316,7 +316,7 @@ mod test { send_kafka_request(stream, header, request, ApiKey::ProduceKey).await; } - //this is needed in order to make the consumer create the forwarders to the secure + //this is needed in order to make the consumer create the relays to the secure //channel async fn simulate_first_kafka_consumer_empty_reply_and_ignore_result( consumer_bootstrap_port: u16, @@ -328,7 +328,7 @@ mod test { .unwrap(); send_kafka_fetch_request(&mut kafka_client_connection).await; //we don't want the answer, but we need to be sure the - // message passed through and the forwarder had been created + // message passed through and the relay had been created mock_kafka_connection .stream .read_exact(&mut [0; 4]) diff --git a/implementations/rust/ockam/ockam_api/src/kafka/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/mod.rs index 6095e801ce4..5180f54a03d 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/mod.rs @@ -13,7 +13,7 @@ mod secure_channel_map; pub(crate) use inlet_controller::KafkaInletController; use ockam_core::Address; -pub(crate) use outlet_service::prefix_forwarder::PrefixForwarderService; +pub(crate) use outlet_service::prefix_relay::PrefixRelayService; pub(crate) use outlet_service::OutletManagerService; pub(crate) use portal_listener::KafkaPortalListener; pub(crate) use secure_channel_map::ConsumerNodeAddr; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/mod.rs index 5ab082b4a7c..5b6d2f0f617 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/mod.rs @@ -1,4 +1,4 @@ mod interceptor_listener; -pub(crate) mod prefix_forwarder; +pub(crate) mod prefix_relay; pub(crate) use interceptor_listener::OutletManagerService; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_forwarder.rs b/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_relay.rs similarity index 88% rename from implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_forwarder.rs rename to implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_relay.rs index ba0d35e62c3..cb1be4d6cb1 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_forwarder.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/outlet_service/prefix_relay.rs @@ -10,11 +10,11 @@ use ockam_core::{Address, AllowAll, AllowOnwardAddress}; /// This service applies a prefix to the provided static forwarding address. /// This service was created mainly to keep full compatibility with the existing /// erlang implementation. -pub struct PrefixForwarderService { +pub struct PrefixRelayService { prefix: String, secure_channel_listener_flow_control_id: FlowControlId, } -impl PrefixForwarderService { +impl PrefixRelayService { pub async fn create( context: &Context, secure_channel_listener_flow_control_id: FlowControlId, @@ -36,14 +36,14 @@ impl PrefixForwarderService { worker_address, worker, AllowAll, - AllowOnwardAddress(DefaultAddress::FORWARDING_SERVICE.into()), + AllowOnwardAddress(DefaultAddress::RELAY_SERVICE.into()), ) .await } } #[ockam::worker] -impl Worker for PrefixForwarderService { +impl Worker for PrefixRelayService { type Message = Vec; type Context = Context; @@ -75,10 +75,7 @@ impl Worker for PrefixForwarderService { let new_address = format!("{}_{}", &self.prefix, address); - debug!( - "prefix forwarder, renamed from {} to {}", - address, new_address - ); + debug!("prefix relay, renamed from {} to {}", address, new_address); let mut bytes = new_address.clone().into_bytes(); let mut new_payload: Vec = vec![bytes.len() as u8]; @@ -94,7 +91,7 @@ impl Worker for PrefixForwarderService { ctx.forward(message).await?; - // The new forwarder needs to be reachable by the default secure channel listener + // The new relay needs to be reachable by the default secure channel listener ctx.flow_controls().add_consumer( Address::from_string(new_address), &self.secure_channel_listener_flow_control_id, diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs index 9d39b9609f8..d605879c260 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs @@ -108,7 +108,7 @@ impl InletInterceptorImpl { let request: FetchRequest = decode_body(buffer, header.request_api_version)?; //we intercept every partition interested by the kafka client - //and create a forwarder for each + //and create a relay for each for topic in &request.topics { let topic_id = if header.request_api_version <= 12 { topic.topic.0.to_string() @@ -135,7 +135,7 @@ impl InletInterceptorImpl { .collect(); self.secure_channel_controller - .start_forwarders_for(context, &topic_id, partitions) + .start_relays_for(context, &topic_id, partitions) .await .map_err(InterceptError::Ockam)? } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs index 92659ee6471..b03d26918b8 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs @@ -43,7 +43,7 @@ mod test { Ok(encrypted_content) } - async fn start_forwarders_for( + async fn start_relays_for( &self, _context: &mut Context, _topic_id: &str, diff --git a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs index 5d5d1c2a143..f675db9fa3f 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map.rs @@ -1,5 +1,5 @@ use crate::kafka::KAFKA_OUTLET_CONSUMERS; -use crate::nodes::models::forwarder::{CreateForwarder, ForwarderInfo}; +use crate::nodes::models::relay::{CreateRelay, RelayInfo}; use crate::nodes::models::secure_channel::{ CreateSecureChannelRequest, CreateSecureChannelResponse, DeleteSecureChannelRequest, DeleteSecureChannelResponse, @@ -62,10 +62,10 @@ pub(crate) trait KafkaSecureChannelController: Send + Sync { encrypted_content: Vec, ) -> Result>; - /// Starts forwarders in the orchestrator for each {topic_name}_{partition} combination + /// Starts relays in the orchestrator for each {topic_name}_{partition} combination /// should be used only by the consumer. /// does nothing if they were already created, but fails it they already exist. - async fn start_forwarders_for( + async fn start_relays_for( &self, context: &mut Context, topic_id: &str, @@ -74,32 +74,27 @@ pub(crate) trait KafkaSecureChannelController: Send + Sync { } #[async_trait] -pub(crate) trait ForwarderCreator: Send + Sync + 'static { - async fn create_forwarder(&self, context: &Context, alias: String) -> Result<()>; +pub(crate) trait RelayCreator: Send + Sync + 'static { + async fn create_relay(&self, context: &Context, alias: String) -> Result<()>; } -pub(crate) struct NodeManagerForwarderCreator { +pub(crate) struct NodeManagerRelayCreator { orchestrator_multiaddr: MultiAddr, } -impl NodeManagerForwarderCreator { - async fn request_forwarder_creation( +impl NodeManagerRelayCreator { + async fn request_relay_creation( context: &Context, - forwarder_service: MultiAddr, + relay_service: MultiAddr, alias: String, ) -> Result<()> { - let is_rust = !forwarder_service.starts_with(Project::CODE); + let is_rust = !relay_service.starts_with(Project::CODE); let buffer: Vec = context .send_and_receive( route![NODEMANAGER_ADDR], Request::post("/node/forwarder") - .body(CreateForwarder::new( - forwarder_service, - Some(alias), - is_rust, - None, - )) + .body(CreateRelay::new(relay_service, Some(alias), is_rust, None)) .to_vec()?, ) .await?; @@ -112,39 +107,38 @@ impl NodeManagerForwarderCreator { return Err(Error::new( Origin::Transport, Kind::Invalid, - format!("cannot create forwarder: {}", status), + format!("cannot create relay: {}", status), )); } if !response.has_body() { Err(Error::new( Origin::Transport, Kind::Unknown, - "invalid create forwarder response", + "invalid create relay response", )) } else { - let remote_forwarder_information: ForwarderInfo = decoder.decode()?; - trace!("remote forwarder created: {remote_forwarder_information:?}"); + let remote_relay_information: RelayInfo = decoder.decode()?; + trace!("remote relay created: {remote_relay_information:?}"); Ok(()) } } } #[async_trait] -impl ForwarderCreator for NodeManagerForwarderCreator { - async fn create_forwarder(&self, context: &Context, alias: String) -> Result<()> { - trace!("creating remote forwarder for: {alias}"); - Self::request_forwarder_creation(context, self.orchestrator_multiaddr.clone(), alias) - .await?; +impl RelayCreator for NodeManagerRelayCreator { + async fn create_relay(&self, context: &Context, alias: String) -> Result<()> { + trace!("creating remote relay for: {alias}"); + Self::request_relay_creation(context, self.orchestrator_multiaddr.clone(), alias).await?; Ok(()) } } -pub(crate) struct KafkaSecureChannelControllerImpl { +pub(crate) struct KafkaSecureChannelControllerImpl { inner: Arc>>, } //had to manually implement since #[derive(Clone)] doesn't work well in this situation -impl Clone for KafkaSecureChannelControllerImpl { +impl Clone for KafkaSecureChannelControllerImpl { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -153,7 +147,7 @@ impl Clone for KafkaSecureChannelControllerImpl { } /// Describe to reach the consumer node: -/// either directly or through a relay with a forwarder +/// either directly or through a relay with a relay #[derive(Clone)] pub(crate) enum ConsumerNodeAddr { Direct(Option), @@ -161,31 +155,31 @@ pub(crate) enum ConsumerNodeAddr { } type TopicPartition = (String, i32); -struct InnerSecureChannelControllerImpl { +struct InnerSecureChannelControllerImpl { // we identity the secure channel instance by using the decryptor of the consumer // which is known to both parties topic_encryptor_map: HashMap, // describes how to reach the consumer node consumer_node_multiaddr: ConsumerNodeAddr, - topic_forwarder_set: HashSet, - forwarder_creator: Option, + topic_relay_set: HashSet, + relay_creator: Option, secure_channels: Arc, access_control: AbacAccessControl, } -impl KafkaSecureChannelControllerImpl { +impl KafkaSecureChannelControllerImpl { pub(crate) fn new( secure_channels: Arc, consumer_node_multiaddr: ConsumerNodeAddr, trust_context_id: String, - ) -> KafkaSecureChannelControllerImpl { - let forwarder_creator = match consumer_node_multiaddr.clone() { + ) -> KafkaSecureChannelControllerImpl { + let relay_creator = match consumer_node_multiaddr.clone() { ConsumerNodeAddr::Direct(_) => None, ConsumerNodeAddr::Relay(mut orchestrator_multiaddr) => { orchestrator_multiaddr .push_back(Service::new(KAFKA_OUTLET_CONSUMERS)) .unwrap(); - Some(NodeManagerForwarderCreator { + Some(NodeManagerRelayCreator { orchestrator_multiaddr, }) } @@ -193,18 +187,18 @@ impl KafkaSecureChannelControllerImpl { Self::new_extended( secure_channels, consumer_node_multiaddr, - forwarder_creator, + relay_creator, trust_context_id, ) } } -impl KafkaSecureChannelControllerImpl { - /// to manually specify `ForwarderCreator`, for testing purposes +impl KafkaSecureChannelControllerImpl { + /// to manually specify `RelayCreator`, for testing purposes pub(crate) fn new_extended( secure_channels: Arc, consumer_node_multiaddr: ConsumerNodeAddr, - forwarder_creator: Option, + relay_creator: Option, trust_context_id: String, ) -> KafkaSecureChannelControllerImpl { let access_control = AbacAccessControl::create( @@ -216,9 +210,9 @@ impl KafkaSecureChannelControllerImpl { Self { inner: Arc::new(Mutex::new(InnerSecureChannelControllerImpl { topic_encryptor_map: Default::default(), - topic_forwarder_set: Default::default(), + topic_relay_set: Default::default(), secure_channels, - forwarder_creator, + relay_creator, consumer_node_multiaddr, access_control, })), @@ -230,7 +224,7 @@ impl KafkaSecureChannelControllerImpl { } } -impl KafkaSecureChannelControllerImpl { +impl KafkaSecureChannelControllerImpl { async fn request_secure_channel_creation( context: &Context, destination: MultiAddr, @@ -315,7 +309,7 @@ impl KafkaSecureChannelControllerImpl { topic_name: &str, partition: i32, ) -> Result { - // here we should have the orchestrator address and expect forwarders to be + // here we should have the orchestrator address and expect relays to be // present in the orchestrator with the format "consumer__{topic_name}_{partition}" let mut inner = self.inner.lock().await; @@ -469,7 +463,7 @@ impl KafkaSecureChannelControllerImpl { } #[async_trait] -impl KafkaSecureChannelController for KafkaSecureChannelControllerImpl { +impl KafkaSecureChannelController for KafkaSecureChannelControllerImpl { async fn encrypt_content_for( &self, context: &mut Context, @@ -534,31 +528,31 @@ impl KafkaSecureChannelController for KafkaSecureChannelCon Ok(decrypted_content) } - async fn start_forwarders_for( + async fn start_relays_for( &self, context: &mut Context, topic_name: &str, partitions: Vec, ) -> Result<()> { let mut inner = self.inner.lock().await; - // when using direct mode there is no need to create a forwarder - if inner.forwarder_creator.is_none() { + // when using direct mode there is no need to create a relay + if inner.relay_creator.is_none() { return Ok(()); } for partition in partitions { let topic_key: TopicPartition = (topic_name.to_string(), partition); - if inner.topic_forwarder_set.contains(&topic_key) { + if inner.topic_relay_set.contains(&topic_key) { continue; } let alias = format!("{topic_name}_{partition}"); inner - .forwarder_creator + .relay_creator .as_ref() .unwrap() - .create_forwarder(context, alias) + .create_relay(context, alias) .await?; - inner.topic_forwarder_set.insert(topic_key); + inner.topic_relay_set.insert(topic_key); } Ok(()) } diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index 654bd2d57ad..40e5a71db31 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -160,7 +160,7 @@ pub struct DefaultAddress; impl DefaultAddress { pub const AUTHENTICATED_SERVICE: &'static str = "authenticated"; - pub const FORWARDING_SERVICE: &'static str = "forwarding_service"; + pub const RELAY_SERVICE: &'static str = "forwarding_service"; pub const UPPERCASE_SERVICE: &'static str = "uppercase"; pub const ECHO_SERVICE: &'static str = "echo"; pub const HOP_SERVICE: &'static str = "hop"; @@ -180,7 +180,7 @@ impl DefaultAddress { matches!( name, Self::AUTHENTICATED_SERVICE - | Self::FORWARDING_SERVICE + | Self::RELAY_SERVICE | Self::UPPERCASE_SERVICE | Self::ECHO_SERVICE | Self::HOP_SERVICE @@ -201,7 +201,7 @@ impl DefaultAddress { pub fn iter() -> impl Iterator { [ Self::AUTHENTICATED_SERVICE, - Self::FORWARDING_SERVICE, + Self::RELAY_SERVICE, Self::UPPERCASE_SERVICE, Self::ECHO_SERVICE, Self::HOP_SERVICE, @@ -300,7 +300,7 @@ mod test { assert!(DefaultAddress::is_valid( DefaultAddress::AUTHENTICATED_SERVICE )); - assert!(DefaultAddress::is_valid(DefaultAddress::FORWARDING_SERVICE)); + assert!(DefaultAddress::is_valid(DefaultAddress::RELAY_SERVICE)); assert!(DefaultAddress::is_valid(DefaultAddress::UPPERCASE_SERVICE)); assert!(DefaultAddress::is_valid(DefaultAddress::ECHO_SERVICE)); assert!(DefaultAddress::is_valid(DefaultAddress::HOP_SERVICE)); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/mod.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/mod.rs index dfc653d72f5..2a97cb083ec 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/mod.rs @@ -5,9 +5,9 @@ pub mod base; pub mod credentials; pub mod flow_controls; -pub mod forwarder; pub mod policy; pub mod portal; +pub mod relay; pub mod secure_channel; pub mod services; pub mod transport; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/forwarder.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs similarity index 86% rename from implementations/rust/ockam/ockam_api/src/nodes/models/forwarder.rs rename to implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs index 87f049d63a5..11f91fc2297 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/forwarder.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs @@ -1,7 +1,7 @@ use minicbor::{Decode, Encode}; use ockam::identity::Identifier; -use ockam::remote::RemoteForwarderInfo; +use ockam::remote::RemoteRelayInfo; use ockam::route; use ockam_core::flow_control::FlowControlId; use ockam_multiaddr::MultiAddr; @@ -9,14 +9,14 @@ use ockam_multiaddr::MultiAddr; use crate::error::ApiError; use crate::route_to_multiaddr; -/// Request body when instructing a node to create a forwarder +/// Request body when instructing a node to create a relay #[derive(Debug, Clone, Decode, Encode)] #[rustfmt::skip] #[cbor(map)] -pub struct CreateForwarder { - /// Address to create forwarder at. +pub struct CreateRelay { + /// Address to create relay at. #[n(1)] pub(crate) address: MultiAddr, - /// Forwarder alias. + /// Relay alias. #[n(2)] pub(crate) alias: Option, /// Forwarding service is at rust node. #[n(3)] pub(crate) at_rust_node: bool, @@ -26,7 +26,7 @@ pub struct CreateForwarder { #[n(4)] pub(crate) authorized: Option, } -impl CreateForwarder { +impl CreateRelay { pub fn new( address: MultiAddr, alias: Option, @@ -58,18 +58,18 @@ impl CreateForwarder { } } -/// Response body when creating a forwarder +/// Response body when creating a relay #[derive(Debug, Clone, Decode, Encode, serde::Serialize, serde::Deserialize)] #[rustfmt::skip] #[cbor(map)] -pub struct ForwarderInfo { +pub struct RelayInfo { #[n(1)] forwarding_route: String, #[n(2)] remote_address: String, #[n(3)] worker_address: String, #[n(4)] flow_control_id: Option, } -impl ForwarderInfo { +impl RelayInfo { pub fn forwarding_route(&self) -> &str { &self.forwarding_route } @@ -93,8 +93,8 @@ impl ForwarderInfo { } } -impl From for ForwarderInfo { - fn from(inner: RemoteForwarderInfo) -> Self { +impl From for RelayInfo { + fn from(inner: RemoteRelayInfo) -> Self { Self { forwarding_route: inner.forwarding_route().to_string(), remote_address: inner.remote_address().into(), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs index 88e9c7d6180..539bcc77502 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs @@ -1,7 +1,7 @@ use crate::nodes::service::Alias; use ockam::identity::Identifier; use ockam::identity::{SecureChannel, SecureChannelListener}; -use ockam::remote::RemoteForwarderInfo; +use ockam::remote::RemoteRelayInfo; use ockam_core::compat::collections::BTreeMap; use ockam_core::{Address, Route}; use ockam_node::compat::asynchronous::RwLock; @@ -205,7 +205,7 @@ pub(crate) struct Registry { pub(crate) kafka_services: RegistryOf, pub(crate) hop_services: RegistryOf, pub(crate) credentials_services: RegistryOf, - pub(crate) forwarders: RegistryOf, + pub(crate) relays: RegistryOf, pub(crate) inlets: RegistryOf, pub(crate) outlets: RegistryOf, } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 81f8ac20ef7..0a83610dbc1 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -19,8 +19,7 @@ use ockam::identity::{ }; use ockam::identity::{Identifier, SecureChannels}; use ockam::{ - Address, Context, ForwardingService, ForwardingServiceOptions, Result, Routed, TcpTransport, - Worker, + Address, Context, RelayService, RelayServiceOptions, Result, Routed, TcpTransport, Worker, }; use ockam_abac::expr::{eq, ident, str}; use ockam_abac::{Action, Env, Expr, PolicyAccessControl, PolicyStorage, Resource}; @@ -55,13 +54,13 @@ use super::registry::Registry; pub(crate) mod background_node; pub(crate) mod credentials; mod flow_controls; -pub mod forwarder; pub(crate) mod in_memory_node; pub mod message; mod node_identities; mod node_services; mod policy; mod portals; +pub mod relay; mod secure_channel; mod transport; @@ -436,12 +435,12 @@ impl NodeManager { self.start_uppercase_service_impl(ctx, DefaultAddress::UPPERCASE_SERVICE.into()) .await?; - ForwardingService::create( + RelayService::create( ctx, - DefaultAddress::FORWARDING_SERVICE, - ForwardingServiceOptions::new() + DefaultAddress::RELAY_SERVICE, + RelayServiceOptions::new() .service_as_consumer(api_flow_control_id) - .forwarder_as_consumer(api_flow_control_id), + .relay_as_consumer(api_flow_control_id), ) .await?; @@ -689,21 +688,17 @@ impl NodeManagerWorker { (Post, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => { self.start_kafka_consumer_service(ctx, req, dec).await? } - (Delete, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => { - encode_response( - self.delete_kafka_service(ctx, req, dec, KafkaServiceKind::Consumer) - .await, - )? - } + (Delete, ["node", "services", DefaultAddress::KAFKA_CONSUMER]) => encode_response( + self.delete_kafka_service(ctx, req, dec, KafkaServiceKind::Consumer) + .await, + )?, (Post, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => { self.start_kafka_producer_service(ctx, req, dec).await? } - (Delete, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => { - encode_response( - self.delete_kafka_service(ctx, req, dec, KafkaServiceKind::Producer) - .await, - )? - } + (Delete, ["node", "services", DefaultAddress::KAFKA_PRODUCER]) => encode_response( + self.delete_kafka_service(ctx, req, dec, KafkaServiceKind::Producer) + .await, + )?, (Post, ["node", "services", DefaultAddress::KAFKA_DIRECT]) => { self.start_kafka_direct_service(ctx, req, dec).await? } @@ -716,30 +711,27 @@ impl NodeManagerWorker { self.list_services_of_type(req, service_type).await? } - // ==*== Forwarder commands ==*== + // ==*== Relay commands ==*== + // TODO: change the path to 'relay' instead of 'forwarder' (Get, ["node", "forwarder", remote_address]) => { - encode_response(self.show_forwarder(req, remote_address).await)? + encode_response(self.show_relay(req, remote_address).await)? } - (Get, ["node", "forwarder"]) => encode_response(self.get_forwarders(req).await)?, + (Get, ["node", "forwarder"]) => encode_response(self.get_relays(req).await)?, (Delete, ["node", "forwarder", remote_address]) => { - encode_response(self.delete_forwarder(ctx, req, remote_address).await)? + encode_response(self.delete_relay(ctx, req, remote_address).await)? } (Post, ["node", "forwarder"]) => { - encode_response(self.create_forwarder(ctx, req, dec.decode()?).await)? + encode_response(self.create_relay(ctx, req, dec.decode()?).await)? } // ==*== Inlets & Outlets ==*== (Get, ["node", "inlet"]) => self.get_inlets(req).await.to_vec()?, - (Get, ["node", "inlet", alias]) => { - encode_response(self.show_inlet(req, alias).await)? - } + (Get, ["node", "inlet", alias]) => encode_response(self.show_inlet(req, alias).await)?, (Get, ["node", "outlet"]) => self.get_outlets(req).await.to_vec()?, (Get, ["node", "outlet", alias]) => { encode_response(self.show_outlet(req, alias).await)? } - (Post, ["node", "inlet"]) => { - encode_response(self.create_inlet(req, dec, ctx).await)? - } + (Post, ["node", "inlet"]) => encode_response(self.create_inlet(req, dec, ctx).await)?, (Post, ["node", "outlet"]) => { encode_response(self.create_outlet(ctx, req, dec.decode()?).await)? } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs index 9676288c777..384ee6c9924 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs @@ -20,7 +20,7 @@ use crate::kafka::{ ConsumerNodeAddr, KafkaInletController, KafkaPortalListener, KafkaSecureChannelControllerImpl, KAFKA_OUTLET_BOOTSTRAP_ADDRESS, KAFKA_OUTLET_INTERCEPTOR_ADDRESS, }; -use crate::kafka::{OutletManagerService, PrefixForwarderService}; +use crate::kafka::{OutletManagerService, PrefixRelayService}; use crate::nodes::models::services::{ DeleteServiceRequest, ServiceList, ServiceStatus, StartAuthenticatedServiceRequest, StartCredentialsService, StartEchoerServiceRequest, StartHopServiceRequest, @@ -270,7 +270,7 @@ impl NodeManagerWorker { ApiError::core("Unable to get flow control for secure channel listener") })?; - PrefixForwarderService::create( + PrefixRelayService::create( context, default_secure_channel_listener_flow_control_id.clone(), ) diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/portals.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/portals.rs index f3d6db41dda..0c7329b0274 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/portals.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/portals.rs @@ -524,7 +524,7 @@ impl InMemoryNode { ) -> Result { // The addressing scheme is very flexible. Typically the node connects to // the cloud via secure channel and the with another secure channel via - // forwarder to the actual outlet on the target node. However it is also + // relay to the actual outlet on the target node. However it is also // possible that there is just a single secure channel used to go directly // to another node. let duration = wait_for_outlet_duration.unwrap_or(Duration::from_secs(5)); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/forwarder.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs similarity index 71% rename from implementations/rust/ockam/ockam_api/src/nodes/service/forwarder.rs rename to implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index c038eab3d6a..3ebe6f7666a 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/forwarder.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -4,7 +4,7 @@ use std::time::Duration; use ockam::compat::sync::Mutex; use ockam::identity::Identifier; -use ockam::remote::{RemoteForwarder, RemoteForwarderOptions}; +use ockam::remote::{RemoteRelay, RemoteRelayOptions}; use ockam::Result; use ockam_core::api::{Error, Request, RequestHeader, Response}; use ockam_core::{async_trait, Address, AsyncTryClone}; @@ -15,7 +15,7 @@ use ockam_node::Context; use crate::error::ApiError; use crate::nodes::connection::Connection; -use crate::nodes::models::forwarder::{CreateForwarder, ForwarderInfo}; +use crate::nodes::models::relay::{CreateRelay, RelayInfo}; use crate::nodes::models::secure_channel::{ CreateSecureChannelRequest, CreateSecureChannelResponse, }; @@ -27,194 +27,192 @@ use crate::session::sessions::{MAX_CONNECT_TIME, MAX_RECOVERY_TIME}; use super::{NodeManager, NodeManagerWorker}; impl NodeManagerWorker { - pub async fn create_forwarder( + pub async fn create_relay( &self, ctx: &Context, req: &RequestHeader, - create_forwarder: CreateForwarder, - ) -> Result, Response> { - let CreateForwarder { + create_relay: CreateRelay, + ) -> Result, Response> { + let CreateRelay { address, alias, at_rust_node, authorized, - } = create_forwarder; + } = create_relay; match self .node_manager - .create_forwarder(ctx, &address, alias, at_rust_node, authorized) + .create_relay(ctx, &address, alias, at_rust_node, authorized) .await { Ok(body) => Ok(Response::ok(req).body(body)), Err(err) => Err(Response::internal_error( req, - &format!("Failed to create forwarder: {}", err), + &format!("Failed to create relay: {}", err), )), } } - pub async fn delete_forwarder( + pub async fn delete_relay( &self, - ctx: &mut Context, + ctx: &Context, req: &RequestHeader, remote_address: &str, - ) -> Result>, Response> { + ) -> Result>, Response> { self.node_manager - .delete_forwarder(ctx, req, remote_address) + .delete_relay(ctx, req, remote_address) .await } - pub async fn show_forwarder( + pub async fn show_relay( &self, req: &RequestHeader, remote_address: &str, - ) -> Result>, Response> { - self.node_manager.show_forwarder(req, remote_address).await + ) -> Result>, Response> { + self.node_manager.show_relay(req, remote_address).await } - pub async fn get_forwarders( + pub async fn get_relays( &self, req: &RequestHeader, - ) -> Result>, Response> { - debug!("Handling ListForwarders request"); - Ok(Response::ok(req).body(self.node_manager.get_forwarders().await)) + ) -> Result>, Response> { + debug!("Handling GetRelays request"); + Ok(Response::ok(req).body(self.node_manager.get_relays().await)) } } impl NodeManager { - /// This function returns a representation of the relays currently /// registered on this node - pub async fn get_forwarders(&self) -> Vec { - let forwarders = self + pub async fn get_relays(&self) -> Vec { + let relays = self .registry - .forwarders + .relays .entries() .await .iter() - .map(|(_, registry_info)| ForwarderInfo::from(registry_info.to_owned())) + .map(|(_, registry_info)| RelayInfo::from(registry_info.to_owned())) .collect(); - trace!(?forwarders, "Forwarders retrieved"); - forwarders + trace!(?relays, "Relays retrieved"); + relays } /// Create a new Relay /// The Connection encapsulates the list of workers required on the relay route. /// This route is monitored in the `InMemoryNode` and the workers are restarted if necessary /// when the route is unresponsive - pub async fn create_forwarder( + pub async fn create_relay( &self, ctx: &Context, connection: Connection, at_rust_node: bool, alias: Option, - ) -> Result { + ) -> Result { let route = connection.route(self.tcp_transport()).await?; - let options = RemoteForwarderOptions::new(); + let options = RemoteRelayOptions::new(); - let forwarder = if at_rust_node { + let relay = if at_rust_node { if let Some(alias) = alias { - RemoteForwarder::create_static_without_heartbeats(ctx, route, alias, options).await + RemoteRelay::create_static_without_heartbeats(ctx, route, alias, options).await } else { - RemoteForwarder::create(ctx, route, options).await + RemoteRelay::create(ctx, route, options).await } } else if let Some(alias) = alias { - RemoteForwarder::create_static(ctx, route, alias, options).await + RemoteRelay::create_static(ctx, route, alias, options).await } else { - RemoteForwarder::create(ctx, route, options).await + RemoteRelay::create(ctx, route, options).await }; - match forwarder { + match relay { Ok(info) => { let registry_info = info.clone(); let registry_remote_address = registry_info.remote_address().to_string(); - let forwarder_info = ForwarderInfo::from(info); + let relay_info = RelayInfo::from(info); self.registry - .forwarders + .relays .insert(registry_remote_address, registry_info) .await; debug!( - forwarding_route = %forwarder_info.forwarding_route(), - remote_address = %forwarder_info.remote_address_ma()?, - "CreateForwarder request processed, sending back response" + forwarding_route = %relay_info.forwarding_route(), + remote_address = %relay_info.remote_address_ma()?, + "CreateRelay request processed, sending back response" ); - Ok(forwarder_info) + Ok(relay_info) } Err(err) => { - error!(?err, "Failed to create forwarder"); + error!(?err, "Failed to create relay"); Err(err) } } } /// This function removes an existing relay based on its remote address - pub async fn delete_forwarder( + pub async fn delete_relay( &self, ctx: &Context, req: &RequestHeader, remote_address: &str, - ) -> Result>, Response> { - debug!(%remote_address , "Handling DeleteForwarder request"); + ) -> Result>, Response> { + debug!(%remote_address , "Handling DeleteRelay request"); - if let Some(forwarder_to_delete) = self.registry.forwarders.remove(remote_address).await { - debug!(%remote_address, "Successfully removed forwarder from node registry"); + if let Some(relay_to_delete) = self.registry.relays.remove(remote_address).await { + debug!(%remote_address, "Successfully removed relay from node registry"); match ctx - .stop_worker(forwarder_to_delete.worker_address().clone()) + .stop_worker(relay_to_delete.worker_address().clone()) .await { Ok(_) => { - debug!(%remote_address, "Successfully stopped forwarder"); - Ok(Response::ok(req) - .body(Some(ForwarderInfo::from(forwarder_to_delete.to_owned())))) + debug!(%remote_address, "Successfully stopped relay"); + Ok(Response::ok(req).body(Some(RelayInfo::from(relay_to_delete.to_owned())))) } Err(err) => { - error!(%remote_address, ?err, "Failed to delete forwarder from node registry"); + error!(%remote_address, ?err, "Failed to delete relay from node registry"); Err(Response::internal_error( req, - &format!("Failed to delete forwarder at {}: {}", remote_address, err), + &format!("Failed to delete relay at {}: {}", remote_address, err), )) } } } else { - error!(%remote_address, "Forwarder not found in the node registry"); + error!(%remote_address, "Relay not found in the node registry"); Err(Response::not_found( req, - &format!("Forwarder with address {} not found.", remote_address), + &format!("Relay with address {} not found.", remote_address), )) } } /// This function finds an existing relay and returns its configuration - pub(super) async fn show_forwarder( + pub(super) async fn show_relay( &self, req: &RequestHeader, remote_address: &str, - ) -> Result>, Response> { - debug!("Handling ShowForwarder request"); - if let Some(forwarder_to_show) = self.registry.forwarders.get(remote_address).await { - debug!(%remote_address, "Forwarder not found in node registry"); - Ok(Response::ok(req).body(Some(ForwarderInfo::from(forwarder_to_show.to_owned())))) + ) -> Result>, Response> { + debug!("Handling ShowRelay request"); + if let Some(relay) = self.registry.relays.get(remote_address).await { + debug!(%remote_address, "Relay not found in node registry"); + Ok(Response::ok(req).body(Some(RelayInfo::from(relay.to_owned())))) } else { - error!(%remote_address, "Forwarder not found in the node registry"); + error!(%remote_address, "Relay not found in the node registry"); Err(Response::not_found( req, - &format!("Forwarder with address {} not found.", remote_address), + &format!("Relay with address {} not found.", remote_address), )) } } } impl InMemoryNode { - pub async fn create_forwarder( + pub async fn create_relay( &self, ctx: &Context, address: &MultiAddr, alias: Option, at_rust_node: bool, authorized: Option, - ) -> Result { - debug!(addr = %address, alias = ?alias, at_rust_node = ?at_rust_node, "Handling CreateForwarder request"); + ) -> Result { + debug!(addr = %address, alias = ?alias, at_rust_node = ?at_rust_node, "Handling CreateRelay request"); let connection_ctx = Arc::new(ctx.async_try_clone().await?); let connection = self .make_connection( @@ -234,9 +232,9 @@ impl InMemoryNode { connection.add_consumer(connection_ctx.clone(), &hop); } - let forwarder = self + let relay = self .node_manager - .create_forwarder( + .create_relay( ctx, connection.clone(), at_rust_node, @@ -258,7 +256,7 @@ impl InMemoryNode { session.set_replacer(repl); self.add_session(session); }; - Ok(forwarder) + Ok(relay) } /// Create a session replacer. @@ -286,7 +284,7 @@ impl InMemoryNode { let node_manager = node_manager.clone(); Box::pin(async move { - debug!(%prev_route, %addr, "creating new remote forwarder"); + debug!(%prev_route, %addr, "creating new remote relay"); let f = async { for encryptor in &previous_connection.secure_channel_encryptors { @@ -323,21 +321,21 @@ impl InMemoryNode { let route = connection.route(node_manager.tcp_transport()).await?; - let options = RemoteForwarderOptions::new(); + let options = RemoteRelayOptions::new(); if let Some(alias) = &alias { - RemoteForwarder::create_static(&ctx, route, alias, options).await?; + RemoteRelay::create_static(&ctx, route, alias, options).await?; } else { - RemoteForwarder::create(&ctx, route, options).await?; + RemoteRelay::create(&ctx, route, options).await?; } Ok(connection.transport_route()) }; match timeout(MAX_RECOVERY_TIME, f).await { Err(_) => { - warn!(%addr, "timeout creating new remote forwarder"); + warn!(%addr, "timeout creating new remote relay"); Err(ApiError::core("timeout")) } Ok(Err(e)) => { - warn!(%addr, err = %e, "error creating new remote forwarder"); + warn!(%addr, err = %e, "error creating new remote relay"); Err(e) } Ok(Ok(a)) => Ok(a), @@ -355,7 +353,7 @@ pub trait Relays { address: &MultiAddr, alias: Option, authorized: Option, - ) -> miette::Result; + ) -> miette::Result; } #[async_trait] @@ -366,9 +364,9 @@ impl Relays for BackgroundNode { address: &MultiAddr, alias: Option, authorized: Option, - ) -> miette::Result { + ) -> miette::Result { let at_rust_node = !address.starts_with(Project::CODE); - let body = CreateForwarder::new(address.clone(), alias, at_rust_node, authorized); + let body = CreateRelay::new(address.clone(), alias, at_rust_node, authorized); self.ask(ctx, Request::post("/node/forwarder").body(body)) .await } diff --git a/implementations/rust/ockam/ockam_app/src/shared_service/relay/create.rs b/implementations/rust/ockam/ockam_app/src/shared_service/relay/create.rs index b86a948929b..fd07e4aa5a6 100644 --- a/implementations/rust/ockam/ockam_app/src/shared_service/relay/create.rs +++ b/implementations/rust/ockam/ockam_app/src/shared_service/relay/create.rs @@ -3,7 +3,7 @@ use crate::Result; use miette::IntoDiagnostic; use ockam::Context; use ockam_api::cli_state::{CliState, StateDirTrait}; -use ockam_api::nodes::models::forwarder::ForwarderInfo; +use ockam_api::nodes::models::relay::RelayInfo; use ockam_api::nodes::InMemoryNode; use ockam_multiaddr::MultiAddr; use once_cell::sync::Lazy; @@ -37,7 +37,7 @@ async fn create_relay_impl( context: &Context, cli_state: &CliState, node_manager: Arc, -) -> Result> { +) -> Result> { trace!("Creating relay"); if !cli_state.is_enrolled().unwrap_or(false) { trace!("Not enrolled, skipping relay creation"); @@ -53,7 +53,7 @@ async fn create_relay_impl( let project_route = format!("/project/{}", project.name()); let project_address = MultiAddr::from_str(&project_route).into_diagnostic()?; let relay = node_manager - .create_forwarder( + .create_relay( context, &project_address, Some(NODE_NAME.to_string()), @@ -73,9 +73,9 @@ async fn create_relay_impl( } } -pub(crate) async fn get_relay(node_manager: Arc) -> Option { +pub(crate) async fn get_relay(node_manager: Arc) -> Option { node_manager - .get_forwarders() + .get_relays() .await .into_iter() .find(|r| r.remote_address() == *RELAY_NAME) diff --git a/implementations/rust/ockam/ockam_command/src/project/util.rs b/implementations/rust/ockam/ockam_command/src/project/util.rs index 60c709cf398..90043df9297 100644 --- a/implementations/rust/ockam/ockam_command/src/project/util.rs +++ b/implementations/rust/ockam/ockam_command/src/project/util.rs @@ -12,7 +12,7 @@ use ockam_api::cloud::project::{Project, Projects}; use ockam_api::cloud::{Controller, ORCHESTRATOR_AWAIT_TIMEOUT_MS}; use ockam_api::config::lookup::LookupMeta; use ockam_api::error::ApiError; -use ockam_api::nodes::service::forwarder::SecureChannelsCreation; +use ockam_api::nodes::service::relay::SecureChannelsCreation; use ockam_api::nodes::InMemoryNode; use ockam_api::route_to_multiaddr; diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index 8266b78dd18..8d2fa3891bb 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -12,8 +12,8 @@ use ockam::identity::Identifier; use ockam::Context; use ockam_api::address::extract_address_value; use ockam_api::is_local_node; -use ockam_api::nodes::models::forwarder::ForwarderInfo; -use ockam_api::nodes::service::forwarder::Relays; +use ockam_api::nodes::models::relay::RelayInfo; +use ockam_api::nodes::service::relay::Relays; use ockam_api::nodes::BackgroundNode; use ockam_multiaddr::proto::Project; use ockam_multiaddr::{MultiAddr, Protocol}; @@ -43,7 +43,7 @@ pub struct CreateCommand { to: Option, /// Route to the node at which to create the relay - #[arg(long, id = "ROUTE", display_order = 900, value_parser = parse_at, default_value_t = default_forwarder_at())] + #[arg(long, id = "ROUTE", display_order = 900, value_parser = parse_at, default_value_t = default_relay_at())] at: MultiAddr, /// Authorized identity for secure channel connection @@ -69,7 +69,7 @@ fn parse_at(input: &str) -> Result { Ok(ma) } -pub fn default_forwarder_at() -> MultiAddr { +pub fn default_relay_at() -> MultiAddr { MultiAddr::from_str("/project/default").expect("Default relay address is invalid") } @@ -107,7 +107,7 @@ async fn rpc(ctx: Context, (opts, cmd): (CommandGlobalOpts, CreateCommand)) -> m let output_messages = vec![ format!( - "Creating relay forwarding service at {}...", + "Creating relay relay service at {}...", &cmd.at .to_string() .color(OckamColor::PrimaryResource.color()) @@ -155,7 +155,7 @@ async fn rpc(ctx: Context, (opts, cmd): (CommandGlobalOpts, CreateCommand)) -> m Ok(()) } -impl Output for ForwarderInfo { +impl Output for RelayInfo { fn output(&self) -> Result { let output = format!( r#" diff --git a/implementations/rust/ockam/ockam_command/src/relay/delete.rs b/implementations/rust/ockam/ockam_command/src/relay/delete.rs index 9b72006dcbf..9b474a2e74c 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/delete.rs @@ -63,7 +63,7 @@ pub async fn run_impl( node_name )) .machine(&relay_name) - .json(serde_json::json!({ "forwarder": { "name": relay_name, + .json(serde_json::json!({ "relay": { "name": relay_name, "node": node_name } })) .write_line() .unwrap(); diff --git a/implementations/rust/ockam/ockam_command/src/relay/list.rs b/implementations/rust/ockam/ockam_command/src/relay/list.rs index 31c98b491e0..68c75939d77 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/list.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/list.rs @@ -8,7 +8,7 @@ use tracing::trace; use ockam::Context; use ockam_api::address::extract_address_value; use ockam_api::cli_state::StateDirTrait; -use ockam_api::nodes::models::forwarder::ForwarderInfo; +use ockam_api::nodes::models::relay::RelayInfo; use ockam_api::nodes::BackgroundNode; use ockam_core::api::Request; @@ -54,8 +54,7 @@ async fn run_impl( let is_finished: Mutex = Mutex::new(false); let get_relays = async { - let relay_infos: Vec = - node.ask(&ctx, Request::get("/node/forwarder")).await?; + let relay_infos: Vec = node.ask(&ctx, Request::get("/node/forwarder")).await?; *is_finished.lock().await = true; Ok(relay_infos) }; diff --git a/implementations/rust/ockam/ockam_command/src/relay/show.rs b/implementations/rust/ockam/ockam_command/src/relay/show.rs index 41a4d4b584b..e4105f70848 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/show.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/show.rs @@ -3,7 +3,7 @@ use miette::IntoDiagnostic; use ockam::Context; use ockam_api::address::extract_address_value; -use ockam_api::nodes::models::forwarder::ForwarderInfo; +use ockam_api::nodes::models::relay::RelayInfo; use ockam_api::nodes::BackgroundNode; use ockam_core::api::Request; @@ -45,7 +45,7 @@ async fn run_impl( let node_name = extract_address_value(&at)?; let remote_address = &cmd.remote_address; let node = BackgroundNode::create(&ctx, &opts.state, &node_name).await?; - let relay_info: ForwarderInfo = node + let relay_info: RelayInfo = node .ask( &ctx, Request::get(format!("/node/forwarder/{remote_address}")), diff --git a/implementations/rust/ockam/ockam_command/tests/bats/relay.bats b/implementations/rust/ockam/ockam_command/tests/bats/relay.bats index df4febd6e19..62d004387dd 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/relay.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/relay.bats @@ -58,6 +58,6 @@ teardown() { assert_output --regexp "Worker Address: /service/.*" # Test showing non-existing with no relay - run_failure "$OCKAM" relay show forwarder_blank --at /node/n2 + run_failure "$OCKAM" relay show relay_blank --at /node/n2 assert_output --partial "not found" }