diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs index b939dc619ed..b02c31a8986 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/create.rs @@ -1,25 +1,145 @@ use crate::node::util::initialize_default_node; -use crate::tcp::inlet::create::CreateCommand as InletCreateCommand; +use crate::shared_args::OptionalTimeoutArg; +use crate::tcp::inlet::create::{tcp_inlet_default_from_addr, tcp_inlet_default_to_addr}; +use crate::tcp::util::alias_parser; +use crate::util::parsers::duration_parser; +use crate::util::parsers::hostname_parser; +use crate::util::{port_is_free_guard, print_warning_for_deprecated_flag_replaced}; use crate::{Command, CommandGlobalOpts}; use async_trait::async_trait; +use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; -use miette::miette; +use miette::{miette, IntoDiagnostic}; +use ockam::identity::Identifier; +use ockam::transport::HostnamePort; use ockam::Context; +use ockam_abac::PolicyExpression; +use ockam_api::address::extract_address_value; +use ockam_api::cli_state::random_name; use ockam_api::colors::color_primary; use ockam_api::influxdb::{InfluxDBPortals, LeaseUsage}; use ockam_api::nodes::models::portal::InletStatus; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus}; +use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus}; use ockam_core::api::{Reply, Status}; -use ockam_multiaddr::MultiAddr; +use ockam_multiaddr::{proto, MultiAddr, Protocol}; +use ockam_node::compat::asynchronous::resolve_peer; +use std::str::FromStr; +use std::time::Duration; use tracing::trace; /// Create InfluxDB Inlets #[derive(Clone, Debug, Args)] -pub struct InfluxDBCreateCommand { +pub struct CreateCommand { + /// Assign a name to this InfluxDB Inlet + #[arg(id = "NAME", value_parser = alias_parser)] + pub name: Option, + + /// Node on which to start the InfluxDB Inlet. + #[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)] + pub at: Option, + + /// Address on which to accept InfluxDB connections. + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] + pub from: HostnamePort, + + /// Route to a InfluxDB Outlet or the name of the InfluxDB Outlet service you want to connect to. + /// + /// If you are connecting to a local node, you can provide the route as `/node/n/service/outlet`. + /// + /// If you are connecting to a remote node through a relay in the Orchestrator you can either + /// provide the full route to the InfluxDB Outlet as `/project/myproject/service/forward_to_myrelay/secure/api/service/outlet`, + /// or just the name of the service as `outlet` or `/service/outlet`. + /// If you are passing just the service name, consider using `--via` to specify the + /// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`). + #[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())] + pub to: String, + + /// Name of the relay that this InfluxDB Inlet will use to connect to the InfluxDB Outlet. + /// + /// Use this flag when you are using `--to` to specify the service name of a InfluxDB Outlet + /// that is reachable through a relay in the Orchestrator. + /// If you don't provide it, the default relay name will be used, if necessary. + #[arg(long, display_order = 900, id = "RELAY_NAME")] + pub via: Option, + + /// Identity to be used to create the secure channel. If not set, the node's identity will be used. + #[arg(long, value_name = "IDENTITY_NAME", display_order = 900)] + pub identity: Option, + + /// Authorized identifier for secure channel connection + #[arg(long, name = "AUTHORIZED", display_order = 900)] + pub authorized: Option, + + /// [DEPRECATED] Use the positional argument instead + #[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)] + pub alias: Option, + + /// Policy expression that will be used for access control to the InfluxDB Inlet. + /// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used. + /// + /// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`. + #[arg( + long, + visible_alias = "expression", + display_order = 900, + id = "POLICY_EXPRESSION" + )] + pub allow: Option, + + /// Time to wait for the outlet to be available. + #[arg(long, display_order = 900, id = "WAIT", default_value = "5s", value_parser = duration_parser)] + pub connection_wait: Duration, + + /// Time to wait before retrying to connect to the InfluxDB Outlet. + #[arg(long, display_order = 900, id = "RETRY", default_value = "20s", value_parser = duration_parser)] + pub retry_wait: Duration, + #[command(flatten)] - pub tcp_inlet: InletCreateCommand, + pub timeout: OptionalTimeoutArg, + + /// Create the InfluxDB Inlet without waiting for the InfluxDB Outlet to connect + #[arg(long, default_value = "false")] + pub no_connection_wait: bool, + + /// Enable UDP NAT puncture. + #[arg( + long, + visible_alias = "enable-udp-puncture", + value_name = "BOOL", + default_value_t = false, + hide = true + )] + pub udp: bool, + + /// Disable fallback to TCP. + /// TCP won't be used to transfer data between the Inlet and the Outlet. + #[arg( + long, + visible_alias = "disable-tcp-fallback", + value_name = "BOOL", + default_value_t = false, + hide = true + )] + pub no_tcp_fallback: bool, + + /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream. + /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`. + #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] + pub privileged: bool, + + #[arg(long, value_name = "BOOL", default_value_t = false, hide = true)] + /// Enable TLS for the InfluxDB Inlet. + /// Uses the default project TLS certificate provider, `/project/default/service/tls_certificate_provider`. + /// To specify a different certificate provider, use `--tls-certificate-provider`. + /// Requires `ockam-tls-certificate` credential attribute. + pub tls: bool, + + #[arg(long, value_name = "ROUTE", hide = true)] + /// Enable TLS for the InfluxDB Inlet using the provided certificate provider. + /// Requires `ockam-tls-certificate` credential attribute. + pub tls_certificate_provider: Option, /// Share the leases among the clients or use a separate lease for each client #[arg(long, default_value = "per-client")] @@ -33,25 +153,22 @@ pub struct InfluxDBCreateCommand { } #[async_trait] -impl Command for InfluxDBCreateCommand { +impl Command for CreateCommand { const NAME: &'static str = "influxdb-inlet create"; async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; - self = self.parse_args(&opts).await?; + let cmd = self.parse_args(&opts).await?; - let mut node = BackgroundNodeClient::create(ctx, &opts.state, &self.tcp_inlet.at).await?; - self.tcp_inlet - .timeout - .timeout - .map(|t| node.set_timeout_mut(t)); + let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?; + cmd.timeout.timeout.map(|t| node.set_timeout_mut(t)); let inlet_status = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating a InfluxDB Inlet at {}...\n", - color_primary(&self.tcp_inlet.from) + color_primary(&cmd.from) )); } @@ -59,22 +176,21 @@ impl Command for InfluxDBCreateCommand { let result: Reply = node .create_influxdb_inlet( ctx, - &self.tcp_inlet.from, - &self.tcp_inlet.to(), - self.tcp_inlet.alias.as_ref().expect("The `alias` argument should be set to its default value if not provided"), - &self.tcp_inlet.authorized, - &self.tcp_inlet.allow, - self.tcp_inlet.connection_wait, - !self.tcp_inlet.no_connection_wait, - &self - .tcp_inlet + &cmd.from, + &cmd.to(), + cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"), + &cmd.authorized, + &cmd.allow, + cmd.connection_wait, + !cmd.no_connection_wait, + &cmd .secure_channel_identifier(&opts.state) .await?, - self.tcp_inlet.udp, - self.tcp_inlet.no_tcp_fallback, - &self.tcp_inlet.tls_certificate_provider, - self.leased_token_strategy.clone(), - self.lease_manager_route.clone(), + cmd.udp, + cmd.no_tcp_fallback, + &cmd.tls_certificate_provider, + cmd.leased_token_strategy.clone(), + cmd.lease_manager_route.clone(), ) .await?; @@ -90,49 +206,46 @@ impl Command for InfluxDBCreateCommand { }; trace!("the inlet creation returned a non-OK status: {s:?}"); - if self.tcp_inlet.retry_wait.as_millis() == 0 { - return Err(miette!("Failed to create TCP inlet"))?; + if cmd.retry_wait.as_millis() == 0 { + return Err(miette!("Failed to create InfluxDB inlet"))?; } if let Some(pb) = pb.as_ref() { pb.set_message(format!( - "Waiting for TCP Inlet {} to be available... Retrying momentarily\n", - color_primary(&self.tcp_inlet.to) + "Waiting for InfluxDB Inlet {} to be available... Retrying momentarily\n", + color_primary(&cmd.to) )); } - tokio::time::sleep(self.tcp_inlet.retry_wait).await + tokio::time::sleep(cmd.retry_wait).await } } } }; let node_name = node.node_name(); - self.tcp_inlet - .add_inlet_created_event(&opts, &node_name, &inlet_status) - .await?; let created_message = fmt_ok!( "Created a new InfluxDB Inlet in the Node {} bound to {}\n", color_primary(&node_name), - color_primary(&self.tcp_inlet.from) + color_primary(&cmd.from) ); - let plain = if self.tcp_inlet.no_connection_wait { - created_message + &fmt_log!("It will automatically connect to the TCP Outlet at {} as soon as it is available", - color_primary(&self.tcp_inlet.to) + let plain = if cmd.no_connection_wait { + created_message + &fmt_log!("It will automatically connect to the InfluxDB Outlet at {} as soon as it is available", + color_primary(&cmd.to) ) } else if inlet_status.status == ConnectionStatus::Up { created_message + &fmt_log!( - "sending traffic to the TCP Outlet at {}", - color_primary(&self.tcp_inlet.to) + "sending traffic to the InfluxDB Outlet at {}", + color_primary(&cmd.to) ) } else { fmt_warn!( - "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the TCP Outlet at {}\n", + "A InfluxDB Inlet was created in the Node {} bound to {} but failed to connect to the InfluxDB Outlet at {}\n", color_primary(&node_name), - color_primary(self.tcp_inlet.from.to_string()), - color_primary(&self.tcp_inlet.to) + color_primary(cmd.from.to_string()), + color_primary(&cmd.to) ) + &fmt_info!("It will retry to connect automatically") }; @@ -147,9 +260,52 @@ impl Command for InfluxDBCreateCommand { } } -impl InfluxDBCreateCommand { +impl CreateCommand { async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { - self.tcp_inlet = self.tcp_inlet.parse_args(opts).await?; + if let Some(alias) = self.alias.as_ref() { + print_warning_for_deprecated_flag_replaced( + opts, + "alias", + "the positional argument", + )?; + if self.name.is_none() { + self.name = Some(alias.clone()); + } else { + return Err(miette!( + "--alias is deprecated and can't be used together with the positional argument", + )); + } + } else { + self.name = self.name.or_else(|| Some(random_name())); + } + + let from = resolve_peer(self.from.to_string()) + .await + .into_diagnostic()?; + port_is_free_guard(&from)?; + + self.to = crate::tcp::inlet::create::CreateCommand::parse_arg_to( + &opts.state, + self.to, + self.via.as_ref(), + ) + .await?; + if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() { + return Err(miette!( + "--authorized can not be used with project addresses" + ))?; + } + + self.tls_certificate_provider = if let Some(tls_certificate_provider) = + &self.tls_certificate_provider + { + Some(tls_certificate_provider.clone()) + } else if self.tls { + Some(MultiAddr::from_str("/project/default/service/tls_certificate_provider").unwrap()) + } else { + None + }; + if self .lease_manager_route .as_ref() @@ -159,6 +315,22 @@ impl InfluxDBCreateCommand { "lease-manager-route argument requires leased-token-strategy=per-client" ))? }; + Ok(self) } + + pub fn to(&self) -> MultiAddr { + MultiAddr::from_str(&self.to).unwrap() + } + + pub async fn secure_channel_identifier( + &self, + state: &CliState, + ) -> miette::Result> { + if let Some(identity_name) = self.identity.as_ref() { + Ok(Some(state.get_identifier_by_name(identity_name).await?)) + } else { + Ok(None) + } + } } diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs index 034d4a3617c..0aae0c319f7 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/inlet/mod.rs @@ -1,6 +1,6 @@ use clap::{Args, Subcommand}; -use create::InfluxDBCreateCommand; +use create::CreateCommand; use crate::{docs, Command, CommandGlobalOpts}; @@ -22,7 +22,7 @@ pub struct InfluxDBInletCommand { #[derive(Clone, Debug, Subcommand)] pub enum InfluxDBInletSubCommand { - Create(InfluxDBCreateCommand), + Create(CreateCommand), } impl InfluxDBInletCommand { diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs index bc1a5752abe..fdd5cf46b8a 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/create.rs @@ -1,24 +1,64 @@ use crate::node::util::initialize_default_node; -use crate::tcp::outlet::create::CreateCommand as OutletCreateCommand; use crate::util::parsers::duration_parser; use crate::{Command, CommandGlobalOpts}; use async_trait::async_trait; +use clap::builder::FalseyValueParser; use clap::Args; use colorful::Colorful; use miette::miette; +use ockam::transport::HostnamePort; use ockam::{Address, Context}; +use ockam_abac::PolicyExpression; +use ockam_api::address::extract_address_value; use ockam_api::colors::color_primary; use ockam_api::fmt_ok; use ockam_api::influxdb::portal::{InfluxDBOutletConfig, LeaseManagerConfig}; use ockam_api::influxdb::InfluxDBPortals; use ockam_api::nodes::BackgroundNodeClient; +use std::str::FromStr; use std::time::Duration; /// Create InfluxDB Outlets #[derive(Clone, Debug, Args)] -pub struct InfluxDBCreateCommand { - #[command(flatten)] - pub tcp_outlet: OutletCreateCommand, +pub struct CreateCommand { + /// TCP address where your TCP server is running: domain:port. Your Outlet will send raw TCP traffic to it + #[arg(long, display_order = 900, id = "HOSTNAME_PORT", value_parser = HostnamePort::from_str)] + pub to: HostnamePort, + + /// If set, the outlet will establish a TLS connection over TCP + #[arg(long, display_order = 900, id = "BOOLEAN")] + pub tls: bool, + + /// Address of your InfluxDB Outlet, which is part of a route that is used in other + /// commands. This address must be unique. This address identifies the InfluxDB Outlet + /// worker, on the node, on your local machine. Examples are `/service/my-outlet` or + /// `my-outlet`. If you don't provide it, `/service/outlet` will be used. You will + /// need this address when you create a InfluxDB Inlet (using `ockam tcp-inlet create --to + /// `) + #[arg(long, display_order = 902, id = "OUTLET_ADDRESS", value_parser = extract_address_value)] + pub from: Option, + + /// Your InfluxDB Outlet will be created on this node. If you don't provide it, the default + /// node will be used + #[arg(long, display_order = 903, id = "NODE_NAME", value_parser = extract_address_value)] + pub at: Option, + + /// Policy expression that will be used for access control to the InfluxDB Outlet. + /// If you don't provide it, the policy set for the "tcp-outlet" resource type will be used. + /// + /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. + #[arg( + long, + visible_alias = "expression", + display_order = 904, + id = "POLICY_EXPRESSION" + )] + pub allow: Option, + + /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream. + /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`. + #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] + pub privileged: bool, #[arg(long, conflicts_with("LeaseManagerConfigArgs"))] fixed_token: Option, @@ -48,7 +88,7 @@ pub struct LeaseManagerConfigArgs { } #[async_trait] -impl Command for InfluxDBCreateCommand { +impl Command for CreateCommand { const NAME: &'static str = "influxdb-outlet create"; async fn async_run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { @@ -70,28 +110,25 @@ impl Command for InfluxDBCreateCommand { ))?; }; - let node = BackgroundNodeClient::create(ctx, &opts.state, &self.tcp_outlet.at).await?; + let node = BackgroundNodeClient::create(ctx, &opts.state, &self.at).await?; let outlet_status = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating a new InfluxDB Outlet to {}...\n", - color_primary(self.tcp_outlet.to.to_string()) + color_primary(self.to.to_string()) )); } node.create_influxdb_outlet( ctx, - self.tcp_outlet.to.clone(), - self.tcp_outlet.tls, - self.tcp_outlet.from.clone().map(Address::from).as_ref(), - self.tcp_outlet.allow.clone(), + self.to.clone(), + self.tls, + self.from.clone().map(Address::from).as_ref(), + self.allow.clone(), token_config, ) .await? }; - self.tcp_outlet - .add_outlet_created_journey_event(&opts, &node.node_name(), &outlet_status) - .await?; opts.terminal .stdout() @@ -99,7 +136,7 @@ impl Command for InfluxDBCreateCommand { "Created a new InfluxDB Outlet in the Node {} at {} bound to {}\n\n", color_primary(node.node_name()), color_primary(&outlet_status.worker_addr), - color_primary(&self.tcp_outlet.to) + color_primary(&self.to) )) .machine(&outlet_status.worker_addr) .json_obj(&outlet_status)? diff --git a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs index e391dfd6ad9..d54f0cce49b 100644 --- a/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/influxdb/outlet/mod.rs @@ -1,6 +1,6 @@ use clap::{Args, Subcommand}; -use create::InfluxDBCreateCommand; +use create::CreateCommand; use crate::{docs, Command, CommandGlobalOpts}; @@ -22,7 +22,7 @@ pub struct InfluxDBOutletCommand { #[derive(Clone, Debug, Subcommand)] pub enum InfluxDBOutletSubCommand { - Create(InfluxDBCreateCommand), + Create(CreateCommand), } impl InfluxDBOutletCommand { diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs index 15574244871..11f2fced93d 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs @@ -3,7 +3,7 @@ use ockam::transport::HostnamePort; use ockam_api::port_range::PortRange; use ockam_multiaddr::MultiAddr; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{ kafka::{kafka_default_consumer_server, kafka_default_project_route, kafka_inlet_default_addr}, node::NodeOpts, @@ -22,14 +22,17 @@ pub struct CreateCommand { /// The local address of the service #[arg(long, default_value_t = kafka_inlet_default_addr())] addr: String, + /// The address where to bind and where the client will connect to alongside its port,
:. /// In case just a port is specified, the default loopback address (127.0.0.1) will be used #[arg(long, default_value_t = kafka_default_consumer_server(), value_parser = hostname_parser)] bootstrap_server: HostnamePort, + /// Local port range dynamically allocated to kafka brokers, must not overlap with the /// bootstrap port #[arg(long)] brokers_port_range: Option, + /// The route to the project in ockam orchestrator, expected something like /project/ #[arg(long, default_value_t = kafka_default_project_route())] project_route: MultiAddr, @@ -37,8 +40,9 @@ pub struct CreateCommand { impl CreateCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::create::CreateCommand { + name: self.addr.clone(), node_opts: self.node_opts, addr: self.addr, from: self.bootstrap_server, @@ -47,8 +51,8 @@ impl CreateCommand { consumer: None, consumer_relay: None, publishing_relay: None, - avoid_publishing: false, - disable_content_encryption: false, + no_publishing: false, + no_content_encryption: false, encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/delete.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/delete.rs index 6bb2f4917c2..cfd4dfb2db5 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/delete.rs @@ -1,6 +1,6 @@ use clap::Args; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{docs, node::NodeOpts, Command, CommandGlobalOpts}; const AFTER_LONG_HELP: &str = include_str!("./static/delete/after_long_help.txt"); @@ -19,7 +19,7 @@ pub struct DeleteCommand { impl DeleteCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::delete::DeleteCommand { node_opts: self.node_opts, address: Some(self.address), diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/list.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/list.rs index 61606dd641c..3ef59203c48 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/list.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/list.rs @@ -1,7 +1,7 @@ use clap::Args; use crate::node::NodeOpts; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{docs, Command, CommandGlobalOpts}; const PREVIEW_TAG: &str = include_str!("../../static/preview_tag.txt"); @@ -21,7 +21,7 @@ pub struct ListCommand { impl ListCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::list::ListCommand { node_opts: self.node_opts, } diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs index b0fed96b913..d5e0062c922 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs @@ -2,6 +2,15 @@ use crate::kafka::kafka_default_project_route; use async_trait::async_trait; use std::fmt::Write; +use crate::kafka::make_brokers_port_range; +use crate::node::util::initialize_default_node; +use crate::util::{print_warning_for_deprecated_flag_replaced, process_nodes_multiaddr}; +use crate::{ + kafka::{kafka_default_inlet_bind_address, kafka_inlet_default_addr}, + node::NodeOpts, + util::parsers::hostname_parser, + Command, CommandGlobalOpts, +}; use clap::{command, Args}; use colorful::Colorful; use miette::miette; @@ -20,30 +29,24 @@ use ockam_multiaddr::MultiAddr; use ockam_node::Context; use serde::Serialize; -use crate::kafka::make_brokers_port_range; -use crate::node::util::initialize_default_node; -use crate::util::process_nodes_multiaddr; -use crate::{ - kafka::{kafka_default_inlet_bind_address, kafka_inlet_default_addr}, - node::NodeOpts, - util::parsers::hostname_parser, - Command, CommandGlobalOpts, -}; - /// Create a new Kafka Inlet. /// Kafka clients v3.7.0 and earlier are supported. /// You can find the version you have with 'kafka-topics.sh --version'. #[derive(Clone, Debug, Args)] pub struct CreateCommand { + /// Assign a name to this Kafka Inlet + #[arg(default_value_t = kafka_inlet_default_addr())] + pub name: String, + #[command(flatten)] pub node_opts: NodeOpts, - /// The local address of the service + /// [DEPRECATED] Use the positional argument instead #[arg(long, default_value_t = kafka_inlet_default_addr())] pub addr: String, /// The address where to bind and where the client will connect to alongside its port,
:. - /// In case just a port is specified, the default loopback address (127.0.0.1:4000) will be used + /// In case just a port is specified, the default loopback address (127.0.0.1) will be used. #[arg(long, default_value_t = kafka_default_inlet_bind_address(), value_parser = hostname_parser)] pub from: HostnamePort, @@ -78,19 +81,23 @@ pub struct CreateCommand { /// Avoid publishing the consumer in the relay. /// This is useful to avoid the creation of an unused relay when the consumer is directly /// referenced by the producer. - #[arg(long, name = "avoid-publishing", conflicts_with = "publishing-relay")] - pub avoid_publishing: bool, + #[arg( + long, + visible_alias = "avoid-publishing", + conflicts_with = "publishing-relay" + )] + pub no_publishing: bool, /// Disable end-to-end kafka messages encryption between producer and consumer. /// Use it when you want a plain kafka portal, the communication itself will still be /// encrypted. #[arg( long, - name = "disable-content-encryption", + visible_alias = "disable-content-encryption", value_name = "BOOL", default_value_t = false )] - pub disable_content_encryption: bool, + pub no_content_encryption: bool, /// The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. /// By default, the whole record is encrypted. @@ -130,70 +137,53 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; - - let brokers_port_range = self - .brokers_port_range - .unwrap_or_else(|| make_brokers_port_range(&self.from)); - - // The bootstrap port can't overlap with the brokers port range - if self.from.port() >= brokers_port_range.start() - && self.from.port() <= brokers_port_range.end() - { - return Err(miette!( - "The bootstrap port {} can't overlap with the brokers port range {}", - self.from.port(), - brokers_port_range.to_string() - )); - } - - let at_node = self.node_opts.at_node.clone(); - let addr = self.addr.clone(); - let to = process_nodes_multiaddr(&self.to, &opts.state).await?; + let cmd = self.parse_args(&opts).await?; let inlet = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating Kafka Inlet at {}...\n", - color_primary(self.from.to_string()) + color_primary(cmd.from.to_string()) )); } - let node = BackgroundNodeClient::create(ctx, &opts.state, &at_node).await?; + let node = + BackgroundNodeClient::create(ctx, &opts.state, &cmd.node_opts.at_node).await?; let consumer_resolution; - if let Some(route) = self.consumer { - consumer_resolution = ConsumerResolution::SingleNode(route); - } else if let Some(route) = &self.consumer_relay { + if let Some(route) = &cmd.consumer { + consumer_resolution = ConsumerResolution::SingleNode(route.clone()); + } else if let Some(route) = &cmd.consumer_relay { consumer_resolution = ConsumerResolution::ViaRelay(route.clone()); } else { - consumer_resolution = ConsumerResolution::ViaRelay(to.clone()); + consumer_resolution = ConsumerResolution::ViaRelay(cmd.to.clone()); } let consumer_publishing; - if self.avoid_publishing { + if cmd.no_publishing { consumer_publishing = ConsumerPublishing::None; - } else if let Some(route) = self.publishing_relay { - consumer_publishing = ConsumerPublishing::Relay(route); - } else if let Some(route) = self.consumer_relay { - consumer_publishing = ConsumerPublishing::Relay(route); + } else if let Some(route) = &cmd.publishing_relay { + consumer_publishing = ConsumerPublishing::Relay(route.clone()); + } else if let Some(route) = &cmd.consumer_relay { + consumer_publishing = ConsumerPublishing::Relay(route.clone()); } else { - consumer_publishing = ConsumerPublishing::Relay(to.clone()); + consumer_publishing = ConsumerPublishing::Relay(cmd.to.clone()); } let payload = StartKafkaInletRequest::new( - self.from.clone(), - brokers_port_range, - to.clone(), - !self.disable_content_encryption, - self.encrypted_fields, + cmd.from.clone(), + cmd.brokers_port_range(), + cmd.to.clone(), + !cmd.no_content_encryption, + cmd.encrypted_fields.clone(), consumer_resolution, consumer_publishing, - self.inlet_policy_expression, - self.consumer_policy_expression, - self.producer_policy_expression, + cmd.inlet_policy_expression.clone(), + cmd.consumer_policy_expression.clone(), + cmd.producer_policy_expression.clone(), ); - let payload = StartServiceRequest::new(payload, &addr); + let payload = StartServiceRequest::new(payload, &cmd.name); let req = Request::post("/node/services/kafka_inlet").body(payload); node.tell(ctx, req) .await @@ -201,10 +191,10 @@ impl Command for CreateCommand { KafkaInletOutput { node_name: node.node_name(), - from: InternetAddress::new(&self.from.to_string()) + from: InternetAddress::new(&cmd.from.to_string()) .ok_or(miette!("Invalid address"))?, - brokers_port_range, - to, + brokers_port_range: cmd.brokers_port_range(), + to: cmd.to.clone(), } }; @@ -218,6 +208,47 @@ impl Command for CreateCommand { } } +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if self.addr != kafka_inlet_default_addr() { + print_warning_for_deprecated_flag_replaced( + opts, + "addr", + "the positional argument", + )?; + if self.name == kafka_inlet_default_addr() { + self.name = self.addr.clone(); + } else { + return Err(miette!( + "--addr is deprecated and can't be used together with the positional argument", + )); + } + } + + self.brokers_port_range = self + .brokers_port_range + .or_else(|| Some(make_brokers_port_range(&self.from))); + + // The bootstrap port can't overlap with the brokers port range + if self.from.port() >= self.brokers_port_range().start() + && self.from.port() <= self.brokers_port_range().end() + { + return Err(miette!( + "The bootstrap port {} can't overlap with the brokers port range {}", + color_primary(self.from.port()), + color_primary(self.brokers_port_range().to_string()) + )); + } + + self.to = process_nodes_multiaddr(&self.to, &opts.state).await?; + Ok(self) + } + + fn brokers_port_range(&self) -> PortRange { + self.brokers_port_range.unwrap() + } +} + #[derive(Serialize)] struct KafkaInletOutput { node_name: String, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs index 60bd2cf9b06..8f462ed1e38 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/outlet/create.rs @@ -18,6 +18,7 @@ use ockam_api::{fmt_log, fmt_ok}; use ockam_core::api::Request; use crate::node::util::initialize_default_node; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{ kafka::{kafka_default_outlet_addr, kafka_default_outlet_server}, node::NodeOpts, @@ -27,15 +28,19 @@ use crate::{ /// Create a new Kafka Outlet #[derive(Clone, Debug, Args)] pub struct CreateCommand { + /// Assign a name to this Kafka Outlet + #[arg(default_value_t = kafka_default_outlet_addr())] + pub name: String, + #[command(flatten)] pub node_opts: NodeOpts, - /// The local address of the service + /// [DEPRECATED] Use the positional argument instead #[arg(long, default_value_t = kafka_default_outlet_addr())] pub addr: String, /// The address of the kafka bootstrap broker - #[arg(long, default_value_t = kafka_default_outlet_server())] + #[arg(long, visible_alias = "to", default_value_t = kafka_default_outlet_server())] pub bootstrap_server: HostnamePort, /// If set, the outlet will establish a TLS connection over TCP @@ -46,7 +51,7 @@ pub struct CreateCommand { /// If you don't provide it, the policy set for the "tcp-outlet" resource type will be used. /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. - #[arg(hide = true, long = "allow", id = "EXPRESSION")] + #[arg(long = "allow", id = "EXPRESSION")] pub policy_expression: Option, } @@ -56,32 +61,33 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { initialize_default_node(ctx, &opts).await?; + let cmd = self.parse_args(&opts).await?; let outlet = { let pb = opts.terminal.progress_bar(); if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating Kafka Outlet to bootstrap server {}...\n", - color_primary(self.bootstrap_server.to_string()) + color_primary(cmd.bootstrap_server.to_string()) )); } let payload = StartKafkaOutletRequest::new( - self.bootstrap_server.clone(), - self.tls, - self.policy_expression, + cmd.bootstrap_server.clone(), + cmd.tls, + cmd.policy_expression, ); - let payload = StartServiceRequest::new(payload, &self.addr); + let payload = StartServiceRequest::new(payload, &cmd.name); let req = Request::post("/node/services/kafka_outlet").body(payload); let node = - BackgroundNodeClient::create(ctx, &opts.state, &self.node_opts.at_node).await?; + BackgroundNodeClient::create(ctx, &opts.state, &cmd.node_opts.at_node).await?; node.tell(ctx, req) .await .map_err(|e| miette!("Failed to start Kafka Outlet: {e}"))?; KafkaOutletOutput { node_name: node.node_name(), - bootstrap_server: self.bootstrap_server.to_string(), + bootstrap_server: cmd.bootstrap_server.to_string(), } }; @@ -95,6 +101,27 @@ impl Command for CreateCommand { } } +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { + if self.addr != kafka_default_outlet_addr() { + print_warning_for_deprecated_flag_replaced( + opts, + "addr", + "the positional argument", + )?; + if self.name == kafka_default_outlet_addr() { + self.name = self.addr.clone(); + } else { + return Err(miette!( + "--addr is deprecated and can't be used together with the positional argument", + )); + } + } + + Ok(self) + } +} + #[derive(Serialize)] struct KafkaOutletOutput { node_name: String, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs index 22ee2355542..6eeb30c34cd 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs @@ -3,7 +3,7 @@ use ockam::transport::HostnamePort; use ockam_api::port_range::PortRange; use ockam_multiaddr::MultiAddr; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{ kafka::{kafka_default_producer_server, kafka_default_project_route, kafka_inlet_default_addr}, node::NodeOpts, @@ -18,17 +18,21 @@ use crate::{ pub struct CreateCommand { #[command(flatten)] node_opts: NodeOpts, + /// The local address of the service #[arg(long, default_value_t = kafka_inlet_default_addr())] addr: String, + /// The address where to bind and where the client will connect to alongside its port,
:. /// In case just a port is specified, the default loopback address (127.0.0.1) will be used #[arg(long, default_value_t = kafka_default_producer_server(), value_parser = hostname_parser)] bootstrap_server: HostnamePort, + /// Local port range dynamically allocated to kafka brokers, must not overlap with the /// bootstrap port #[arg(long)] brokers_port_range: Option, + /// The route to the project in ockam orchestrator, expected something like /project/ #[arg(long, default_value_t = kafka_default_project_route())] project_route: MultiAddr, @@ -36,8 +40,9 @@ pub struct CreateCommand { impl CreateCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::create::CreateCommand { + name: self.addr.clone(), node_opts: self.node_opts, addr: self.addr, from: self.bootstrap_server, @@ -46,8 +51,8 @@ impl CreateCommand { consumer: None, consumer_relay: None, publishing_relay: None, - avoid_publishing: false, - disable_content_encryption: false, + no_publishing: false, + no_content_encryption: false, encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/delete.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/delete.rs index 456ad8ace3a..2828eb2c2e8 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/delete.rs @@ -1,6 +1,6 @@ use clap::Args; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{docs, node::NodeOpts, Command, CommandGlobalOpts}; const AFTER_LONG_HELP: &str = include_str!("./static/delete/after_long_help.txt"); @@ -19,7 +19,7 @@ pub struct DeleteCommand { impl DeleteCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::delete::DeleteCommand { node_opts: self.node_opts, address: Some(self.address), diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/list.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/list.rs index 9f160a35a2b..9eae7f5fc5c 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/list.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/list.rs @@ -1,7 +1,7 @@ use clap::Args; use crate::node::NodeOpts; -use crate::util::print_deprecated_warning; +use crate::util::print_warning_for_deprecated_flag_replaced; use crate::{docs, Command, CommandGlobalOpts}; const PREVIEW_TAG: &str = include_str!("../../static/preview_tag.txt"); @@ -21,7 +21,7 @@ pub struct ListCommand { impl ListCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { - print_deprecated_warning(&opts, &self.name(), "kafka-inlet")?; + print_warning_for_deprecated_flag_replaced(&opts, &self.name(), "kafka-inlet")?; crate::kafka::inlet::list::ListCommand { node_opts: self.node_opts, } diff --git a/implementations/rust/ockam/ockam_command/src/node/delete.rs b/implementations/rust/ockam/ockam_command/src/node/delete.rs index 33d873c0f88..ff711d8d7ef 100644 --- a/implementations/rust/ockam/ockam_command/src/node/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/node/delete.rs @@ -1,6 +1,6 @@ use crate::terminal::tui::DeleteCommandTui; use crate::tui::PluralTerm; -use crate::util::{async_cmd, print_deprecated_flag_warning}; +use crate::util::{async_cmd, print_warning_for_deprecated_flag_no_effect}; use crate::{docs, CommandGlobalOpts}; use clap::Args; use colorful::Colorful; @@ -51,7 +51,7 @@ impl DeleteCommand { async fn async_run(&self, opts: CommandGlobalOpts) -> miette::Result<()> { if self.force { - print_deprecated_flag_warning(&opts, "--force")?; + print_warning_for_deprecated_flag_no_effect(&opts, "--force")?; } DeleteTui::run(opts, self.clone()).await } diff --git a/implementations/rust/ockam/ockam_command/src/node/stop.rs b/implementations/rust/ockam/ockam_command/src/node/stop.rs index fde4bcb799f..970472ae01a 100644 --- a/implementations/rust/ockam/ockam_command/src/node/stop.rs +++ b/implementations/rust/ockam/ockam_command/src/node/stop.rs @@ -4,7 +4,7 @@ use miette::miette; use ockam_api::colors::OckamColor; use ockam_api::{color, fmt_info, fmt_ok, fmt_warn}; -use crate::util::{async_cmd, print_deprecated_flag_warning}; +use crate::util::{async_cmd, print_warning_for_deprecated_flag_no_effect}; use crate::{docs, CommandGlobalOpts}; const LONG_ABOUT: &str = include_str!("./static/stop/long_about.txt"); @@ -40,7 +40,7 @@ impl StopCommand { async fn async_run(&self, opts: CommandGlobalOpts) -> miette::Result<()> { if self.force { - print_deprecated_flag_warning(&opts, "--force")?; + print_warning_for_deprecated_flag_no_effect(&opts, "--force")?; } let running_nodes = opts diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index 95eb5d02bfe..cf7e15e2f33 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -20,7 +20,7 @@ use ockam_multiaddr::{MultiAddr, Protocol}; use crate::node::util::initialize_default_node; use crate::shared_args::RetryOpts; -use crate::util::{print_deprecated_flag_warning, process_nodes_multiaddr}; +use crate::util::{print_warning_for_deprecated_flag_no_effect, process_nodes_multiaddr}; use crate::{docs, Command, CommandGlobalOpts, Error, Result}; const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt"); @@ -81,7 +81,7 @@ impl Command for CreateCommand { async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { if self.project_relay { - print_deprecated_flag_warning(&opts, "--project-relay")?; + print_warning_for_deprecated_flag_no_effect(&opts, "--project-relay")?; } initialize_default_node(ctx, &opts).await?; diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs index e7125700213..f2a7a909e0d 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_inlets.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::run::parser::building_blocks::{ArgsToCommands, ResourceNameOrMap}; -use crate::influxdb::inlet::create::InfluxDBCreateCommand; +use crate::influxdb::inlet::create::CreateCommand; use crate::run::parser::resource::utils::parse_cmd_from_args; use crate::{influxdb::inlet, Command, OckamSubcommand}; @@ -15,31 +15,29 @@ pub struct InfluxDBInlets { } impl InfluxDBInlets { - fn get_subcommand(args: &[String]) -> Result { - if let OckamSubcommand::InfluxDBInlet(cmd) = - parse_cmd_from_args(InfluxDBCreateCommand::NAME, args)? + fn get_subcommand(args: &[String]) -> Result { + if let OckamSubcommand::InfluxDBInlet(cmd) = parse_cmd_from_args(CreateCommand::NAME, args)? { let inlet::InfluxDBInletSubCommand::Create(c) = cmd.subcommand; return Ok(c); } Err(miette!(format!( "Failed to parse {} command", - color_primary(InfluxDBCreateCommand::NAME) + color_primary(CreateCommand::NAME) ))) } pub fn into_parsed_commands( self, default_node_name: Option<&String>, - ) -> Result> { + ) -> Result> { match self.influxdb_inlets { Some(c) => { - let mut cmds = - c.into_commands_with_name_arg(Self::get_subcommand, Some("alias"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name.as_ref() { for cmd in cmds.iter_mut() { - if cmd.tcp_inlet.at.is_none() { - cmd.tcp_inlet.at = Some(node_name.to_string()) + if cmd.at.is_none() { + cmd.at = Some(node_name.to_string()) } } } @@ -65,7 +63,6 @@ mod tests { lease-manager-route: /service/test ti2: from: '6061' - alias: my_inlet lease-manager-route: /service/test "#; let parsed: InfluxDBInlets = serde_yaml::from_str(named).unwrap(); @@ -74,12 +71,12 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].tcp_inlet.alias.as_ref().unwrap(), "ti1"); - assert_eq!(cmds[0].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6060)); - assert_eq!(cmds[0].tcp_inlet.at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].tcp_inlet.alias.as_ref().unwrap(), "my_inlet"); - assert_eq!(cmds[1].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6061)); - assert_eq!(cmds[1].tcp_inlet.at.as_ref(), Some(&default_node_name)); + assert_eq!(cmds[0].name.as_ref().unwrap(), "ti1"); + assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); + assert_eq!(cmds[1].name.as_ref().unwrap(), "ti2"); + assert_eq!(cmds[1].from, HostnamePort::new("127.0.0.1", 6061)); + assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); let unnamed = r#" influxdb_inlets: @@ -93,9 +90,9 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6060)); - assert_eq!(cmds[0].tcp_inlet.at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].tcp_inlet.from, HostnamePort::new("127.0.0.1", 6061)); - assert_eq!(cmds[1].tcp_inlet.at.as_ref(), Some(&default_node_name)); + assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); + assert_eq!(cmds[1].from, HostnamePort::new("127.0.0.1", 6061)); + assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs index d73d71e1e61..31e47b88c2f 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/influxdb_outlets.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::run::parser::building_blocks::{ArgsToCommands, ResourceNameOrMap}; -use crate::influxdb::outlet::create::InfluxDBCreateCommand; +use crate::influxdb::outlet::create::CreateCommand; use crate::run::parser::resource::utils::parse_cmd_from_args; use crate::{influxdb::outlet, Command, OckamSubcommand}; @@ -15,30 +15,30 @@ pub struct InfluxDBOutlets { } impl InfluxDBOutlets { - fn get_subcommand(args: &[String]) -> Result { + fn get_subcommand(args: &[String]) -> Result { if let OckamSubcommand::InfluxDBOutlet(cmd) = - parse_cmd_from_args(InfluxDBCreateCommand::NAME, args)? + parse_cmd_from_args(CreateCommand::NAME, args)? { let outlet::InfluxDBOutletSubCommand::Create(c) = cmd.subcommand; return Ok(c); } Err(miette!(format!( "Failed to parse {} command", - color_primary(InfluxDBCreateCommand::NAME) + color_primary(CreateCommand::NAME) ))) } pub fn into_parsed_commands( self, default_node_name: Option<&String>, - ) -> Result> { + ) -> Result> { match self.influxdb_outlets { Some(c) => { let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("from"))?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { - if cmd.tcp_outlet.at.is_none() { - cmd.tcp_outlet.at = Some(node_name.to_string()) + if cmd.at.is_none() { + cmd.at = Some(node_name.to_string()) } } } @@ -70,6 +70,6 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 1); - assert_eq!(cmds[0].tcp_outlet.to, HostnamePort::new("127.0.0.1", 6060)); + assert_eq!(cmds[0].to, HostnamePort::new("127.0.0.1", 6060)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs index 4570d34c674..4e9af66f0fb 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_inlet.rs @@ -33,7 +33,7 @@ impl KafkaInlet { ) -> Result> { match self.kafka_inlet { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("addr"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { if cmd.node_opts.at_node.is_none() { @@ -88,7 +88,7 @@ mod tests { &MultiAddr::from_string("/ip4/192.168.1.2/tcp/4000").unwrap(), ); assert_eq!(cmds[0].node_opts.at_node, Some("node_name".to_string())); - assert!(!cmds[0].avoid_publishing); + assert!(!cmds[0].no_publishing); assert_eq!( cmds[0].encrypted_fields, @@ -106,12 +106,12 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 1); - assert_eq!(cmds[0].addr, "ki"); + assert_eq!(cmds[0].name, "ki"); assert_eq!( cmds[0].consumer.as_ref().unwrap(), &MultiAddr::from_string("/dnsaddr/kafka-outlet.local/tcp/5000").unwrap(), ); - assert!(cmds[0].avoid_publishing); + assert!(cmds[0].no_publishing); assert_eq!(cmds[0].node_opts.at_node, Some(default_node_name.clone())); let list = r#" @@ -128,7 +128,7 @@ mod tests { cmds[0].consumer.as_ref().unwrap(), &MultiAddr::from_string("/dnsaddr/kafka-outlet.local/tcp/5000").unwrap(), ); - assert!(cmds[0].avoid_publishing); + assert!(cmds[0].no_publishing); assert_eq!(cmds[0].node_opts.at_node, Some(default_node_name)); } } diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs index 113fe30b3ee..95387c20a9f 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/kafka_outlet.rs @@ -35,7 +35,7 @@ impl KafkaOutlet { ) -> Result> { match self.kafka_outlet { Some(c) => { - let mut cmds = c.into_commands_with_name_arg(Self::get_subcommand, Some("addr"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name { for cmd in cmds.iter_mut() { if cmd.node_opts.at_node.is_none() { @@ -84,7 +84,7 @@ mod tests { let cmds = parsed .into_parsed_commands(Some(&default_node_name)) .unwrap(); - assert_eq!(cmds[0].addr, "ko".to_string()); + assert_eq!(cmds[0].name, "ko".to_string()); assert_eq!(cmds[0].node_opts.at_node.as_ref(), Some(&default_node_name)); // check if the default node name is used when the configuration does not specify it diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs index bb1b184a03e..7ae29677e26 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/tcp_inlets.rs @@ -33,8 +33,7 @@ impl TcpInlets { ) -> Result> { match self.tcp_inlets { Some(c) => { - let mut cmds = - c.into_commands_with_name_arg(Self::get_subcommand, Some("alias"))?; + let mut cmds = c.into_commands(Self::get_subcommand)?; if let Some(node_name) = default_node_name.as_ref() { for cmd in cmds.iter_mut() { if cmd.at.is_none() { @@ -63,7 +62,6 @@ mod tests { at: n ti2: from: '6061' - alias: my_inlet "#; let parsed: TcpInlets = serde_yaml::from_str(named).unwrap(); let default_node_name = "n1".to_string(); @@ -71,10 +69,10 @@ mod tests { .into_parsed_commands(Some(&default_node_name)) .unwrap(); assert_eq!(cmds.len(), 2); - assert_eq!(cmds[0].alias.as_ref().unwrap(), "ti1"); + assert_eq!(cmds[0].name.as_ref().unwrap(), "ti1"); assert_eq!(cmds[0].from, HostnamePort::new("127.0.0.1", 6060)); assert_eq!(cmds[0].at.as_ref().unwrap(), "n"); - assert_eq!(cmds[1].alias.as_ref().unwrap(), "my_inlet"); + assert_eq!(cmds[1].name.as_ref().unwrap(), "ti2"); assert_eq!(cmds[1].from, HostnamePort::new("127.0.0.1", 6061)); assert_eq!(cmds[1].at.as_ref(), Some(&default_node_name)); diff --git a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs index 83175753530..8a81d0e9fd4 100644 --- a/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs +++ b/implementations/rust/ockam/ockam_command/src/sidecar/secure_relay_inlet.rs @@ -1,17 +1,15 @@ +use crate::run::Config; +use crate::tcp::inlet::create::tcp_inlet_default_from_addr; +use crate::util::async_cmd; +use crate::util::parsers::hostname_parser; +use crate::{docs, CommandGlobalOpts}; use clap::Args; use colorful::Colorful; use indoc::formatdoc; use ockam::transport::HostnamePort; use ockam_api::fmt_info; - -use crate::{docs, CommandGlobalOpts}; use ockam_node::Context; -use crate::run::Config; -use crate::tcp::inlet::create::default_from_addr; -use crate::util::async_cmd; -use crate::util::parsers::hostname_parser; - const LONG_ABOUT: &str = include_str!("./static/secure_relay_inlet/long_about.txt"); const AFTER_LONG_HELP: &str = include_str!("./static/secure_relay_inlet/after_long_help.txt"); @@ -27,7 +25,7 @@ pub struct SecureRelayInlet { pub service_name: String, /// Address on which to accept tcp connections. - #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", default_value_t = default_from_addr(), value_parser = hostname_parser)] + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] from: HostnamePort, /// Just print the recipe and exit diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index 35fd93087ab..47d389d7404 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -35,7 +35,10 @@ use ockam_node::compat::asynchronous::resolve_peer; use crate::util::parsers::duration_parser; use crate::util::parsers::hostname_parser; -use crate::util::{find_available_port, port_is_free_guard, process_nodes_multiaddr}; +use crate::util::{ + find_available_port, port_is_free_guard, print_warning_for_deprecated_flag_replaced, + process_nodes_multiaddr, +}; const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt"); @@ -43,12 +46,16 @@ const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt" #[derive(Clone, Debug, Args)] #[command(after_long_help = docs::after_help(AFTER_LONG_HELP))] pub struct CreateCommand { + /// Assign a name to this TCP Inlet + #[arg(id = "NAME", value_parser = alias_parser)] + pub name: Option, + /// Node on which to start the TCP Inlet. #[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)] pub at: Option, - /// Address on which to accept TCP connections. - #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = default_from_addr(), value_parser = hostname_parser)] + /// Address on which to accept TCP connections. If not set, a random port will be used. + #[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)] pub from: HostnamePort, /// Route to a TCP Outlet or the name of the TCP Outlet service you want to connect to. @@ -60,7 +67,7 @@ pub struct CreateCommand { /// or just the name of the service as `outlet` or `/service/outlet`. /// If you are passing just the service name, consider using `--via` to specify the /// relay name (e.g. `ockam tcp-inlet create --to outlet --via myrelay`). - #[arg(long, display_order = 900, id = "ROUTE", default_value_t = default_to_addr())] + #[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())] pub to: String, /// Name of the relay that this TCP Inlet will use to connect to the TCP Outlet. @@ -79,7 +86,7 @@ pub struct CreateCommand { #[arg(long, name = "AUTHORIZED", display_order = 900)] pub authorized: Option, - /// Assign a name to this TCP Inlet. + /// [DEPRECATED] Use the positional argument instead #[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)] pub alias: Option, @@ -88,7 +95,6 @@ pub struct CreateCommand { /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-inlet`. #[arg( - hide = true, long, visible_alias = "expression", display_order = 900, @@ -150,12 +156,12 @@ pub struct CreateCommand { pub tls_certificate_provider: Option, } -pub(crate) fn default_from_addr() -> HostnamePort { +pub(crate) fn tcp_inlet_default_from_addr() -> HostnamePort { let port = find_available_port().expect("Failed to find available port"); HostnamePort::new("127.0.0.1", port) } -fn default_to_addr() -> String { +pub(crate) fn tcp_inlet_default_to_addr() -> String { "/project//service/forward_to_/secure/api/service/".to_string() } @@ -193,7 +199,7 @@ impl Command for CreateCommand { ctx, &cmd.from, &cmd.to(), - cmd.alias.as_ref().expect("The `alias` argument should be set to its default value if not provided"), + cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"), &cmd.authorized, &cmd.allow, cmd.connection_wait, @@ -318,17 +324,35 @@ impl CreateCommand { } pub async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result { - self.alias = self.alias.or_else(|| Some(random_name())); + if let Some(alias) = self.alias.as_ref() { + print_warning_for_deprecated_flag_replaced( + opts, + "alias", + "the positional argument", + )?; + if self.name.is_none() { + self.name = Some(alias.clone()); + } else { + return Err(miette!( + "--alias is deprecated and can't be used together with the positional argument", + )); + } + } else { + self.name = self.name.or_else(|| Some(random_name())); + } + let from = resolve_peer(self.from.to_string()) .await .into_diagnostic()?; port_is_free_guard(&from)?; + self.to = Self::parse_arg_to(&opts.state, self.to, self.via.as_ref()).await?; if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() { return Err(miette!( "--authorized can not be used with project addresses" ))?; } + self.tls_certificate_provider = if let Some(tls_certificate_provider) = &self.tls_certificate_provider { @@ -338,16 +362,17 @@ impl CreateCommand { } else { None }; + Ok(self) } - async fn parse_arg_to( + pub(crate) async fn parse_arg_to( state: &CliState, to: impl Into, via: Option<&String>, ) -> miette::Result { let mut to = to.into(); - let to_is_default = to == default_to_addr(); + let to_is_default = to == tcp_inlet_default_to_addr(); let mut service_name = "outlet".to_string(); let relay_name = via.cloned().unwrap_or("default".to_string()); @@ -381,7 +406,7 @@ impl CreateCommand { // "to" refers to the service name service_name = to.to_string(); // and we set "to" to the default route, so we can do the replacements later - to = default_to_addr(); + to = tcp_inlet_default_to_addr(); } } @@ -456,7 +481,7 @@ mod tests { } // "to" default value - let res = CreateCommand::parse_arg_to(&state, default_to_addr(), None) + let res = CreateCommand::parse_arg_to(&state, tcp_inlet_default_to_addr(), None) .await .unwrap(); assert_eq!( @@ -488,7 +513,7 @@ mod tests { // "via" argument is used to replace the relay name let cases = [ ( - default_to_addr(), + tcp_inlet_default_to_addr(), "myrelay", "/project/p1/service/forward_to_myrelay/secure/api/service/outlet", ), diff --git a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs index 68bf36aa713..8ebca6989fe 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs @@ -60,7 +60,6 @@ pub struct CreateCommand { /// /// You can check the fallback policy with `ockam policy show --resource-type tcp-outlet`. #[arg( - hide = true, long, visible_alias = "expression", display_order = 904, diff --git a/implementations/rust/ockam/ockam_command/src/util/mod.rs b/implementations/rust/ockam/ockam_command/src/util/mod.rs index b05d5f9d147..10999478ff5 100644 --- a/implementations/rust/ockam/ockam_command/src/util/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/util/mod.rs @@ -209,7 +209,11 @@ pub fn port_is_free_guard(address: &SocketAddr) -> Result<()> { Ok(()) } -pub fn print_deprecated_warning(opts: &CommandGlobalOpts, old: &str, new: &str) -> Result<()> { +pub fn print_warning_for_deprecated_flag_replaced( + opts: &CommandGlobalOpts, + old: &str, + new: &str, +) -> Result<()> { opts.terminal.write_line(fmt_warn!( "{} is deprecated. Please use {} instead", color_primary(old), @@ -218,7 +222,10 @@ pub fn print_deprecated_warning(opts: &CommandGlobalOpts, old: &str, new: &str) Ok(()) } -pub fn print_deprecated_flag_warning(opts: &CommandGlobalOpts, deprecated: &str) -> Result<()> { +pub fn print_warning_for_deprecated_flag_no_effect( + opts: &CommandGlobalOpts, + deprecated: &str, +) -> Result<()> { opts.terminal.write_line(fmt_warn!( "{} is deprecated. This flag has no effect", color_primary(deprecated), diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats index 788ca45f8c5..2f94e87a0e7 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/kafka.bats @@ -48,7 +48,7 @@ teardown() { run_failure $OCKAM kafka-inlet create --to /secure/api --from $(random_port) # Create a second inlet port="$(random_port)" - run_success $OCKAM kafka-inlet create --to /secure/api --from $port --addr inlet2 --jq '.' + run_success $OCKAM kafka-inlet create inlet2 --to /secure/api --from $port --jq '.' assert_output --partial "\"from\": \"127.0.0.1:$port\"" assert_output --partial "\"to\": \"/secure/api\"" @@ -75,3 +75,17 @@ teardown() { run_success $OCKAM kafka-inlet list --jq '.[].addr' assert_output --partial "inlet2" } + +@test "kafka - create with deprecated alias --addr flag" { + port="$(random_port)" + + # Fail if both addr and name are used + run_failure $OCKAM kafka-inlet create kinlet-name --addr kinlet --to /secure/api --from $port --jq '.' + + run_success $OCKAM kafka-inlet create --addr kinlet --to /secure/api --from $port --jq '.' + assert_output --partial "\"from\": \"127.0.0.1:$port\"" + assert_output --partial "\"to\": \"/secure/api\"" + + run_success $OCKAM kafka-inlet show kinlet --jq '.' + assert_output --partial "kinlet" +} diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats index 07679e386d7..4356e110437 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats @@ -50,7 +50,7 @@ teardown() { assert_output --partial "/service/outlet" inlet_port="$(random_port)" - run_success $OCKAM tcp-inlet create --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet --alias "test-inlet" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet run_success $OCKAM tcp-inlet create --at /node/n2 --from 6102 --to /node/n1/service/outlet sleep 1 @@ -94,6 +94,19 @@ teardown() { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + port="$(random_port)" + run_success $OCKAM tcp-inlet create tcp-inlet-2 --at /node/n2 --from $port --to /node/n1/service/outlet + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - list inlets on a node, using deprecated --alias flag" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + port="$(random_port)" run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 sleep 1 @@ -120,7 +133,7 @@ teardown() { run_success "$OCKAM" node create n2 port="$(random_port)" - run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias "test-inlet" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from $port --to /node/n1/service/outlet sleep 1 run_success $OCKAM tcp-inlet show "test-inlet" --at /node/n2 @@ -231,9 +244,9 @@ teardown() { run_success "$OCKAM" tcp-outlet create --at n --to "$port" port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at n --from "$port" --to "/node/n/service/outlet" --alias i + run_success "$OCKAM" tcp-inlet create i --at n --from "$port" --to "/node/n/service/outlet" port="$(random_port)" - run_failure "$OCKAM" tcp-inlet create --at n --from "$port" --to "/node/n/service/outlet" --alias i + run_failure "$OCKAM" tcp-inlet create i --at n --from "$port" --to "/node/n/service/outlet" } @test "portals - fail to create two TCP inlets at the same socket address" {