diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index bddca089016..e4676513ac0 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -108,7 +108,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime // 4. create a tcp outlet with the above policy tcp.create_outlet( "outlet", - HostnamePort::new("127.0.0.1", 5000), + HostnamePort::new("127.0.0.1", 5000)?, TcpOutletOptions::new() .with_incoming_access_control_impl(incoming_access_control) .with_outgoing_access_control_impl(outgoing_access_control), diff --git a/implementations/rust/ockam/ockam/src/lib.rs b/implementations/rust/ockam/ockam/src/lib.rs index 7c34dffb9c6..81cb2e6f243 100644 --- a/implementations/rust/ockam/ockam/src/lib.rs +++ b/implementations/rust/ockam/ockam/src/lib.rs @@ -105,7 +105,7 @@ pub use relay_service::{RelayService, RelayServiceOptions}; /// Transport pub mod transport { pub use ockam_transport_core::{ - parse_socket_addr, HostnamePort, StaticHostnamePort, Transport, + parse_socket_addr, HostnamePort, SchemeHostnamePort, StaticHostnamePort, Transport, }; } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs index e13b7a5e803..7beef6f8a79 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs @@ -118,7 +118,7 @@ impl KafkaInletController { } let inlet_bind_address = - HostnamePort::new(inner.bind_hostname.clone(), inner.current_port); + HostnamePort::new(inner.bind_hostname.clone(), inner.current_port)?; let node_manager = self.node_manager.upgrade().ok_or_else(|| { Error::new(Origin::Node, Kind::Internal, "node manager was shut down") diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs index 4b3fead2383..fbc14b42247 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs @@ -161,7 +161,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( .tcp .create_outlet( "kafka_consumer_outlet", - HostnamePort::new("127.0.0.1", consumer_mock_kafka.port), + HostnamePort::new("127.0.0.1", consumer_mock_kafka.port)?, TcpOutletOptions::new(), ) .await?; @@ -181,7 +181,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( .tcp .create_outlet( "kafka_producer_outlet", - HostnamePort::new("127.0.0.1", producer_mock_kafka.port), + HostnamePort::new("127.0.0.1", producer_mock_kafka.port)?, TcpOutletOptions::new(), ) .await?; @@ -217,7 +217,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( .tcp .create_outlet( "kafka_consumer_outlet", - HostnamePort::new("127.0.0.1", consumer_mock_kafka.port), + HostnamePort::new("127.0.0.1", consumer_mock_kafka.port)?, TcpOutletOptions::new(), ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs index 099e180044c..85b4b702cf7 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs @@ -346,6 +346,10 @@ mod tests { } fn outlet_info(worker_addr: Address) -> OutletInfo { - OutletInfo::new(HostnamePort::new("127.0.0.1", 0), Some(&worker_addr), true) + OutletInfo::new( + HostnamePort::new("127.0.0.1", 0).unwrap(), + Some(&worker_addr), + true, + ) } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs index b05eb088937..b665acc9429 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs @@ -66,8 +66,7 @@ impl NodeManager { // the port could be zero, to simplify the following code we // resolve the address to a full socket address - let socket_addr = - ockam_node::compat::asynchronous::resolve_peer(listen_addr.to_string()).await?; + let socket_addr = ockam_node::compat::asynchronous::resolve_peer(&listen_addr).await?; let listen_addr = if listen_addr.port() == 0 { get_free_address_for(&socket_addr.ip().to_string()) .map_err(|err| ockam_core::Error::new(Origin::Transport, Kind::Invalid, err))? diff --git a/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs b/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs index 8b1ba0f6d5a..92daac73a9c 100644 --- a/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs +++ b/implementations/rust/ockam/ockam_api/src/rendezvous_healthcheck.rs @@ -23,7 +23,7 @@ impl RendezvousHealthcheck { udp_socket_address: SocketAddr, ) -> Result { let peer = if udp_socket_address.ip().is_unspecified() { - HostnamePort::new("localhost", udp_socket_address.port()).to_string() + HostnamePort::new("localhost", udp_socket_address.port())?.to_string() } else { udp_socket_address.to_string() }; diff --git a/implementations/rust/ockam/ockam_api/tests/latency.rs b/implementations/rust/ockam/ockam_api/tests/latency.rs index 4eb7bf974ee..ec086eafe04 100644 --- a/implementations/rust/ockam/ockam_api/tests/latency.rs +++ b/implementations/rust/ockam/ockam_api/tests/latency.rs @@ -150,7 +150,7 @@ pub fn measure_buffer_latency_two_nodes_portal() { .node_manager .create_inlet( &first_node.context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], second_node_listen_address diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index d1b0ab9d1e6..ef7d88a7401 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -46,7 +46,7 @@ async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<( .node_manager .create_inlet( context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], MultiAddr::from_str("/secure/api/service/outlet")?, @@ -122,7 +122,7 @@ fn portal_node_goes_down_reconnect() { .node_manager .create_inlet( &first_node.context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], second_node_listen_address @@ -280,7 +280,7 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { .node_manager .create_inlet( &first_node.context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], InternetAddress::from(passthrough_server_handle.chosen_addr) @@ -394,7 +394,7 @@ fn portal_heavy_load_exchanged() { .node_manager .create_inlet( &first_node.context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], second_node_listen_address @@ -547,7 +547,7 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup .node_manager .create_inlet( &first_node.context, - HostnamePort::new("127.0.0.1", 0), + HostnamePort::new("127.0.0.1", 0)?, route![], route![], InternetAddress::from(passthrough_server_handle.chosen_addr) diff --git a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs index e656b3bc85a..4304949f042 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs @@ -6,6 +6,7 @@ use ockam::transport::HostnamePort; use ockam::Address; use ockam_api::address::extract_address_value; use ockam_api::nodes::models::portal::OutletAccessControl; +use std::str::FromStr; use std::sync::Arc; use tracing::{debug, info}; @@ -21,9 +22,12 @@ impl AppState { } else { format!("{DEFAULT_HOST}:{to}") }; - let socket_addr = resolve_peer(addr).await.into_diagnostic().wrap_err( - "Invalid address. The expected formats are 'host:port', 'ip:port' or 'port'", - )?; + let socket_addr = resolve_peer(&HostnamePort::from_str(&addr)?) + .await + .into_diagnostic() + .wrap_err( + "Invalid address. The expected formats are 'host:port', 'ip:port' or 'port'", + )?; let worker_addr: Address = extract_address_value(&from) .wrap_err("Invalid service address")? .into(); diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs index b939dc619ed..855d3cb40ff 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs @@ -1,25 +1,148 @@ use crate::node::util::initialize_default_node; -use crate::tcp::inlet::create::CreateCommand as InletCreateCommand; +use crate::shared_args::OptionalTimeoutArg; +use crate::tcp::inlet::create::{tcp_inlet_default_from_addr, tcp_inlet_default_to_addr}; +use crate::tcp::util::alias_parser; +use crate::util::parsers::duration_parser; +use crate::util::parsers::hostname_parser; +use crate::util::{port_is_free_guard, print_warning_for_deprecated_flag_replaced}; use crate::{Command, CommandGlobalOpts}; use async_trait::async_trait; +use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; -use miette::miette; +use miette::{miette, IntoDiagnostic}; +use ockam::identity::Identifier; +use ockam::transport::SchemeHostnamePort; use ockam::Context; +use ockam_abac::PolicyExpression; +use ockam_api::address::extract_address_value; +use ockam_api::cli_state::random_name; use ockam_api::colors::color_primary; use ockam_api::influxdb::{InfluxDBPortals, LeaseUsage}; use ockam_api::nodes::models::portal::InletStatus; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus}; +use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus}; use ockam_core::api::{Reply, Status}; -use ockam_multiaddr::MultiAddr; +use ockam_multiaddr::{proto, MultiAddr, Protocol}; +use ockam_node::compat::asynchronous::resolve_peer; +use std::str::FromStr; +use std::time::Duration; use tracing::trace; /// Create InfluxDB Inlets #[derive(Clone, Debug, Args)] -pub struct InfluxDBCreateCommand { +pub struct CreateCommand { + /// Assign a name to this InfluxDB Inlet + #[arg(id = "NAME", value_parser = alias_parser)] + pub name: Option, + + /// Node on which to start the InfluxDB Inlet. + #[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)] + pub at: Option, + + /// Address on which to accept InfluxDB connections, in the format ://
:. + /// At least the port must be provided. The default scheme is `tcp` and the default address is `127.0.0.1`. + /// If the argument is not set, a random port will be used on the default address. + /// + /// To enable TLS, the `ockam-tls-certificate` credential attribute is required. + /// It will use the default project TLS certificate provider `/project/default/service/tls_certificate_provider`. + /// To specify a different certificate provider, use `--tls-certificate-provider`. + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] + pub from: SchemeHostnamePort, + + /// Route to a InfluxDB Outlet or the name of the InfluxDB Outlet service you want to connect to. + /// + /// If you are connecting to a local node, you can provide the route as `/node/n/service/outlet`. + /// + /// If you are connecting to a remote node through a relay in the Orchestrator you can either + /// provide the full route to the InfluxDB Outlet as `/project/myproject/service/forward_to_myrelay/secure/api/service/outlet`, + /// or just the name of the service as `outlet` or `/service/outlet`. + /// If you are passing just the service name, consider using `--via` to specify the + /// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`). + #[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())] + pub to: String, + + /// Name of the relay that this InfluxDB Inlet will use to connect to the InfluxDB Outlet. + /// + /// Use this flag when you are using `--to` to specify the service name of a InfluxDB Outlet + /// that is reachable through a relay in the Orchestrator. + /// If you don't provide it, the default relay name will be used, if necessary. + #[arg(long, display_order = 900, id = "RELAY_NAME")] + pub via: Option, + + /// Identity to be used to create the secure channel. If not set, the node's identity will be used. + #[arg(long, value_name = "IDENTITY_NAME", display_order = 900)] + pub identity: Option, + + /// Authorized identifier for secure channel connection + #[arg(long, name = "AUTHORIZED", display_order = 900)] + pub authorized: Option, + + /// [DEPRECATED] Use the positional argument instead + #[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)] + pub alias: Option, + + /// Policy expression that will be used for access control to the InfluxDB Inlet. + /// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used. + /// + /// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`. + #[arg( + long, + visible_alias = "expression", + display_order = 900, + id = "POLICY_EXPRESSION" + )] + pub allow: Option, + + /// Time to wait for the outlet to be available. + #[arg(long, display_order = 900, id = "WAIT", default_value = "5s", value_parser = duration_parser)] + pub connection_wait: Duration, + + /// Time to wait before retrying to connect to the InfluxDB Outlet. + #[arg(long, display_order = 900, id = "RETRY", default_value = "20s", value_parser = duration_parser)] + pub retry_wait: Duration, + #[command(flatten)] - pub tcp_inlet: InletCreateCommand, + pub timeout: OptionalTimeoutArg, + + /// Create the InfluxDB Inlet without waiting for the InfluxDB Outlet to connect + #[arg(long, default_value = "false")] + pub no_connection_wait: bool, + + /// [DEPRECATED] Use the `udp` scheme in the `--from` argument. + #[arg( + long, + visible_alias = "enable-udp-puncture", + value_name = "BOOL", + default_value_t = false, + hide = true + )] + pub udp: bool, + + /// Disable fallback to TCP. + /// TCP won't be used to transfer data between the Inlet and the Outlet. + #[arg( + long, + visible_alias = "disable-tcp-fallback", + value_name = "BOOL", + default_value_t = false, + hide = true + )] + pub no_tcp_fallback: bool, + + /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream. + /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`. + #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] + pub privileged: bool, + + /// [DEPRECATED] Use the `tls` scheme in the `--from` argument. + #[arg(long, value_name = "BOOL", default_value_t = false, hide = true)] + pub tls: bool, + + /// Enable TLS for the InfluxDB Inlet using the provided certificate provider. + /// Requires `ockam-tls-certificate` credential attribute. + #[arg(long, value_name = "ROUTE", hide = true)] + pub tls_certificate_provider: Option, /// Share the leases among the clients or use a separate lease for each client #[arg(long, default_value = "per-client")] @@ -33,25 +156,22 @@ pub struct InfluxDBCreateCommand { } #[async_trait] -impl Command for InfluxDBCreateCommand { +impl Command for CreateCommand { const NAME: &'static str = "influxdb-inlet create"; async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; - self = self.parse_args(&opts).await?; + let cmd = self.parse_args(&opts).await?; - let mut node = BackgroundNodeClient::create(ctx, &opts.state, &self.tcp_inlet.at).await?; - self.tcp_inlet - .timeout - .timeout - .map(|t| node.set_timeout_mut(t)); + let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?; + cmd.timeout.timeout.map(|t| node.set_timeout_mut(t)); let inlet_status = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating a InfluxDB Inlet at {}...\n", - color_primary(&self.tcp_inlet.from) + color_primary(&cmd.from) )); } @@ -59,22 +179,21 @@ impl Command for InfluxDBCreateCommand { let result: Reply = node .create_influxdb_inlet( ctx, - &self.tcp_inlet.from, - &self.tcp_inlet.to(), - self.tcp_inlet.alias.as_ref().expect("The `alias` argument should be set to its default value if not provided"), - &self.tcp_inlet.authorized, - &self.tcp_inlet.allow, - self.tcp_inlet.connection_wait, - !self.tcp_inlet.no_connection_wait, - &self - .tcp_inlet + cmd.from.hostname_port(), + &cmd.to(), + cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"), + &cmd.authorized, + &cmd.allow, + cmd.connection_wait, + !cmd.no_connection_wait, + &cmd .secure_channel_identifier(&opts.state) .await?, - self.tcp_inlet.udp, - self.tcp_inlet.no_tcp_fallback, - &self.tcp_inlet.tls_certificate_provider, - self.leased_token_strategy.clone(), - self.lease_manager_route.clone(), + cmd.udp || cmd.from.is_udp(), + cmd.no_tcp_fallback, + &cmd.tls_certificate_provider, + cmd.leased_token_strategy.clone(), + cmd.lease_manager_route.clone(), ) .await?; @@ -90,49 +209,46 @@ impl Command for InfluxDBCreateCommand { }; trace!("the inlet creation returned a non-OK status: {s:?}"); - if self.tcp_inlet.retry_wait.as_millis() == 0 { - return Err(miette!("Failed to create TCP inlet"))?; + if cmd.retry_wait.as_millis() == 0 { + return Err(miette!("Failed to create InfluxDB inlet"))?; } if let Some(pb) = pb.as_ref() { pb.set_message(format!( - "Waiting for TCP Inlet {} to be available... Retrying momentarily\n", - color_primary(&self.tcp_inlet.to) + "Waiting for InfluxDB Inlet {} to be available... Retrying momentarily\n", + color_primary(&cmd.to) )); } - tokio::time::sleep(self.tcp_inlet.retry_wait).await + tokio::time::sleep(cmd.retry_wait).await } } } }; let node_name = node.node_name(); - self.tcp_inlet - .add_inlet_created_event(&opts, &node_name, &inlet_status) - .await?; let created_message = fmt_ok!( "Created a new InfluxDB Inlet in the Node {} bound to {}\n", color_primary(&node_name), - color_primary(&self.tcp_inlet.from) + color_primary(&cmd.from) ); - let plain = if self.tcp_inlet.no_connection_wait { - created_message + &fmt_log!("It will automatically connect to the TCP Outlet at {} as soon as it is available", - color_primary(&self.tcp_inlet.to) + let plain = if cmd.no_connection_wait { + created_message + &fmt_log!("It will automatically connect to the InfluxDB Outlet at {} as soon as it is available", + color_primary(&cmd.to) ) } else if inlet_status.status == ConnectionStatus::Up { created_message + &fmt_log!( - "sending traffic to the TCP Outlet at {}", - color_primary(&self.tcp_inlet.to) + "sending traffic to the InfluxDB Outlet at {}", + color_primary(&cmd.to) ) } else { fmt_warn!( - "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n", + "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the InfluxDB Outlet at {}\n", color_primary(&node_name), - color_primary(self.tcp_inlet.from.to_string()), - color_primary(&self.tcp_inlet.to) + color_primary(cmd.from.to_string()), + color_primary(&cmd.to) ) + &fmt_info!("It will retry to connect automatically") }; @@ -147,9 +263,53 @@ impl Command for InfluxDBCreateCommand { } } -impl InfluxDBCreateCommand { +impl CreateCommand { async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { - self.tcp_inlet = self.tcp_inlet.parse_args(opts).await?; + if let Some(alias) = self.alias.as_ref() { + print_warning_for_deprecated_flag_replaced( + opts, + "alias", + "the positional argument", + )?; + if self.name.is_some() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --alias flag") + + &fmt_log!("Consider removing the --alias flag"), + )?; + } + self.name = Some(alias.clone()); + } else { + self.name = self.name.or_else(|| Some(random_name())); + } + + let from = resolve_peer(self.from.hostname_port()) + .await + .into_diagnostic()?; + port_is_free_guard(&from)?; + + self.to = crate::tcp::inlet::create::CreateCommand::parse_arg_to( + &opts.state, + self.to, + self.via.as_ref(), + ) + .await?; + if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() { + return Err(miette!( + "--authorized can not be used with project addresses" + ))?; + } + + self.tls_certificate_provider = + if let Some(tls_certificate_provider) = &self.tls_certificate_provider { + Some(tls_certificate_provider.clone()) + } else if self.tls || self.from.is_tls() { + Some(MultiAddr::from_str( + "/project/default/service/tls_certificate_provider", + )?) + } else { + None + }; + if self .lease_manager_route .as_ref() @@ -159,6 +319,22 @@ impl InfluxDBCreateCommand { "lease-manager-route argument requires leased-token-strategy=per-client" ))? }; + Ok(self) } + + pub fn to(&self) -> MultiAddr { + MultiAddr::from_str(&self.to).unwrap() + } + + pub async fn secure_channel_identifier( + &self, + state: &CliState, + ) -> miette::Result> { + if let Some(identity_name) = self.identity.as_ref() { + Ok(Some(state.get_identifier_by_name(identity_name).await?)) + } else { + Ok(None) + } + } } diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs index 034d4a3617c..0aae0c319f7 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs @@ -1,6 +1,6 @@ use clap::{Args, Subcommand}; -use create::InfluxDBCreateCommand; +use create::CreateCommand; use crate::{docs, Command, CommandGlobalOpts}; @@ -22,7 +22,7 @@ pub struct InfluxDBInletCommand { #[derive(Clone, Debug, Subcommand)] pub enum InfluxDBInletSubCommand { - Create(InfluxDBCreateCommand), + Create(CreateCommand), } impl InfluxDBInletCommand { diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/static/long_about.txt b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/static/long_about.txt index 6bb7b41afb6..d64a547c483 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/static/long_about.txt +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/static/long_about.txt @@ -1 +1 @@ -A Http inlet is a way of defining where a node should be listening for connections, and where it should forward that traffic to. It is one end (tcp-outlet being the other) of a portal, which receives Http data, attach an authorization token header to it, chunks and wraps them into Ockam Routing messages and sends them along the supplied route. +An InfluxDB Inlet is a way of defining where a node should be listening for connections, and where it should forward that traffic to. It is one end (influxdb-outlet being the other) of a portal, which receives Http data, attach an authorization token header to it, chunks and wraps them into Ockam Routing messages and sends them along the supplied route. diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs index bc1a5752abe..970acea19ff 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs @@ -1,24 +1,69 @@ use crate::node::util::initialize_default_node; -use crate::tcp::outlet::create::CreateCommand as OutletCreateCommand; use crate::util::parsers::duration_parser; use crate::{Command, CommandGlobalOpts}; use async_trait::async_trait; +use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; use miette::miette; +use ockam::transport::SchemeHostnamePort; use ockam::{Address, Context}; +use ockam_abac::PolicyExpression; +use ockam_api::address::extract_address_value; use ockam_api::colors::color_primary; -use ockam_api::fmt_ok; use ockam_api::influxdb::portal::{InfluxDBOutletConfig, LeaseManagerConfig}; use ockam_api::influxdb::InfluxDBPortals; use ockam_api::nodes::BackgroundNodeClient; +use ockam_api::{fmt_log, fmt_ok, fmt_warn}; +use std::str::FromStr; use std::time::Duration; /// Create InfluxDB Outlets #[derive(Clone, Debug, Args)] -pub struct InfluxDBCreateCommand { - #[command(flatten)] - pub tcp_outlet: OutletCreateCommand, +pub struct CreateCommand { + /// Address of your InfluxDB Outlet, which is part of a route used in other commands. + /// This unique address identifies the InfluxDB Outlet worker on the Node on your local machine. + /// Examples are `/service/my-outlet` or `my-outlet`. + /// If not provided, `outlet` will be used, or a random address will be generated if `outlet` is taken. + /// You will need this address when creating a InfluxDB Inlet using `ockam influxdb-inlet create`. + #[arg(value_parser = extract_address_value)] + pub name: Option, + + /// Address where your InfluxDB server is running, in the form of `://:`. + /// At least the port must be provided. The default scheme is `tcp` and the default address is `127.0.0.1`. + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", value_parser = SchemeHostnamePort::from_str)] + pub to: SchemeHostnamePort, + + /// [DEPRECATED] Use the `tls` scheme in the `--from` argument. + #[arg(long, display_order = 900, id = "BOOLEAN")] + pub tls: bool, + + /// Alternative to the positional argument. + /// Address of your InfluxDB Outlet, which is part of a route used in other commands. + #[arg(long, display_order = 902, id = "OUTLET_ADDRESS", value_parser = extract_address_value)] + pub from: Option, + + /// Your InfluxDB Outlet will be created on this node. If you don't provide it, the default + /// node will be used + #[arg(long, display_order = 903, id = "NODE_NAME", value_parser = extract_address_value)] + pub at: Option, + + /// Policy expression that will be used for access control to the InfluxDB Outlet. + /// If you don't provide it, the policy set for the "tcp-outlet" resource type will be used. + /// + /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. + #[arg( + long, + visible_alias = "expression", + display_order = 904, + id = "POLICY_EXPRESSION" + )] + pub allow: Option, + + /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream. + /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`. + #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] + pub privileged: bool, #[arg(long, conflicts_with("LeaseManagerConfigArgs"))] fixed_token: Option, @@ -48,15 +93,16 @@ pub struct LeaseManagerConfigArgs { } #[async_trait] -impl Command for InfluxDBCreateCommand { +impl Command for CreateCommand { const NAME: &'static str = "influxdb-outlet create"; async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; + let cmd = self.parse_args(&opts).await?; - let token_config = if let Some(t) = self.fixed_token { + let token_config = if let Some(t) = cmd.fixed_token { InfluxDBOutletConfig::OutletWithFixedToken(t) - } else if let Some(config) = self.lease_manager_config { + } else if let Some(config) = cmd.lease_manager_config { let config = config.parse_args().await?; InfluxDBOutletConfig::StartLeaseManager(LeaseManagerConfig::new( config.org_id, @@ -70,28 +116,25 @@ impl Command for InfluxDBCreateCommand { ))?; }; - let node = BackgroundNodeClient::create(ctx, &opts.state, &self.tcp_outlet.at).await?; + let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?; let outlet_status = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating a new InfluxDB Outlet to {}...\n", - color_primary(self.tcp_outlet.to.to_string()) + color_primary(cmd.to.to_string()) )); } node.create_influxdb_outlet( ctx, - self.tcp_outlet.to.clone(), - self.tcp_outlet.tls, - self.tcp_outlet.from.clone().map(Address::from).as_ref(), - self.tcp_outlet.allow.clone(), + cmd.to.clone().into(), + cmd.tls || cmd.to.is_tls(), + cmd.name.clone().map(Address::from).as_ref(), + cmd.allow.clone(), token_config, ) .await? }; - self.tcp_outlet - .add_outlet_created_journey_event(&opts, &node.node_name(), &outlet_status) - .await?; opts.terminal .stdout() @@ -99,7 +142,7 @@ impl Command for InfluxDBCreateCommand { "Created a new InfluxDB Outlet in the Node {} at {} bound to {}\n\n", color_primary(node.node_name()), color_primary(&outlet_status.worker_addr), - color_primary(&self.tcp_outlet.to) + color_primary(&cmd.to) )) .machine(&outlet_status.worker_addr) .json_obj(&outlet_status)? @@ -108,6 +151,22 @@ impl Command for InfluxDBCreateCommand { } } +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if let Some(from) = self.from.as_ref() { + if self.name.is_some() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --from flag") + + &fmt_log!("Consider using either the argument or the --from flag"), + )?; + } + self.name = Some(from.clone()); + } + + Ok(self) + } +} + impl LeaseManagerConfigArgs { async fn parse_args(mut self) -> miette::Result { if self.org_id == "INFLUXDB_ORG_ID" { diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs index e391dfd6ad9..d54f0cce49b 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs @@ -1,6 +1,6 @@ use clap::{Args, Subcommand}; -use create::InfluxDBCreateCommand; +use create::CreateCommand; use crate::{docs, Command, CommandGlobalOpts}; @@ -22,7 +22,7 @@ pub struct InfluxDBOutletCommand { #[derive(Clone, Debug, Subcommand)] pub enum InfluxDBOutletSubCommand { - Create(InfluxDBCreateCommand), + Create(CreateCommand), } impl InfluxDBOutletCommand { diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/static/long_about.txt b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/static/long_about.txt index ab7bad67825..f0ea9700584 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/static/long_about.txt +++ b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/static/long_about.txt @@ -1,5 +1,5 @@ -Create a InfluxDB Outlet that runs adjacent to a the InfluxDB server. The Outlet unwraps Ockam messages and delivers the http request to the server, after attaching authentication information to it. +Create an InfluxDB Outlet that runs adjacent to an InfluxDB server. The Outlet unwraps Ockam messages and delivers the http request to the server, after attaching authentication information to it. You must specify the TCP address of the server, that your Outlet should send raw TCP traffic to. You can also name your Outlet by giving it an alias. -When you create a InfluxDB Outlet, on an Ockam node, running on your local machine, it makes the TCP server available from a worker address, to the corresponding TCP Inlet (see `ockam tcp-inlet`). +When you create an InfluxDB Outlet, on an Ockam node, running on your local machine, it makes the InfluxDB server available from a worker address, to the corresponding InfluxDB Inlet (see `ockam influxfb-inlet`). diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs index 30e0fd9e71d..f5cba282458 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs @@ -1,5 +1,5 @@ use clap::{command, Args}; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_api::port_range::PortRange; use ockam_multiaddr::MultiAddr; @@ -22,14 +22,17 @@ pub struct CreateCommand { /// The local address of the service #[arg(long, default_value_t = kafka_inlet_default_addr())] addr: String, - /// The address where to bind and where the client will connect to alongside its port,
:. - /// In case just a port is specified, the default loopback address (127.0.0.1) will be used - #[arg(long, default_value_t = kafka_default_consumer_server(), value_parser = hostname_parser)] - bootstrap_server: HostnamePort, + + /// The address and port where the client will connect, in the format
:. + /// If only a port is specified, the default address 127.0.0.1 will be used. + #[arg(long, id = "SOCKET_ADDRESS", default_value_t = kafka_default_consumer_server(), value_parser = hostname_parser)] + bootstrap_server: SchemeHostnamePort, + /// Local port range dynamically allocated to kafka brokers, must not overlap with the /// bootstrap port #[arg(long)] brokers_port_range: Option, + /// The route to the project in ockam orchestrator, expected something like /project/ #[arg(long, default_value_t = kafka_default_project_route())] project_route: MultiAddr, @@ -39,6 +42,7 @@ impl CreateCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::create::CreateCommand { + name: self.addr.clone(), node_opts: self.node_opts, addr: self.addr, from: self.bootstrap_server, @@ -47,8 +51,8 @@ impl CreateCommand { consumer: None, consumer_relay: None, publishing_relay: None, - avoid_publishing: false, - disable_content_encryption: false, + no_publishing: false, + no_content_encryption: false, encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs index b0fed96b913..846f4e3b05c 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs @@ -1,11 +1,18 @@ use crate::kafka::kafka_default_project_route; +use crate::kafka::make_brokers_port_range; +use crate::node::util::initialize_default_node; +use crate::util::{print_warning_for_deprecated_flag_replaced, process_nodes_multiaddr}; +use crate::{ + kafka::{kafka_default_inlet_bind_address, kafka_inlet_default_addr}, + node::NodeOpts, + util::parsers::hostname_parser, + Command, CommandGlobalOpts, +}; use async_trait::async_trait; -use std::fmt::Write; - use clap::{command, Args}; use colorful::Colorful; use miette::miette; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_abac::PolicyExpression; use ockam_api::colors::{color_primary, color_warn}; use ockam_api::config::lookup::InternetAddress; @@ -14,38 +21,33 @@ use ockam_api::nodes::models::services::{StartKafkaInletRequest, StartServiceReq use ockam_api::nodes::BackgroundNodeClient; use ockam_api::output::Output; use ockam_api::port_range::PortRange; -use ockam_api::{fmt_log, fmt_ok}; +use ockam_api::{fmt_log, fmt_ok, fmt_warn}; use ockam_core::api::Request; use ockam_multiaddr::MultiAddr; use ockam_node::Context; use serde::Serialize; - -use crate::kafka::make_brokers_port_range; -use crate::node::util::initialize_default_node; -use crate::util::process_nodes_multiaddr; -use crate::{ - kafka::{kafka_default_inlet_bind_address, kafka_inlet_default_addr}, - node::NodeOpts, - util::parsers::hostname_parser, - Command, CommandGlobalOpts, -}; +use std::fmt::Write; /// Create a new Kafka Inlet. /// Kafka clients v3.7.0 and earlier are supported. /// You can find the version you have with 'kafka-topics.sh --version'. #[derive(Clone, Debug, Args)] pub struct CreateCommand { + /// Assign a name to this Kafka Inlet + #[arg(default_value_t = kafka_inlet_default_addr())] + pub name: String, + #[command(flatten)] pub node_opts: NodeOpts, - /// The local address of the service + /// [DEPRECATED] Use the positional argument instead #[arg(long, default_value_t = kafka_inlet_default_addr())] pub addr: String, - /// The address where to bind and where the client will connect to alongside its port,
:. - /// In case just a port is specified, the default loopback address (127.0.0.1:4000) will be used - #[arg(long, default_value_t = kafka_default_inlet_bind_address(), value_parser = hostname_parser)] - pub from: HostnamePort, + /// The address and port where the client will connect, in the format
:. + /// If only a port is specified, the default address 127.0.0.1 will be used. + #[arg(long, id = "SOCKET_ADDRESS", default_value_t = kafka_default_inlet_bind_address(), value_parser = hostname_parser)] + pub from: SchemeHostnamePort, /// Local port range dynamically allocated to kafka brokers, must not overlap with the /// bootstrap port @@ -78,19 +80,23 @@ pub struct CreateCommand { /// Avoid publishing the consumer in the relay. /// This is useful to avoid the creation of an unused relay when the consumer is directly /// referenced by the producer. - #[arg(long, name = "avoid-publishing", conflicts_with = "publishing-relay")] - pub avoid_publishing: bool, + #[arg( + long, + visible_alias = "avoid-publishing", + conflicts_with = "publishing-relay" + )] + pub no_publishing: bool, /// Disable end-to-end kafka messages encryption between producer and consumer. /// Use it when you want a plain kafka portal, the communication itself will still be /// encrypted. #[arg( long, - name = "disable-content-encryption", + visible_alias = "disable-content-encryption", value_name = "BOOL", default_value_t = false )] - pub disable_content_encryption: bool, + pub no_content_encryption: bool, /// The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. /// By default, the whole record is encrypted. @@ -130,70 +136,53 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; - - let brokers_port_range = self - .brokers_port_range - .unwrap_or_else(|| make_brokers_port_range(&self.from)); - - // The bootstrap port can't overlap with the brokers port range - if self.from.port() >= brokers_port_range.start() - && self.from.port() <= brokers_port_range.end() - { - return Err(miette!( - "The bootstrap port {} can't overlap with the brokers port range {}", - self.from.port(), - brokers_port_range.to_string() - )); - } - - let at_node = self.node_opts.at_node.clone(); - let addr = self.addr.clone(); - let to = process_nodes_multiaddr(&self.to, &opts.state).await?; + let cmd = self.parse_args(&opts).await?; let inlet = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating Kafka Inlet at {}...\n", - color_primary(self.from.to_string()) + color_primary(cmd.from.to_string()) )); } - let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?; + let node = + BackgroundNodeClient::create(ctx, &opts.state, &cmd.node_opts.at_node).await?; let consumer_resolution; - if let Some(route) = self.consumer { - consumer_resolution = ConsumerResolution::SingleNode(route); - } else if let Some(route) = &self.consumer_relay { + if let Some(route) = &cmd.consumer { + consumer_resolution = ConsumerResolution::SingleNode(route.clone()); + } else if let Some(route) = &cmd.consumer_relay { consumer_resolution = ConsumerResolution::ViaRelay(route.clone()); } else { - consumer_resolution = ConsumerResolution::ViaRelay(to.clone()); + consumer_resolution = ConsumerResolution::ViaRelay(cmd.to.clone()); } let consumer_publishing; - if self.avoid_publishing { + if cmd.no_publishing { consumer_publishing = ConsumerPublishing::None; - } else if let Some(route) = self.publishing_relay { - consumer_publishing = ConsumerPublishing::Relay(route); - } else if let Some(route) = self.consumer_relay { - consumer_publishing = ConsumerPublishing::Relay(route); + } else if let Some(route) = &cmd.publishing_relay { + consumer_publishing = ConsumerPublishing::Relay(route.clone()); + } else if let Some(route) = &cmd.consumer_relay { + consumer_publishing = ConsumerPublishing::Relay(route.clone()); } else { - consumer_publishing = ConsumerPublishing::Relay(to.clone()); + consumer_publishing = ConsumerPublishing::Relay(cmd.to.clone()); } let payload = StartKafkaInletRequest::new( - self.from.clone(), - brokers_port_range, - to.clone(), - !self.disable_content_encryption, - self.encrypted_fields, + cmd.from.clone().into(), + cmd.brokers_port_range(), + cmd.to.clone(), + !cmd.no_content_encryption, + cmd.encrypted_fields.clone(), consumer_resolution, consumer_publishing, - self.inlet_policy_expression, - self.consumer_policy_expression, - self.producer_policy_expression, + cmd.inlet_policy_expression.clone(), + cmd.consumer_policy_expression.clone(), + cmd.producer_policy_expression.clone(), ); - let payload = StartServiceRequest::new(payload, &addr); + let payload = StartServiceRequest::new(payload, &cmd.name); let req = Request::post("/node/services/kafka_inlet").body(payload); node.tell(ctx, req) .await @@ -201,10 +190,10 @@ impl Command for CreateCommand { KafkaInletOutput { node_name: node.node_name(), - from: InternetAddress::new(&self.from.to_string()) + from: InternetAddress::new(&cmd.from.hostname_port().to_string()) .ok_or(miette!("Invalid address"))?, - brokers_port_range, - to, + brokers_port_range: cmd.brokers_port_range(), + to: cmd.to.clone(), } }; @@ -218,6 +207,47 @@ impl Command for CreateCommand { } } +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if self.addr != kafka_inlet_default_addr() { + print_warning_for_deprecated_flag_replaced( + opts, + "addr", + "the positional argument", + )?; + if self.name != kafka_inlet_default_addr() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --alias flag") + + &fmt_log!("Consider removing the --addr flag"), + )?; + } + self.name = self.addr.clone(); + } + + self.brokers_port_range = self + .brokers_port_range + .or_else(|| Some(make_brokers_port_range(&self.from))); + + // The bootstrap port can't overlap with the brokers port range + if self.from.port() >= self.brokers_port_range().start() + && self.from.port() <= self.brokers_port_range().end() + { + return Err(miette!( + "The bootstrap port {} can't overlap with the brokers port range {}", + color_primary(self.from.port()), + color_primary(self.brokers_port_range().to_string()) + )); + } + + self.to = process_nodes_multiaddr(&self.to, &opts.state).await?; + Ok(self) + } + + fn brokers_port_range(&self) -> PortRange { + self.brokers_port_range.unwrap() + } +} + #[derive(Serialize)] struct KafkaInletOutput { node_name: String, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/mod.rs b/implementations/rust/ockam/ockam_command/src/kafka/mod.rs index 0a006e89e6e..644426f754e 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/mod.rs @@ -1,4 +1,4 @@ -use ockam::transport::{HostnamePort, StaticHostnamePort}; +use ockam::transport::{HostnamePort, SchemeHostnamePort, StaticHostnamePort}; use ockam_api::nodes::service::default_address::DefaultAddress; use ockam_api::port_range::PortRange; use ockam_multiaddr::MultiAddr; @@ -33,24 +33,24 @@ fn kafka_default_project_route() -> MultiAddr { MultiAddr::from_str(KAFKA_DEFAULT_PROJECT_ROUTE).expect("Failed to parse default project route") } -fn kafka_default_outlet_server() -> HostnamePort { - KAFKA_DEFAULT_BOOTSTRAP_ADDRESS.into() +fn kafka_default_outlet_server() -> SchemeHostnamePort { + KAFKA_DEFAULT_BOOTSTRAP_ADDRESS.try_into().unwrap() } -fn kafka_default_consumer_server() -> HostnamePort { - KAFKA_DEFAULT_CONSUMER_SERVER.into() +fn kafka_default_consumer_server() -> SchemeHostnamePort { + KAFKA_DEFAULT_CONSUMER_SERVER.try_into().unwrap() } -fn kafka_default_inlet_bind_address() -> HostnamePort { - KAFKA_DEFAULT_INLET_BIND_ADDRESS.into() +fn kafka_default_inlet_bind_address() -> SchemeHostnamePort { + KAFKA_DEFAULT_INLET_BIND_ADDRESS.try_into().unwrap() } -fn kafka_default_producer_server() -> HostnamePort { - KAFKA_DEFAULT_PRODUCER_SERVER.into() +fn kafka_default_producer_server() -> SchemeHostnamePort { + KAFKA_DEFAULT_PRODUCER_SERVER.try_into().unwrap() } -pub(crate) fn make_brokers_port_range(bootstrap_server: &HostnamePort) -> PortRange { - let boostrap_server_port = bootstrap_server.port() as u32; +pub(crate) fn make_brokers_port_range>(bootstrap_server: T) -> PortRange { + let boostrap_server_port = bootstrap_server.into().port() as u32; let start = min(boostrap_server_port + 1, u16::MAX as u32) as u16; let end = min(boostrap_server_port + 100, u16::MAX as u32) as u16; // we can unwrap here because we know that range start <= range end @@ -63,13 +63,18 @@ mod tests { #[test] fn brokers_port_range() { + let address = SchemeHostnamePort::from_str("tls://127.0.0.1:8080").unwrap(); + let port_range = make_brokers_port_range(address); + assert_eq!(port_range.start(), 8081); + assert_eq!(port_range.end(), 8180); + let address = HostnamePort::from_str("127.0.0.1:8080").unwrap(); - let port_range = make_brokers_port_range(&address); + let port_range = make_brokers_port_range(address); assert_eq!(port_range.start(), 8081); assert_eq!(port_range.end(), 8180); let address = HostnamePort::from_str("127.0.0.1:65442").unwrap(); - let port_range = make_brokers_port_range(&address); + let port_range = make_brokers_port_range(address); assert_eq!(port_range.start(), 65443); assert_eq!(port_range.end(), u16::MAX); } diff --git a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs index 60bd2cf9b06..17189141080 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs @@ -6,7 +6,7 @@ use miette::miette; use serde::Serialize; use std::fmt::Write; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam::Context; use ockam_abac::PolicyExpression; use ockam_api::colors::{color_primary, color_warn}; @@ -14,7 +14,7 @@ use ockam_api::nodes::models::services::StartKafkaOutletRequest; use ockam_api::nodes::models::services::StartServiceRequest; use ockam_api::nodes::BackgroundNodeClient; use ockam_api::output::Output; -use ockam_api::{fmt_log, fmt_ok}; +use ockam_api::{fmt_log, fmt_ok, fmt_warn}; use ockam_core::api::Request; use crate::node::util::initialize_default_node; @@ -27,18 +27,27 @@ use crate::{ /// Create a new Kafka Outlet #[derive(Clone, Debug, Args)] pub struct CreateCommand { + /// Address of your Kafka Outlet, which is part of a route used in other commands. + /// This unique address identifies the Kafka Outlet worker on the Node on your local machine. + /// Examples are `/service/my-outlet` or `my-outlet`. + /// If not provided, `/service/kafka_outlet` will be used. + /// You will need this address when creating a Kafka Inlet using `ockam kafka-inlet create`. + #[arg(default_value_t = kafka_default_outlet_addr())] + pub name: String, + #[command(flatten)] pub node_opts: NodeOpts, - /// The local address of the service - #[arg(long, default_value_t = kafka_default_outlet_addr())] - pub addr: String, + /// Alternative to the positional argument. + /// Address of your Kafka Outlet, which is part of a route used in other commands. + #[arg(long, id = "OUTLET_ADDRESS", visible_alias = "addr", default_value_t = kafka_default_outlet_addr())] + pub from: String, /// The address of the kafka bootstrap broker - #[arg(long, default_value_t = kafka_default_outlet_server())] - pub bootstrap_server: HostnamePort, + #[arg(long, visible_alias = "to", default_value_t = kafka_default_outlet_server())] + pub bootstrap_server: SchemeHostnamePort, - /// If set, the outlet will establish a TLS connection over TCP + /// [DEPRECATED] Use the `tls` scheme in the `--from` argument. #[arg(long, id = "BOOLEAN")] pub tls: bool, @@ -46,7 +55,7 @@ pub struct CreateCommand { /// If you don't provide it, the policy set for the "tcp-outlet" resource type will be used. /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. - #[arg(hide = true, long = "allow", id = "EXPRESSION")] + #[arg(long = "allow", id = "EXPRESSION")] pub policy_expression: Option, } @@ -56,32 +65,33 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; + let cmd = self.parse_args(&opts).await?; let outlet = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating Kafka Outlet to bootstrap server {}...\n", - color_primary(self.bootstrap_server.to_string()) + color_primary(cmd.bootstrap_server.to_string()) )); } let payload = StartKafkaOutletRequest::new( - self.bootstrap_server.clone(), - self.tls, - self.policy_expression, + cmd.bootstrap_server.clone().into(), + cmd.tls || cmd.bootstrap_server.is_tls(), + cmd.policy_expression, ); - let payload = StartServiceRequest::new(payload, &self.addr); + let payload = StartServiceRequest::new(payload, &cmd.name); let req = Request::post("/node/services/kafka_outlet").body(payload); let node = - BackgroundNodeClient::create(ctx, &opts.state, &self.node_opts.at_node).await?; + BackgroundNodeClient::create(ctx, &opts.state, &cmd.node_opts.at_node).await?; node.tell(ctx, req) .await .map_err(|e| miette!("Failed to start Kafka Outlet: {e}"))?; KafkaOutletOutput { node_name: node.node_name(), - bootstrap_server: self.bootstrap_server.to_string(), + bootstrap_server: cmd.bootstrap_server.to_string(), } }; @@ -95,6 +105,24 @@ impl Command for CreateCommand { } } +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if self.from != kafka_default_outlet_addr() { + if self.name != kafka_default_outlet_addr() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --from/--addr flag") + + &fmt_log!( + "Consider using either the argument or the --from/--addr flag" + ), + )?; + } + self.name = self.from.clone(); + } + + Ok(self) + } +} + #[derive(Serialize)] struct KafkaOutletOutput { node_name: String, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs index 1041705dcb0..738545d625b 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs @@ -1,5 +1,5 @@ use clap::{command, Args}; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_api::port_range::PortRange; use ockam_multiaddr::MultiAddr; @@ -18,17 +18,21 @@ use crate::{ pub struct CreateCommand { #[command(flatten)] node_opts: NodeOpts, + /// The local address of the service #[arg(long, default_value_t = kafka_inlet_default_addr())] addr: String, - /// The address where to bind and where the client will connect to alongside its port,
:. - /// In case just a port is specified, the default loopback address (127.0.0.1) will be used - #[arg(long, default_value_t = kafka_default_producer_server(), value_parser = hostname_parser)] - bootstrap_server: HostnamePort, + + /// The address and port where the client will connect, in the format
:. + /// If only a port is specified, the default address 127.0.0.1 will be used. + #[arg(long, id = "SOCKET_ADDRESS", default_value_t = kafka_default_producer_server(), value_parser = hostname_parser)] + bootstrap_server: SchemeHostnamePort, + /// Local port range dynamically allocated to kafka brokers, must not overlap with the /// bootstrap port #[arg(long)] brokers_port_range: Option, + /// The route to the project in ockam orchestrator, expected something like /project/ #[arg(long, default_value_t = kafka_default_project_route())] project_route: MultiAddr, @@ -38,6 +42,7 @@ impl CreateCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::create::CreateCommand { + name: self.addr.clone(), node_opts: self.node_opts, addr: self.addr, from: self.bootstrap_server, @@ -46,8 +51,8 @@ impl CreateCommand { consumer: None, consumer_relay: None, publishing_relay: None, - avoid_publishing: false, - disable_content_encryption: false, + no_publishing: false, + no_content_encryption: false, encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs index e7125700213..2dcf847be0f 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::run::parser::building_blocks::{ArgsToCommands, ResourceNameOrMap}; -use crate::influxdb::inlet::create::InfluxDBCreateCommand; +use crate::influxdb::inlet::create::CreateCommand; use crate::run::parser::resource::utils::parse_cmd_from_args; use crate::{influxdb::inlet, Command, OckamSubcommand}; @@ -15,31 +15,29 @@ pub struct InfluxDBInlets { } impl InfluxDBInlets { - fn get_subcommand(args: &[String]) -> Result { - if let OckamSubcommand::InfluxDBInlet(cmd) = - parse_cmd_from_args(InfluxDBCreateCommand::NAME, args)? + fn get_subcommand(args: &[String]) -> Result { + if let OckamSubcommand::InfluxDBInlet(cmd) = parse_cmd_from_args(CreateCommand::NAME, args)? { let inlet::InfluxDBInletSubCommand::Create(c) = cmd.subcommand; return Ok(c); } Err(miette!(format!( "Failed to parse {} command", - color_primary(InfluxDBCreateCommand::NAME) + color_primary(CreateCommand::NAME) ))) } pub fn into_parsed_commands( self, default_node_name: Option<&String>, - ) -> Result> { + ) -> Result> { match self.influxdb_inlets { Some(c) => { - let mut cmds = - c.into_commands_with_name_arg(Self::get_subcommand, Some("alias"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name.as_ref() { for cmd in cmds.iter_mut() { - if cmd.tcp_inlet.at.is_none() { - cmd.tcp_inlet.at = Some(node_name.to_string()) + if cmd.at.is_none() { + cmd.at = Some(node_name.to_string()) } } } @@ -53,7 +51,7 @@ impl InfluxDBInlets { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; #[test] fn tcp_inlet_config() { @@ -65,7 +63,9 @@ mod tests { lease-manager-route: /service/test ti2: from: '6061' - alias: my_inlet + lease-manager-route: /service/test + ti3: + from: tls://localhost:6062 lease-manager-route: /service/test "#; let parsed: InfluxDBInlets = serde_yaml::from_str(named).unwrap(); @@ -73,13 +73,25 @@ mod tests { let cmds = parsed .into_parsed_commands(Some(&default_node_name)) .unwrap(); - assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].tcp_inlet.alias.as_ref().unwrap(), "ti1"); - assert_eq!(cmds[0].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6060)); - assert_eq!(cmds[0].tcp_inlet.at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].tcp_inlet.alias.as_ref().unwrap(), "my_inlet"); - assert_eq!(cmds[1].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6061)); - assert_eq!(cmds[1].tcp_inlet.at.as_ref(), Some(&default_node_name)); + assert_eq!(cmds.len(), 3); + assert_eq!(cmds[0].name.as_ref().unwrap(), "ti1"); + assert_eq!( + cmds[0].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6060).unwrap() + ); + assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); + assert_eq!(cmds[1].name.as_ref().unwrap(), "ti2"); + assert_eq!( + cmds[1].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6061).unwrap() + ); + assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); + assert_eq!(cmds[2].name.as_ref().unwrap(), "ti3"); + assert_eq!( + cmds[2].from, + SchemeHostnamePort::new("tls", "localhost", 6062).unwrap() + ); + assert_eq!(cmds[2].at.as_ref(), Some(&default_node_name)); let unnamed = r#" influxdb_inlets: @@ -93,9 +105,15 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6060)); - assert_eq!(cmds[0].tcp_inlet.at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6061)); - assert_eq!(cmds[1].tcp_inlet.at.as_ref(), Some(&default_node_name)); + assert_eq!( + cmds[0].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6060).unwrap() + ); + assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); + assert_eq!( + cmds[1].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6061).unwrap() + ); + assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs index d73d71e1e61..fe1b63cc38c 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs @@ -2,9 +2,8 @@ use miette::{miette, Result}; use ockam_api::colors::color_primary; use serde::{Deserialize, Serialize}; +use crate::influxdb::outlet::create::CreateCommand; use crate::run::parser::building_blocks::{ArgsToCommands, ResourceNameOrMap}; - -use crate::influxdb::outlet::create::InfluxDBCreateCommand; use crate::run::parser::resource::utils::parse_cmd_from_args; use crate::{influxdb::outlet, Command, OckamSubcommand}; @@ -15,30 +14,30 @@ pub struct InfluxDBOutlets { } impl InfluxDBOutlets { - fn get_subcommand(args: &[String]) -> Result { + fn get_subcommand(args: &[String]) -> Result { if let OckamSubcommand::InfluxDBOutlet(cmd) = - parse_cmd_from_args(InfluxDBCreateCommand::NAME, args)? + parse_cmd_from_args(CreateCommand::NAME, args)? { let outlet::InfluxDBOutletSubCommand::Create(c) = cmd.subcommand; return Ok(c); } Err(miette!(format!( "Failed to parse {} command", - color_primary(InfluxDBCreateCommand::NAME) + color_primary(CreateCommand::NAME) ))) } pub fn into_parsed_commands( self, default_node_name: Option<&String>, - ) -> Result> { + ) -> Result> { match self.influxdb_outlets { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("from"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { - if cmd.tcp_outlet.at.is_none() { - cmd.tcp_outlet.at = Some(node_name.to_string()) + if cmd.at.is_none() { + cmd.at = Some(node_name.to_string()) } } } @@ -52,7 +51,7 @@ impl InfluxDBOutlets { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; #[test] fn tcp_outlet_config() { @@ -63,13 +62,25 @@ mod tests { from: my_outlet leased-token-permissions: "" leased-token-expires-in: 1h + + ti2: + to: 127.0.0.1:6061 + leased-token-permissions: "" + leased-token-expires-in: 1h "#; let parsed: InfluxDBOutlets = serde_yaml::from_str(named).unwrap(); let default_node_name = "n1".to_string(); let cmds = parsed .into_parsed_commands(Some(&default_node_name)) .unwrap(); - assert_eq!(cmds.len(), 1); - assert_eq!(cmds[0].tcp_outlet.to, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds.len(), 2); + assert_eq!(cmds[0].name, Some("ti1".to_string())); + assert_eq!(cmds[0].from.clone().unwrap(), "my_outlet"); + assert_eq!( + cmds[0].to, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6060).unwrap() + ); + assert_eq!(cmds[1].name, Some("ti2".to_string())); + assert!(cmds[1].from.is_none()); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs index 4570d34c674..b28ad59e814 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs @@ -33,7 +33,7 @@ impl KafkaInlet { ) -> Result> { match self.kafka_inlet { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("addr"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { if cmd.node_opts.at_node.is_none() { @@ -51,7 +51,8 @@ impl KafkaInlet { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; + use ockam_core::env::FromString; use ockam_multiaddr::MultiAddr; @@ -74,7 +75,10 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 1); - assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 9092)); + assert_eq!( + cmds[0].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 9092).unwrap() + ); assert_eq!( &cmds[0].to, &MultiAddr::from_string("/project/default").unwrap(), @@ -88,7 +92,7 @@ mod tests { &MultiAddr::from_string("/ip4/192.168.1.2/tcp/4000").unwrap(), ); assert_eq!(cmds[0].node_opts.at_node, Some("node_name".to_string())); - assert!(!cmds[0].avoid_publishing); + assert!(!cmds[0].no_publishing); assert_eq!( cmds[0].encrypted_fields, @@ -106,12 +110,12 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 1); - assert_eq!(cmds[0].addr, "ki"); + assert_eq!(cmds[0].name, "ki"); assert_eq!( cmds[0].consumer.as_ref().unwrap(), &MultiAddr::from_string("/dnsaddr/kafka-outlet.local/tcp/5000").unwrap(), ); - assert!(cmds[0].avoid_publishing); + assert!(cmds[0].no_publishing); assert_eq!(cmds[0].node_opts.at_node, Some(default_node_name.clone())); let list = r#" @@ -128,7 +132,7 @@ mod tests { cmds[0].consumer.as_ref().unwrap(), &MultiAddr::from_string("/dnsaddr/kafka-outlet.local/tcp/5000").unwrap(), ); - assert!(cmds[0].avoid_publishing); + assert!(cmds[0].no_publishing); assert_eq!(cmds[0].node_opts.at_node, Some(default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs index 113fe30b3ee..5499dabc38b 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs @@ -35,7 +35,7 @@ impl KafkaOutlet { ) -> Result> { match self.kafka_outlet { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("addr"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { if cmd.node_opts.at_node.is_none() { @@ -53,7 +53,7 @@ impl KafkaOutlet { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; #[test] fn kafka_outlet_config() { @@ -70,7 +70,7 @@ mod tests { assert_eq!(cmds.len(), 1); assert_eq!( cmds[0].bootstrap_server, - HostnamePort::new("192.168.0.100", 9092), + SchemeHostnamePort::new("tcp", "192.168.0.100", 9092).unwrap(), ); assert_eq!(cmds[0].node_opts.at_node.as_ref().unwrap(), "node_name"); @@ -84,7 +84,7 @@ mod tests { let cmds = parsed .into_parsed_commands(Some(&default_node_name)) .unwrap(); - assert_eq!(cmds[0].addr, "ko".to_string()); + assert_eq!(cmds[0].name, "ko".to_string()); assert_eq!(cmds[0].node_opts.at_node.as_ref(), Some(&default_node_name)); // check if the default node name is used when the configuration does not specify it diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs index bb1b184a03e..68cc2250242 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs @@ -33,8 +33,7 @@ impl TcpInlets { ) -> Result> { match self.tcp_inlets { Some(c) => { - let mut cmds = - c.into_commands_with_name_arg(Self::get_subcommand, Some("alias"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name.as_ref() { for cmd in cmds.iter_mut() { if cmd.at.is_none() { @@ -52,7 +51,7 @@ impl TcpInlets { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; #[test] fn tcp_inlet_config() { @@ -63,7 +62,6 @@ mod tests { at: n ti2: from: '6061' - alias: my_inlet "#; let parsed: TcpInlets = serde_yaml::from_str(named).unwrap(); let default_node_name = "n1".to_string(); @@ -71,11 +69,17 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].alias.as_ref().unwrap(), "ti1"); - assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds[0].name.as_ref().unwrap(), "ti1"); + assert_eq!( + cmds[0].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6060).unwrap() + ); assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].alias.as_ref().unwrap(), "my_inlet"); - assert_eq!(cmds[1].from, HostnamePort::new("127.0.0.1", 6061)); + assert_eq!(cmds[1].name.as_ref().unwrap(), "ti2"); + assert_eq!( + cmds[1].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6061).unwrap() + ); assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); let unnamed = r#" @@ -89,9 +93,15 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!( + cmds[0].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6060).unwrap() + ); assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].from, HostnamePort::new("127.0.0.1", 6061)); + assert_eq!( + cmds[1].from, + SchemeHostnamePort::new("tcp", "127.0.0.1", 6061).unwrap() + ); assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_outlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_outlets.rs index d54dbe2be36..4c583b43cd9 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_outlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_outlets.rs @@ -34,7 +34,7 @@ impl TcpOutlets { ) -> Result> { match self.tcp_outlets { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("from"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { if cmd.at.is_none() { @@ -52,7 +52,8 @@ impl TcpOutlets { #[cfg(test)] mod tests { use super::*; - use ockam::transport::HostnamePort; + use ockam::transport::SchemeHostnamePort; + use std::str::FromStr; #[test] fn tcp_outlet_config() { @@ -62,7 +63,7 @@ mod tests { to: 6060 at: n to2: - to: 6061 + to: tls://127.0.0.1:6061 from: my_outlet "#; let parsed: TcpOutlets = serde_yaml::from_str(config).unwrap(); @@ -71,11 +72,19 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].from.clone().unwrap(), "to1"); - assert_eq!(cmds[0].to, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds[0].name.clone().unwrap(), "to1"); + assert!(cmds[0].from.is_none()); + assert_eq!( + cmds[0].to, + SchemeHostnamePort::from_str("tcp://127.0.0.1:6060").unwrap() + ); assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); + assert_eq!(cmds[1].name.clone().unwrap(), "to2"); assert_eq!(cmds[1].from.clone().unwrap(), "my_outlet"); - assert_eq!(cmds[1].to, HostnamePort::new("127.0.0.1", 6061)); + assert_eq!( + cmds[1].to, + SchemeHostnamePort::from_str("tls://127.0.0.1:6061").unwrap() + ); assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs index 83175753530..75ff53afac0 100644 --- a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs +++ b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs @@ -1,17 +1,15 @@ +use crate::run::Config; +use crate::tcp::inlet::create::tcp_inlet_default_from_addr; +use crate::util::async_cmd; +use crate::util::parsers::hostname_parser; +use crate::{docs, CommandGlobalOpts}; use clap::Args; use colorful::Colorful; use indoc::formatdoc; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_api::fmt_info; - -use crate::{docs, CommandGlobalOpts}; use ockam_node::Context; -use crate::run::Config; -use crate::tcp::inlet::create::default_from_addr; -use crate::util::async_cmd; -use crate::util::parsers::hostname_parser; - const LONG_ABOUT: &str = include_str!("./static/secure_relay_inlet/long_about.txt"); const AFTER_LONG_HELP: &str = include_str!("./static/secure_relay_inlet/after_long_help.txt"); @@ -26,9 +24,9 @@ pub struct SecureRelayInlet { #[arg(value_name = "SERVICE NAME")] pub service_name: String, - /// Address on which to accept tcp connections. - #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", default_value_t = default_from_addr(), value_parser = hostname_parser)] - from: HostnamePort, + /// Address on which to accept tcp connections, in the format
: + #[arg(long, id = "SOCKET_ADDRESS", display_order = 900, id = "SOCKET_ADDRESS", default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] + from: SchemeHostnamePort, /// Just print the recipe and exit #[arg(long)] @@ -123,6 +121,7 @@ impl SecureRelayInlet { #[cfg(test)] mod tests { use super::*; + use ockam::transport::SchemeHostnamePort; use ockam_api::cli_state::ExportedEnrollmentTicket; #[test] @@ -132,7 +131,7 @@ mod tests { let cmd = SecureRelayInlet { service_name: "service_name".to_string(), - from: HostnamePort::new("127.0.0.1", 8080), + from: SchemeHostnamePort::new("tcp", "127.0.0.1", 8080).unwrap(), dry_run: false, enroll: Enroll { enroll_ticket: Some(enrollment_ticket_encoded), diff --git a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_outlet.rs b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_outlet.rs index ff3fcb8d6d9..aed8f703670 100644 --- a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_outlet.rs +++ b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_outlet.rs @@ -1,16 +1,14 @@ +use crate::run::Config; +use crate::util::async_cmd; +use crate::util::parsers::hostname_parser; +use crate::{docs, CommandGlobalOpts}; use clap::Args; use colorful::Colorful; use indoc::formatdoc; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_api::fmt_info; - -use crate::{docs, CommandGlobalOpts}; use ockam_node::Context; -use crate::run::Config; -use crate::util::async_cmd; -use crate::util::parsers::hostname_parser; - const LONG_ABOUT: &str = include_str!("./static/secure_relay_outlet/long_about.txt"); const AFTER_LONG_HELP: &str = include_str!("./static/secure_relay_outlet/after_long_help.txt"); @@ -25,9 +23,9 @@ pub struct SecureRelayOutlet { #[arg(value_name = "SERVICE NAME")] pub service_name: String, - /// TCP address to send raw tcp traffic. + /// TCP address to send raw tcp traffic, in the format
: #[arg(long, display_order = 902, id = "SOCKET_ADDRESS", value_parser = hostname_parser)] - to: HostnamePort, + to: SchemeHostnamePort, /// Just print the recipe and exit #[arg(long)] @@ -125,6 +123,7 @@ impl SecureRelayOutlet { #[cfg(test)] mod tests { use super::*; + use ockam::transport::SchemeHostnamePort; use ockam_api::cli_state::ExportedEnrollmentTicket; #[test] @@ -134,7 +133,7 @@ mod tests { let cmd = SecureRelayOutlet { service_name: "service_name".to_string(), - to: HostnamePort::new("127.0.0.1", 8080), + to: SchemeHostnamePort::new("tcp", "127.0.0.1", 8080).unwrap(), dry_run: false, enroll: Enroll { enroll_ticket: Some(enrollment_ticket_encoded), diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index a7548a08da0..1320e6d6230 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -1,20 +1,19 @@ -use std::collections::HashMap; -use std::str::FromStr; -use std::time::Duration; - +use crate::node::util::initialize_default_node; +use crate::shared_args::OptionalTimeoutArg; +use crate::tcp::util::alias_parser; +use crate::util::parsers::duration_parser; +use crate::util::parsers::hostname_parser; +use crate::util::{ + port_is_free_guard, print_warning_for_deprecated_flag_replaced, process_nodes_multiaddr, +}; +use crate::{docs, Command, CommandGlobalOpts, Error}; use async_trait::async_trait; use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; use miette::{miette, IntoDiagnostic}; -use tracing::trace; - -use crate::node::util::initialize_default_node; -use crate::shared_args::OptionalTimeoutArg; -use crate::tcp::util::alias_parser; -use crate::{docs, Command, CommandGlobalOpts, Error}; use ockam::identity::Identifier; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam::Context; use ockam_abac::PolicyExpression; use ockam_api::address::extract_address_value; @@ -32,10 +31,10 @@ use ockam_core::api::{Reply, Status}; use ockam_multiaddr::proto; use ockam_multiaddr::{MultiAddr, Protocol as _}; use ockam_node::compat::asynchronous::resolve_peer; - -use crate::util::parsers::duration_parser; -use crate::util::parsers::hostname_parser; -use crate::util::{port_is_free_guard, process_nodes_multiaddr}; +use std::collections::HashMap; +use std::str::FromStr; +use std::time::Duration; +use tracing::trace; const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt"); @@ -43,13 +42,23 @@ const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt" #[derive(Clone, Debug, Args)] #[command(after_long_help = docs::after_help(AFTER_LONG_HELP))] pub struct CreateCommand { + /// Assign a name to this TCP Inlet + #[arg(id = "NAME", value_parser = alias_parser)] + pub name: Option, + /// Node on which to start the TCP Inlet. #[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)] pub at: Option, - /// Address on which to accept TCP connections. - #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = default_from_addr(), value_parser = hostname_parser)] - pub from: HostnamePort, + /// Address on which to accept InfluxDB connections, in the format ://
:. + /// At least the port must be provided. The default scheme is `tcp` and the default address is `127.0.0.1`. + /// If the argument is not set, a random port will be used on the default address. + /// + /// To enable TLS, the `ockam-tls-certificate` credential attribute is required. + /// It will use the default project TLS certificate provider `/project/default/service/tls_certificate_provider`. + /// To specify a different certificate provider, use `--tls-certificate-provider`. + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] + pub from: SchemeHostnamePort, /// Route to a TCP Outlet or the name of the TCP Outlet service you want to connect to. /// @@ -60,7 +69,7 @@ pub struct CreateCommand { /// or just the name of the service as `outlet` or `/service/outlet`. /// If you are passing just the service name, consider using `--via` to specify the /// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`). - #[arg(long, display_order = 900, id = "ROUTE", default_value_t = default_to_addr())] + #[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())] pub to: String, /// Name of the relay that this TCP Inlet will use to connect to the TCP Outlet. @@ -79,7 +88,7 @@ pub struct CreateCommand { #[arg(long, name = "AUTHORIZED", display_order = 900)] pub authorized: Option, - /// Assign a name to this TCP Inlet. + /// [DEPRECATED] Use the positional argument instead #[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)] pub alias: Option, @@ -88,7 +97,6 @@ pub struct CreateCommand { /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`. #[arg( - hide = true, long, visible_alias = "expression", display_order = 900, @@ -111,7 +119,7 @@ pub struct CreateCommand { #[arg(long, default_value = "false")] pub no_connection_wait: bool, - /// Enable UDP NAT puncture. + /// [DEPRECATED] Use the `udp` scheme in the `--from` argument. #[arg( long, visible_alias = "enable-udp-puncture", @@ -137,24 +145,22 @@ pub struct CreateCommand { #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] pub privileged: bool, + /// [DEPRECATED] Use the `tls` scheme in the `--from` argument. #[arg(long, value_name = "BOOL", default_value_t = false, hide = true)] - /// Enable TLS for the TCP Inlet. - /// Uses the default project TLS certificate provider, `/project/default/service/tls_certificate_provider`. - /// To specify a different certificate provider, use `--tls-certificate-provider`. - /// Requires `ockam-tls-certificate` credential attribute. pub tls: bool, #[arg(long, value_name = "ROUTE", hide = true)] /// Enable TLS for the TCP Inlet using the provided certificate provider. /// Requires `ockam-tls-certificate` credential attribute. + #[arg(long, value_name = "ROUTE", hide = true)] pub tls_certificate_provider: Option, } -pub(crate) fn default_from_addr() -> HostnamePort { - HostnamePort::new("127.0.0.1", 0) +pub(crate) fn tcp_inlet_default_from_addr() -> SchemeHostnamePort { + SchemeHostnamePort::from_str("127.0.0.1:0").unwrap() } -fn default_to_addr() -> String { +pub(crate) fn tcp_inlet_default_to_addr() -> String { "/project//service/forward_to_/secure/api/service/".to_string() } @@ -164,9 +170,6 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; - - let privileged = self.privileged; - let cmd = self.parse_args(&opts).await?; let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?; @@ -185,15 +188,15 @@ impl Command for CreateCommand { let result: Reply = node .create_inlet( ctx, - &cmd.from, + cmd.from.hostname_port(), &cmd.to(), - cmd.alias.as_ref().expect("The `alias` argument should be set to its default value if not provided"), + cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"), &cmd.authorized, &cmd.allow, cmd.connection_wait, !cmd.no_connection_wait, &cmd.secure_channel_identifier(&opts.state).await?, - cmd.udp, + cmd.udp || cmd.from.is_udp(), cmd.no_tcp_fallback, cmd.privileged, &cmd.tls_certificate_provider, @@ -263,7 +266,7 @@ impl Command for CreateCommand { msg }; - if privileged { + if cmd.privileged { plain += &fmt_info!( "This Inlet is operating in {} mode\n", color_primary_alt("privileged".to_string()) @@ -317,36 +320,56 @@ impl CreateCommand { } pub async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { - self.alias = self.alias.or_else(|| Some(random_name())); - let from = resolve_peer(self.from.to_string()) + if let Some(alias) = self.alias.as_ref() { + print_warning_for_deprecated_flag_replaced( + opts, + "alias", + "the positional argument", + )?; + if self.name.is_some() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --alias flag") + + &fmt_log!("Consider removing the --alias flag"), + )?; + } + self.name = Some(alias.clone()); + } else { + self.name = self.name.or_else(|| Some(random_name())); + } + + let from = resolve_peer(self.from.hostname_port()) .await .into_diagnostic()?; port_is_free_guard(&from)?; + self.to = Self::parse_arg_to(&opts.state, self.to, self.via.as_ref()).await?; if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() { return Err(miette!( "--authorized can not be used with project addresses" ))?; } - self.tls_certificate_provider = if let Some(tls_certificate_provider) = - &self.tls_certificate_provider - { - Some(tls_certificate_provider.clone()) - } else if self.tls { - Some(MultiAddr::from_str("/project/default/service/tls_certificate_provider").unwrap()) - } else { - None - }; + + self.tls_certificate_provider = + if let Some(tls_certificate_provider) = &self.tls_certificate_provider { + Some(tls_certificate_provider.clone()) + } else if self.tls || self.from.is_tls() { + Some(MultiAddr::from_str( + "/project/default/service/tls_certificate_provider", + )?) + } else { + None + }; + Ok(self) } - async fn parse_arg_to( + pub(crate) async fn parse_arg_to( state: &CliState, to: impl Into, via: Option<&String>, ) -> miette::Result { let mut to = to.into(); - let to_is_default = to == default_to_addr(); + let to_is_default = to == tcp_inlet_default_to_addr(); let mut service_name = "outlet".to_string(); let relay_name = via.cloned().unwrap_or("default".to_string()); @@ -380,7 +403,7 @@ impl CreateCommand { // "to" refers to the service name service_name = to.to_string(); // and we set "to" to the default route, so we can do the replacements later - to = default_to_addr(); + to = tcp_inlet_default_to_addr(); } } @@ -455,7 +478,7 @@ mod tests { } // "to" default value - let res = CreateCommand::parse_arg_to(&state, default_to_addr(), None) + let res = CreateCommand::parse_arg_to(&state, tcp_inlet_default_to_addr(), None) .await .unwrap(); assert_eq!( @@ -487,7 +510,7 @@ mod tests { // "via" argument is used to replace the relay name let cases = [ ( - default_to_addr(), + tcp_inlet_default_to_addr(), "myrelay", "/project/p1/service/forward_to_myrelay/secure/api/service/outlet", ), diff --git a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs index bf4925c07d8..19a812921ad 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs @@ -1,15 +1,11 @@ -use std::collections::HashMap; -use std::str::FromStr; - +use crate::node::util::initialize_default_node; +use crate::{docs, Command, CommandGlobalOpts}; use async_trait::async_trait; use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; use miette::IntoDiagnostic; - -use crate::node::util::initialize_default_node; -use crate::{docs, Command, CommandGlobalOpts}; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam::Address; use ockam::Context; use ockam_abac::PolicyExpression; @@ -21,7 +17,9 @@ use ockam_api::colors::{color_primary, color_primary_alt}; use ockam_api::nodes::models::portal::OutletStatus; use ockam_api::nodes::service::tcp_outlets::Outlets; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{fmt_info, fmt_ok}; +use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn}; +use ockam_core::compat::str::FromStr; +use std::collections::HashMap; const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt"); const LONG_ABOUT: &str = include_str!("./static/create/long_about.txt"); @@ -33,20 +31,24 @@ long_about = docs::about(LONG_ABOUT), after_long_help = docs::after_help(AFTER_LONG_HELP) )] pub struct CreateCommand { + /// Address of your TCP Outlet, which is part of a route used in other commands. + /// This unique address identifies the TCP Outlet worker on the Node on your local machine. + /// Examples are `/service/my-outlet` or `my-outlet`. + /// If not provided, `outlet` will be used, or a random address will be generated if `outlet` is taken. + /// You will need this address when creating a TCP Inlet using `ockam tcp-inlet create`. + #[arg(value_parser = extract_address_value)] + pub name: Option, + /// TCP address where your TCP server is running: domain:port. Your Outlet will send raw TCP traffic to it - #[arg(long, display_order = 900, id = "HOSTNAME_PORT", value_parser = HostnamePort::from_str)] - pub to: HostnamePort, + #[arg(long, id = "SOCKET_ADDRESS", display_order = 900, value_parser = SchemeHostnamePort::from_str)] + pub to: SchemeHostnamePort, /// If set, the outlet will establish a TLS connection over TCP #[arg(long, display_order = 900, id = "BOOLEAN")] pub tls: bool, - /// Address of your TCP Outlet, which is part of a route that is used in other - /// commands. This address must be unique. This address identifies the TCP Outlet - /// worker, on the node, on your local machine. Examples are `/service/my-outlet` or - /// `my-outlet`. If you don't provide it, `/service/outlet` will be used. You will - /// need this address when you create a TCP Inlet (using `ockam tcp-inlet create --to - /// `) + /// Alternative to the positional argument. + /// Address of your TCP Outlet, which is part of a route used in other commands. #[arg(long, display_order = 902, id = "OUTLET_ADDRESS", value_parser = extract_address_value)] pub from: Option, @@ -60,7 +62,6 @@ pub struct CreateCommand { /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. #[arg( - hide = true, long, visible_alias = "expression", display_order = 904, @@ -80,28 +81,29 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; + let cmd = self.parse_args(&opts).await?; - let node = BackgroundNodeClient::create(ctx, &opts.state, &self.at).await?; + let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?; let node_name = node.node_name(); let outlet_status = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating a new TCP Outlet to {}...\n", - color_primary(self.to.to_string()) + color_primary(cmd.to.to_string()) )); } node.create_outlet( ctx, - self.to.clone(), - self.tls, - self.from.clone().map(Address::from).as_ref(), - self.allow.clone(), - self.privileged, + cmd.to.clone().into(), + cmd.tls, + cmd.name.clone().map(Address::from).as_ref(), + cmd.allow.clone(), + cmd.privileged, ) .await? }; - self.add_outlet_created_journey_event(&opts, &node_name, &outlet_status) + cmd.add_outlet_created_journey_event(&opts, &node_name, &outlet_status) .await?; let worker_route = outlet_status.worker_route().into_diagnostic()?; @@ -110,10 +112,10 @@ impl Command for CreateCommand { "Created a new TCP Outlet in the Node {} at {} bound to {}\n", color_primary(&node_name), color_primary(worker_route.to_string()), - color_primary(self.to.to_string()) + color_primary(cmd.to.to_string()) ); - if self.privileged { + if cmd.privileged { msg += &fmt_info!( "This Outlet is operating in {} mode\n", color_primary_alt("privileged".to_string()) @@ -132,6 +134,20 @@ impl Command for CreateCommand { } impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if let Some(from) = self.from.as_ref() { + if self.name.is_some() { + opts.terminal.write_line( + fmt_warn!("The argument is being overridden by the --from flag") + + &fmt_log!("Consider using either the argument or the --from flag"), + )?; + } + self.name = Some(from.clone()); + } + + Ok(self) + } + pub async fn add_outlet_created_journey_event( &self, opts: &CommandGlobalOpts, diff --git a/implementations/rust/ockam/ockam_command/src/util/mod.rs b/implementations/rust/ockam/ockam_command/src/util/mod.rs index 195615a72c2..3188ac9085b 100644 --- a/implementations/rust/ockam/ockam_command/src/util/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/util/mod.rs @@ -6,10 +6,6 @@ use std::{ use colorful::Colorful; use miette::{miette, IntoDiagnostic}; -use opentelemetry::trace::FutureExt; -use tokio::runtime::Runtime; -use tracing::{debug, error}; - use ockam::{Address, Context, NodeBuilder}; use ockam_api::cli_state::CliState; use ockam_api::cli_state::CliStateError; @@ -19,6 +15,9 @@ use ockam_api::fmt_warn; use ockam_core::{DenyAll, OpenTelemetryContext}; use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Project, Space, Tcp}; use ockam_multiaddr::{proto::Node, MultiAddr, Protocol}; +use opentelemetry::trace::FutureExt; +use tokio::runtime::Runtime; +use tracing::{debug, error}; use crate::{CommandGlobalOpts, Result}; @@ -188,6 +187,9 @@ pub async fn clean_nodes_multiaddr( pub fn port_is_free_guard(address: &SocketAddr) -> Result<()> { let port = address.port(); + if port == 0 { + return Ok(()); + } let ip = address.ip(); if TcpListener::bind((ip, port)).is_err() { Err(miette!( diff --git a/implementations/rust/ockam/ockam_command/src/util/parsers.rs b/implementations/rust/ockam/ockam_command/src/util/parsers.rs index 5b00b617ee7..9645b76ae0f 100644 --- a/implementations/rust/ockam/ockam_command/src/util/parsers.rs +++ b/implementations/rust/ockam/ockam_command/src/util/parsers.rs @@ -5,18 +5,18 @@ use std::time::Duration; use miette::{miette, WrapErr}; use ockam::identity::Identifier; -use ockam::transport::HostnamePort; +use ockam::transport::SchemeHostnamePort; use ockam_api::config::lookup::InternetAddress; use ockam_core::env::parse_duration; +// use crate::util::validators::cloud_resource_name_validator; use crate::Result; -/// Helper function for parsing a socket from user input -/// It is possible to just input a `port`. In that case the address will be assumed to be -/// 127.0.0.1: -pub(crate) fn hostname_parser(input: &str) -> Result { - HostnamePort::from_str(input).wrap_err(format!( +/// Helper function for parsing a socket from user input by using +/// [`SchemeHostnamePort::from_str()`] +pub(crate) fn hostname_parser(input: &str) -> Result { + SchemeHostnamePort::from_str(input).wrap_err(format!( "cannot parse the address {input} as a socket address" )) } diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats index 788ca45f8c5..2f94e87a0e7 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats @@ -48,7 +48,7 @@ teardown() { run_failure $OCKAM kafka-inlet create --to /secure/api --from $(random_port) # Create a second inlet port="$(random_port)" - run_success $OCKAM kafka-inlet create --to /secure/api --from $port --addr inlet2 --jq '.' + run_success $OCKAM kafka-inlet create inlet2 --to /secure/api --from $port --jq '.' assert_output --partial "\"from\": \"127.0.0.1:$port\"" assert_output --partial "\"to\": \"/secure/api\"" @@ -75,3 +75,17 @@ teardown() { run_success $OCKAM kafka-inlet list --jq '.[].addr' assert_output --partial "inlet2" } + +@test "kafka - create with deprecated alias --addr flag" { + port="$(random_port)" + + # Fail if both addr and name are used + run_failure $OCKAM kafka-inlet create kinlet-name --addr kinlet --to /secure/api --from $port --jq '.' + + run_success $OCKAM kafka-inlet create --addr kinlet --to /secure/api --from $port --jq '.' + assert_output --partial "\"from\": \"127.0.0.1:$port\"" + assert_output --partial "\"to\": \"/secure/api\"" + + run_success $OCKAM kafka-inlet show kinlet --jq '.' + assert_output --partial "kinlet" +} diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats index 07679e386d7..05b1e097467 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats @@ -50,7 +50,7 @@ teardown() { assert_output --partial "/service/outlet" inlet_port="$(random_port)" - run_success $OCKAM tcp-inlet create --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet --alias "test-inlet" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet run_success $OCKAM tcp-inlet create --at /node/n2 --from 6102 --to /node/n1/service/outlet sleep 1 @@ -94,6 +94,19 @@ teardown() { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + port="$(random_port)" + run_success $OCKAM tcp-inlet create tcp-inlet-2 --at /node/n2 --from $port --to /node/n1/service/outlet + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - list inlets on a node, using deprecated --alias flag" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + port="$(random_port)" run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 sleep 1 @@ -103,6 +116,19 @@ teardown() { assert_output --partial "127.0.0.1:$port" } +@test "portals - list inlets on a node, using deprecated --alias flag overriding name" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + port="$(random_port)" + run_success $OCKAM tcp-inlet create my-inlet --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + @test "portals - list outlets on a node" { run_success "$OCKAM" node create n1 @@ -120,7 +146,7 @@ teardown() { run_success "$OCKAM" node create n2 port="$(random_port)" - run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias "test-inlet" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from $port --to /node/n1/service/outlet sleep 1 run_success $OCKAM tcp-inlet show "test-inlet" --at /node/n2 @@ -231,9 +257,9 @@ teardown() { run_success "$OCKAM" tcp-outlet create --at n --to "$port" port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at n --from "$port" --to "/node/n/service/outlet" --alias i + run_success "$OCKAM" tcp-inlet create i --at n --from "$port" --to "/node/n/service/outlet" port="$(random_port)" - run_failure "$OCKAM" tcp-inlet create --at n --from "$port" --to "/node/n/service/outlet" --alias i + run_failure "$OCKAM" tcp-inlet create i --at n --from "$port" --to "/node/n/service/outlet" } @test "portals - fail to create two TCP inlets at the same socket address" { diff --git a/implementations/rust/ockam/ockam_node/src/compat/asynchronous.rs b/implementations/rust/ockam/ockam_node/src/compat/asynchronous.rs index 6f6de61ed77..e68fee65a42 100644 --- a/implementations/rust/ockam/ockam_node/src/compat/asynchronous.rs +++ b/implementations/rust/ockam/ockam_node/src/compat/asynchronous.rs @@ -1,13 +1,14 @@ pub use tokio::sync::Mutex; pub use tokio::sync::RwLock; -use ockam_transport_core::TransportError; +use ockam_transport_core::{HostnamePort, TransportError}; use std::net::SocketAddr; use tokio::net::lookup_host; /// Asynchronously resolve the given peer to a [`SocketAddr`](std::net::SocketAddr) -pub async fn resolve_peer(peer: impl ToString) -> ockam_core::Result { +pub async fn resolve_peer(peer: &HostnamePort) -> ockam_core::Result { let peer = peer.to_string(); + // Try to resolve hostname match lookup_host(peer.clone()).await { Ok(mut iter) => { @@ -31,14 +32,15 @@ pub async fn resolve_peer(peer: impl ToString) -> ockam_core::Result #[cfg(test)] mod tests { use super::*; + use core::str::FromStr; use ockam_transport_core::HostnamePort; #[tokio::test] async fn test_hostname_port() -> ockam_core::Result<()> { - let socket_addr = resolve_peer("76.76.21.21:8080".to_string()).await.unwrap(); + let socket_addr = + resolve_peer(&HostnamePort::from_str("76.76.21.21:8080").unwrap()).await?; let actual = HostnamePort::from(socket_addr); - assert_eq!(actual, HostnamePort::new("76.76.21.21", 8080)); - + assert_eq!(actual, HostnamePort::new("76.76.21.21", 8080)?); Ok(()) } } diff --git a/implementations/rust/ockam/ockam_node/src/lib.rs b/implementations/rust/ockam/ockam_node/src/lib.rs index 511af9a735a..54c0796dea0 100644 --- a/implementations/rust/ockam/ockam_node/src/lib.rs +++ b/implementations/rust/ockam/ockam_node/src/lib.rs @@ -15,13 +15,11 @@ )] #![cfg_attr(not(feature = "std"), no_std)] -#[cfg(feature = "std")] -extern crate core; - #[cfg(feature = "alloc")] #[macro_use] extern crate alloc; - +#[cfg(feature = "std")] +extern crate core; #[macro_use] extern crate tracing; diff --git a/implementations/rust/ockam/ockam_transport_core/src/hostname_port.rs b/implementations/rust/ockam/ockam_transport_core/src/hostname_port.rs index 4c09aa1ed3a..9ceb87f6f0a 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/hostname_port.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/hostname_port.rs @@ -23,9 +23,11 @@ impl StaticHostnamePort { } } -impl From for HostnamePort { - fn from(value: StaticHostnamePort) -> Self { - Self::new(value.hostname, value.port) +impl TryFrom for HostnamePort { + type Error = ockam_core::Error; + + fn try_from(value: StaticHostnamePort) -> ockam_core::Result { + HostnamePort::new(value.hostname, value.port) } } @@ -39,11 +41,12 @@ pub struct HostnamePort { impl HostnamePort { /// Create a new HostnamePort - pub fn new(hostname: impl Into, port: u16) -> HostnamePort { - Self { + pub fn new(hostname: impl Into, port: u16) -> ockam_core::Result { + let _self = Self { hostname: hostname.into(), port, - } + }; + Self::validate(&_self.to_string()) } /// Return the hostname @@ -63,6 +66,11 @@ impl HostnamePort { } fn validate(hostname_port: &str) -> ockam_core::Result { + // Check if the input is an IP address + if let Ok(socket) = parse_socket_addr(hostname_port) { + return Ok(HostnamePort::from(socket)); + } + // Split the input into hostname and port let (hostname, port_str) = match hostname_port.split_once(':') { None => { @@ -156,7 +164,10 @@ impl HostnamePort { } } - Ok(HostnamePort::new(hostname, port)) + Ok(Self { + hostname: hostname.to_string(), + port, + }) } } @@ -215,12 +226,12 @@ impl FromStr for HostnamePort { fn from_str(hostname_port: &str) -> ockam_core::Result { // edge case: only the port is given if let Ok(port) = hostname_port.parse::() { - return Ok(HostnamePort::new("127.0.0.1", port)); + return HostnamePort::new("127.0.0.1", port); } if let Some(port_str) = hostname_port.strip_prefix(':') { if let Ok(port) = port_str.parse::() { - return Ok(HostnamePort::new("127.0.0.1", port)); + return HostnamePort::new("127.0.0.1", port); } } @@ -247,21 +258,24 @@ mod tests { #[test] fn hostname_port_valid_inputs() -> ockam_core::Result<()> { let valid_cases = vec![ - ("localhost:80", HostnamePort::new("localhost", 80)), - ("33domain:80", HostnamePort::new("33domain", 80)), - ("127.0.0.1:80", HostnamePort::new("127.0.0.1", 80)), - ("xn--74h.com:80", HostnamePort::new("xn--74h.com", 80)), - ("sub.xn_74h.com:80", HostnamePort::new("sub.xn_74h.com", 80)), - (":80", HostnamePort::new("127.0.0.1", 80)), - ("80", HostnamePort::new("127.0.0.1", 80)), + ("localhost:80", HostnamePort::new("localhost", 80)?), + ("33domain:80", HostnamePort::new("33domain", 80)?), + ("127.0.0.1:80", HostnamePort::new("127.0.0.1", 80)?), + ("xn--74h.com:80", HostnamePort::new("xn--74h.com", 80)?), + ( + "sub.xn_74h.com:80", + HostnamePort::new("sub.xn_74h.com", 80)?, + ), + (":80", HostnamePort::new("127.0.0.1", 80)?), + ("80", HostnamePort::new("127.0.0.1", 80)?), ( "[2001:db8:85a3::8a2e:370:7334]:8080", - HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080), + HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080)?, ), - ("[::1]:8080", HostnamePort::new("[::1]", 8080)), + ("[::1]:8080", HostnamePort::new("[::1]", 8080)?), ( "[2001:db8:85a3::8a2e:370:7334]:8080", - HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080), + HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080)?, ), ]; for (input, expected) in valid_cases { @@ -272,15 +286,15 @@ mod tests { let socket_address_cases = vec![ ( SocketAddr::from_str("127.0.0.1:8080").unwrap(), - HostnamePort::new("127.0.0.1", 8080), + HostnamePort::new("127.0.0.1", 8080)?, ), ( SocketAddr::from_str("[2001:db8:85a3::8a2e:370:7334]:8080").unwrap(), - HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080), + HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080)?, ), ( SocketAddr::from_str("[::1]:8080").unwrap(), - HostnamePort::new("[::1]", 8080), + HostnamePort::new("[::1]", 8080)?, ), ]; for (input, expected) in socket_address_cases { diff --git a/implementations/rust/ockam/ockam_transport_core/src/lib.rs b/implementations/rust/ockam/ockam_transport_core/src/lib.rs index 0f79c9b507d..890528e22b1 100644 --- a/implementations/rust/ockam/ockam_transport_core/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_core/src/lib.rs @@ -16,9 +16,11 @@ mod error; mod hostname_port; mod parse_socket; +mod scheme_hostname_port; mod transport; pub use error::TransportError; pub use hostname_port::*; pub use parse_socket::*; +pub use scheme_hostname_port::*; pub use transport::*; diff --git a/implementations/rust/ockam/ockam_transport_core/src/scheme_hostname_port.rs b/implementations/rust/ockam/ockam_transport_core/src/scheme_hostname_port.rs new file mode 100644 index 00000000000..5a4b7c3d5d3 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_core/src/scheme_hostname_port.rs @@ -0,0 +1,196 @@ +use crate::{HostnamePort, StaticHostnamePort}; +use core::fmt::{Display, Formatter}; +use core::str::FromStr; +use ockam_core::compat::{format, string::String, vec::Vec}; +use ockam_core::errcode::{Kind, Origin}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SchemeHostnamePort { + scheme: ValidScheme, + hostname_port: HostnamePort, +} + +impl SchemeHostnamePort { + pub fn new( + scheme: impl Into, + hostname: impl Into, + port: u16, + ) -> ockam_core::Result { + let scheme = scheme.into(); + Ok(SchemeHostnamePort { + scheme: ValidScheme::from_str(&scheme)?, + hostname_port: HostnamePort::new(hostname, port)?, + }) + } + + pub fn hostname_port(&self) -> &HostnamePort { + &self.hostname_port + } + + pub fn hostname(&self) -> String { + self.hostname_port.hostname() + } + + pub fn port(&self) -> u16 { + self.hostname_port.port() + } + + pub fn is_tls(&self) -> bool { + self.scheme == ValidScheme::Tls + } + + pub fn is_udp(&self) -> bool { + self.scheme == ValidScheme::Udp + } +} + +impl From for HostnamePort { + fn from(scheme_hostname_port: SchemeHostnamePort) -> Self { + scheme_hostname_port.hostname_port + } +} + +impl From<&SchemeHostnamePort> for HostnamePort { + fn from(scheme_hostname_port: &SchemeHostnamePort) -> Self { + scheme_hostname_port.hostname_port.clone() + } +} + +impl TryFrom for SchemeHostnamePort { + type Error = ockam_core::Error; + + fn try_from(value: StaticHostnamePort) -> Result { + Ok(SchemeHostnamePort { + scheme: ValidScheme::Tcp, + hostname_port: value.try_into()?, + }) + } +} + +impl Display for SchemeHostnamePort { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + f.write_str(&format!("{}://{}", self.scheme, self.hostname_port)) + } +} + +impl FromStr for SchemeHostnamePort { + type Err = ::Err; + + fn from_str(s: &str) -> Result { + // The input string can be either "scheme://hostname:port" or "hostname:port" + // If the scheme is missing, we assume it's "tcp" + let mut parts = s.split("://"); + let (scheme, hostname_port) = match (parts.next(), parts.next()) { + (Some(s), Some(hp)) => (s, hp), + (Some(hp), None) => ("tcp", hp), + _ => { + return Err(ockam_core::Error::new( + Origin::Application, + Kind::Serialization, + "invalid input string for SchemeHostnamePort", + )); + } + }; + Ok(SchemeHostnamePort { + scheme: ValidScheme::from_str(scheme)?, + hostname_port: HostnamePort::from_str(hostname_port)?, + }) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ValidScheme { + Tcp, + Udp, + Tls, +} + +impl ValidScheme { + pub fn values() -> &'static [Self] { + &[ValidScheme::Tcp, ValidScheme::Udp, ValidScheme::Tls] + } +} + +impl AsRef for ValidScheme { + fn as_ref(&self) -> &str { + match self { + ValidScheme::Tcp => "tcp", + ValidScheme::Udp => "udp", + ValidScheme::Tls => "tls", + } + } +} + +impl FromStr for ValidScheme { + type Err = ockam_core::Error; + + fn from_str(s: &str) -> Result { + match s { + "tcp" => Ok(ValidScheme::Tcp), + "udp" => Ok(ValidScheme::Udp), + "tls" => Ok(ValidScheme::Tls), + _ => Err(ockam_core::Error::new( + Origin::Application, + Kind::Serialization, + format!( + "invalid scheme {s}. Supported schemes are {}", + ValidScheme::values() + .iter() + .map(|s| s.as_ref()) + .collect::>() + .join(", ") + ), + )), + } + } +} + +impl Display for ValidScheme { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + f.write_str(self.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_scheme() { + let scheme = ValidScheme::from_str("tcp").unwrap(); + assert_eq!(scheme, ValidScheme::Tcp); + assert_eq!(scheme.as_ref(), "tcp"); + + let scheme = ValidScheme::from_str("udp").unwrap(); + assert_eq!(scheme, ValidScheme::Udp); + assert_eq!(scheme.as_ref(), "udp"); + + let scheme = ValidScheme::from_str("tls").unwrap(); + assert_eq!(scheme, ValidScheme::Tls); + assert_eq!(scheme.as_ref(), "tls"); + } + + #[test] + fn test_scheme_hostname_port() { + let valid_cases = [ + ("tcp://localhost:22", ValidScheme::Tcp, "localhost:22"), + ("udp://my.domain:1234", ValidScheme::Udp, "my.domain:1234"), + ("tls://127.0.0.1:45678", ValidScheme::Tls, "127.0.0.1:45678"), + ("localhost:1234", ValidScheme::Tcp, "localhost:1234"), + ("6543", ValidScheme::Tcp, "127.0.0.1:6543"), + ]; + for (input, expected_scheme, expected_hostname_port) in valid_cases.iter() { + let scheme_hostname_port = SchemeHostnamePort::from_str(input).unwrap(); + assert_eq!(&scheme_hostname_port.scheme, expected_scheme); + assert_eq!( + scheme_hostname_port.hostname_port.to_string(), + expected_hostname_port.to_string() + ); + } + + let invalid_cases = ["", "tcp://", "tcp://localhost"]; + for input in invalid_cases.iter() { + assert!(SchemeHostnamePort::from_str(input).is_err()); + } + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs index b9b162f182d..82883ec7423 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs @@ -184,7 +184,7 @@ impl TcpTransport { // TODO: eBPF May be good to run resolution every time there is incoming connection, but that // would require also updating the self.ebpf_support.outlet_registry - let destination = resolve_peer(peer.to_string()).await?; + let destination = resolve_peer(&peer).await?; let dst_ip = match destination.ip() { IpAddr::V4(ip) => ip, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index 9c0e29dcb9b..8df3af5a5dc 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -83,7 +83,7 @@ impl TcpTransport { /// async fn test(ctx: Context) -> Result<()> { /// /// let tcp = TcpTransport::create(&ctx).await?; - /// tcp.create_outlet("outlet", HostnamePort::new("localhost", 9000), TcpOutletOptions::new()).await?; + /// tcp.create_outlet("outlet", HostnamePort::new("localhost", 9000)?, TcpOutletOptions::new()).await?; /// # tcp.stop_outlet("outlet").await?; /// # Ok(()) } /// ``` @@ -116,7 +116,7 @@ impl TcpTransport { /// async fn test(ctx: Context) -> Result<()> { /// /// let tcp = TcpTransport::create(&ctx).await?; - /// tcp.create_outlet("outlet", HostnamePort::new("127.0.0.1", 5000), TcpOutletOptions::new()).await?; + /// tcp.create_outlet("outlet", HostnamePort::new("127.0.0.1", 5000)?, TcpOutletOptions::new()).await?; /// tcp.stop_outlet("outlet").await?; /// # Ok(()) } /// ``` diff --git a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs index 70bfe40b390..23f1e56bb55 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs @@ -11,11 +11,10 @@ )] #![cfg_attr(not(feature = "std"), no_std)] -#[cfg(feature = "std")] -extern crate core; - #[cfg(feature = "alloc")] extern crate alloc; +#[cfg(feature = "std")] +extern crate core; mod messages; mod options; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs index 74cb2bd14f7..0f63afdb156 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs @@ -2,12 +2,13 @@ use crate::workers::{split_socket, Addresses, UdpReceiverProcessor, UdpSenderWor use crate::{UdpBindOptions, UdpTransport}; use core::fmt; use core::fmt::Formatter; +use core::str::FromStr; use ockam_core::errcode::{Kind, Origin}; use ockam_core::flow_control::FlowControlId; use ockam_core::{Address, AllowAll, DenyAll, Error, Result}; use ockam_node::compat::asynchronous::resolve_peer; use ockam_node::{ProcessorBuilder, WorkerBuilder}; -use ockam_transport_core::{parse_socket_addr, TransportError}; +use ockam_transport_core::{parse_socket_addr, HostnamePort, TransportError}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::net::UdpSocket; use tracing::{debug, error}; @@ -52,7 +53,7 @@ impl UdpBindArguments { /// Set peer address if we communicate with one specific peer pub async fn with_peer_address(mut self, peer_address: impl AsRef) -> Result { - let peer_address = resolve_peer(peer_address.as_ref().to_string()).await?; + let peer_address = resolve_peer(&HostnamePort::from_str(peer_address.as_ref())?).await?; self.peer_address = Some(peer_address); Ok(self) diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs index 6eec193097e..2193df7626d 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/sender.rs @@ -3,11 +3,12 @@ use crate::messages::{ RoutingNumber, UdpRoutingMessage, UdpTransportMessage, CURRENT_VERSION, MAX_PAYLOAD_SIZE, }; use crate::{MAX_MESSAGE_SIZE, UDP}; +use core::str::FromStr; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{async_trait, Any, Error, LocalMessage, Result, Routed, Worker}; use ockam_node::compat::asynchronous::resolve_peer; use ockam_node::Context; -use ockam_transport_core::TransportError; +use ockam_transport_core::{HostnamePort, TransportError}; use std::net::SocketAddr; use tracing::{error, trace, warn}; @@ -77,7 +78,7 @@ impl Worker for UdpSenderWorker { return Err(TransportError::UnknownRoute)?; } - resolve_peer(peer_addr.address().to_string()).await? + resolve_peer(&HostnamePort::from_str(peer_addr.address())?).await? }; // Error on conditions that _might_ put the sink