diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 67d5e6e..f7319e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,6 +27,21 @@ jobs: - name: Failed run: exit 1 if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') || contains(needs.*.result, 'skipped') + test: + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v4 + - name: install rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + components: rustfmt, clippy + - name: install protoc + uses: arduino/setup-protoc@v3 + - uses: Swatinem/rust-cache@v2 + - name: Run cargo tests + run: cargo test lint: runs-on: ubuntu-latest steps: diff --git a/Cargo.lock b/Cargo.lock index b1df8e9..76ec283 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -726,9 +726,9 @@ checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "http" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" dependencies = [ "bytes", "fnv", @@ -1592,9 +1592,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e15080fe3386ceb57043fd17cc52ff3e3c0a0261a123aedc059c55b9d705546" +checksum = "723dafc9eaa6f187ea9f74dc6b9d20ebe4a6eb38b9de20b0ac69fdad279d1de4" dependencies = [ "async-stream", "backon", @@ -1631,6 +1631,7 @@ dependencies = [ "config", "dirs", "futures", + "http", "humantime", "miette", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 480006f..3c5549e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ colored = "2.1.0" config = "0.14.1" dirs = "5.0.1" futures = "0.3.31" +http = "1.2.0" humantime = "2.1.0" miette = { version = "7.2.0", features = ["fancy"] } pin-project-lite = "0.2" @@ -24,11 +25,10 @@ pin-utils = "0.1.0" rand = "0.8.5" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" -streamstore = "0.3.0" +streamstore = "0.3.1" thiserror = "2.0.6" tokio = { version = "1.41.1", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } toml = "0.8.19" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } - diff --git a/src/error.rs b/src/error.rs index 1e7beec..6e696a1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,9 +25,9 @@ pub enum S2CliError { #[diagnostic(transparent)] Config(#[from] S2ConfigError), - #[error(transparent)] - #[diagnostic(help("Are you trying to operate on an invalid basin?"))] - ConvertError(#[from] ConvertError), + #[error("Invalid CLI arguments: {0}")] + #[diagnostic(transparent)] + InvalidArgs(miette::Report), #[error("Unable to load S2 endpoints from environment")] #[diagnostic(help( @@ -133,3 +133,14 @@ impl ServiceError { } } } + +#[derive(Debug, Error, Diagnostic)] +pub enum BasinNameOrUriParseError { + #[error(transparent)] + #[diagnostic(help("Are you trying to operate on an invalid basin?"))] + BasinName(#[from] ConvertError), + + #[error("Invalid S2 URI: {0}")] + #[diagnostic(transparent)] + InvalidUri(miette::Report), +} diff --git a/src/main.rs b/src/main.rs index 5631709..25d1cad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,10 +18,9 @@ use streamstore::{ batching::AppendRecordsBatchingOpts, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ - AppendRecordBatch, BasinInfo, BasinName, CommandRecord, ConvertError, FencingToken, - MeteredBytes as _, ReadOutput, StreamInfo, + AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, MeteredBytes as _, + ReadOutput, StreamInfo, }, - HeaderValue, }; use tokio::{ fs::{File, OpenOptions}, @@ -36,7 +35,10 @@ use tokio_stream::{ }; use tracing::trace; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt}; -use types::{BasinConfig, StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH}; +use types::{ + BasinConfig, BasinNameAndMaybeStreamUri, BasinNameAndStreamArgs, BasinNameOnlyUri, + StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH, +}; mod account; mod basin; @@ -93,7 +95,7 @@ enum Commands { /// Create a basin. CreateBasin { /// Name of the basin to create. - basin: BasinName, + basin: BasinNameOnlyUri, #[command(flatten)] config: BasinConfig, @@ -102,19 +104,19 @@ enum Commands { /// Delete a basin. DeleteBasin { /// Name of the basin to delete. - basin: BasinName, + basin: BasinNameOnlyUri, }, /// Get basin config. GetBasinConfig { /// Basin name to get config for. - basin: BasinName, + basin: BasinNameOnlyUri, }, /// Reconfigure a basin. ReconfigureBasin { /// Name of the basin to reconfigure. - basin: BasinName, + basin: BasinNameOnlyUri, /// Configuration to apply. #[command(flatten)] @@ -122,9 +124,11 @@ enum Commands { }, /// List streams. + #[command(alias = "ls")] ListStreams { - /// Name of the basin to manage. - basin: BasinName, + /// Name of the basin to manage or S2 URI with basin and prefix. + #[arg(value_name = "BASIN|S2_URI")] + basin: BasinNameAndMaybeStreamUri, /// Filter to stream names that begin with this prefix. #[arg(short = 'p', long)] @@ -141,11 +145,8 @@ enum Commands { /// Create a stream. CreateStream { - /// Name of the basin to manage. - basin: BasinName, - - /// Name of the stream to create. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Configuration to apply. #[command(flatten)] @@ -153,30 +154,22 @@ enum Commands { }, /// Delete a stream. + #[command(alias = "rm")] DeleteStream { - /// Name of the basin to manage. - basin: BasinName, - - /// Name of the stream to delete. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, }, /// Get stream config. GetStreamConfig { - /// Name of the basin to manage. - basin: BasinName, - - /// Name of the stream to get config for. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, }, /// Reconfigure a stream. ReconfigureStream { - /// Name of the basin to manage. - basin: BasinName, - - /// Name of the stream to reconfigure. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Configuration to apply. #[command(flatten)] @@ -185,11 +178,8 @@ enum Commands { /// Get the next sequence number that will be assigned by a stream. CheckTail { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, }, /// Set the trim point for the stream. @@ -197,11 +187,8 @@ enum Commands { /// Trimming is eventually consistent, and trimmed records may be visible /// for a brief period. Trim { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Earliest sequence number that should be retained. /// This sequence number is only allowed to advance, @@ -225,11 +212,8 @@ enum Commands { /// Note that fencing is a cooperative mechanism, /// and it is only enforced when a token is provided. Fence { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// New fencing token specified in hex. /// It may be upto 16 bytes, and can be empty. @@ -249,11 +233,8 @@ enum Commands { /// /// Currently, only newline delimited records are supported. Append { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Enforce fencing token specified in hex. #[arg(short = 'f', long, value_parser = parse_fencing_token)] @@ -275,11 +256,8 @@ enum Commands { /// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream. /// If a limit is not specified, the reader will keep tailing and wait for new records. Read { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Starting sequence number (inclusive). #[arg(short = 's', long, default_value_t = 0)] @@ -301,11 +279,8 @@ enum Commands { /// Ping the stream to get append acknowledgement and end-to-end latencies. Ping { - /// Name of the basin. - basin: BasinName, - - /// Name of the stream. - stream: String, + #[command(flatten)] + args: BasinNameAndStreamArgs, /// Send a batch after this interval. /// @@ -425,7 +400,7 @@ fn parse_fencing_token(s: &str) -> Result { fn client_config(auth_token: String) -> Result { let endpoints = S2Endpoints::from_env().map_err(S2CliError::EndpointsFromEnv)?; let client_config = ClientConfig::new(auth_token.to_string()) - .with_user_agent("s2-cli".parse::().expect("valid user agent")) + .with_user_agent("s2-cli".parse().expect("valid user agent")) .with_endpoints(endpoints) .with_request_timeout(Duration::from_secs(30)); Ok(client_config) @@ -506,7 +481,7 @@ async fn run() -> Result<(), S2CliError> { None => (None, None), }; account_service - .create_basin(basin, storage_class, retention_policy) + .create_basin(basin.into(), storage_class, retention_policy) .await?; eprintln!("{}", "✓ Basin created".green().bold()); @@ -516,7 +491,7 @@ async fn run() -> Result<(), S2CliError> { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let account_service = AccountService::new(Client::new(client_config)); - account_service.delete_basin(basin).await?; + account_service.delete_basin(basin.into()).await?; eprintln!("{}", "✓ Basin deletion requested".green().bold()); } @@ -524,7 +499,7 @@ async fn run() -> Result<(), S2CliError> { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let account_service = AccountService::new(Client::new(client_config)); - let basin_config = account_service.get_basin_config(basin).await?; + let basin_config = account_service.get_basin_config(basin.into()).await?; let basin_config: BasinConfig = basin_config.into(); println!("{}", serde_json::to_string_pretty(&basin_config)?); } @@ -544,7 +519,7 @@ async fn run() -> Result<(), S2CliError> { } account_service - .reconfigure_basin(basin, config.into(), mask) + .reconfigure_basin(basin.into(), config.into(), mask) .await?; } @@ -554,6 +529,20 @@ async fn run() -> Result<(), S2CliError> { start_after, limit, } => { + let BasinNameAndMaybeStreamUri { + basin, + stream: maybe_prefix, + } = basin; + let prefix = match (maybe_prefix, prefix) { + (Some(_), Some(_)) => { + return Err(S2CliError::InvalidArgs(miette::miette!( + help = "Make sure to provide the prefix once either using '--prefix' opt or in URI like 's2://basin-name/prefix'", + "Multiple prefixes provided" + ))); + } + (Some(s), None) | (None, Some(s)) => Some(s), + (None, None) => None, + }; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -585,11 +574,8 @@ async fn run() -> Result<(), S2CliError> { } } - Commands::CreateStream { - basin, - stream, - config, - } => { + Commands::CreateStream { args, config } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -599,7 +585,8 @@ async fn run() -> Result<(), S2CliError> { eprintln!("{}", "✓ Stream created".green().bold()); } - Commands::DeleteStream { basin, stream } => { + Commands::DeleteStream { args } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -609,7 +596,8 @@ async fn run() -> Result<(), S2CliError> { eprintln!("{}", "✓ Stream deletion requested".green().bold()); } - Commands::GetStreamConfig { basin, stream } => { + Commands::GetStreamConfig { args } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -620,11 +608,8 @@ async fn run() -> Result<(), S2CliError> { println!("{}", serde_json::to_string_pretty(&config)?); } - Commands::ReconfigureStream { - basin, - stream, - config, - } => { + Commands::ReconfigureStream { args, config } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let basin_client = BasinClient::new(client_config, basin); @@ -647,7 +632,8 @@ async fn run() -> Result<(), S2CliError> { println!("{}", serde_json::to_string_pretty(&config)?); } - Commands::CheckTail { basin, stream } => { + Commands::CheckTail { args } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -656,12 +642,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Trim { - basin, - stream, + args, trim_point, fencing_token, match_seq_num, } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -681,12 +667,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Fence { - basin, - stream, + args, new_fencing_token, fencing_token, match_seq_num, } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -706,12 +692,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Append { - basin, - stream, + args, input, fencing_token, match_seq_num, } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -769,13 +755,13 @@ async fn run() -> Result<(), S2CliError> { } Commands::Read { - basin, - stream, + args, start_seq_num, output, limit_count, limit_bytes, } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); @@ -895,12 +881,12 @@ async fn run() -> Result<(), S2CliError> { } Commands::Ping { - basin, - stream, + args, interval, batch_bytes, num_batches, } => { + let (basin, stream) = args.try_into_parts()?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream)); diff --git a/src/types.rs b/src/types.rs index 03159e7..7c4c650 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,11 +2,131 @@ use clap::{Parser, ValueEnum}; use serde::Serialize; -use std::time::Duration; +use std::{str::FromStr, time::Duration}; +use streamstore::types::BasinName; + +use crate::error::{BasinNameOrUriParseError, S2CliError}; pub const STORAGE_CLASS_PATH: &str = "default_stream_config.storage_class"; pub const RETENTION_POLICY_PATH: &str = "default_stream_config.retention_policy"; +#[derive(Debug, Clone)] +pub struct BasinNameOrUri { + pub basin: BasinName, + pub stream: S, +} + +impl From> for BasinName { + fn from(value: BasinNameOrUri) -> Self { + value.basin + } +} + +fn parse_maybe_basin_or_uri( + s: &str, +) -> Result<(BasinName, Option), BasinNameOrUriParseError> { + match BasinName::from_str(s) { + Ok(basin) => { + // Definitely a basin name since a valid basin name cannot have `:` + // which is required for the URI. + Ok((basin, None)) + } + Err(parse_basin_err) => { + // Should definitely be a URI else error. + let uri = http::Uri::from_str(s).map_err(|_| parse_basin_err)?; + + match uri.scheme_str() { + Some("s2") => (), + Some(other) => { + return Err(BasinNameOrUriParseError::InvalidUri(miette::miette!( + help = "Does the URI start with 's2://'?", + "Unsupported URI scheme '{}'", + other + ))); + } + None => { + return Err(BasinNameOrUriParseError::InvalidUri(miette::miette!( + help = "Make sure the URI starts with 's2://'", + "Missing URI scheme" + ))) + } + }; + + let basin = uri.host().ok_or_else(|| { + BasinNameOrUriParseError::InvalidUri(miette::miette!( + help = "Is there an extra '/' after 's2://'?", + "Missing basin name (URI host)" + )) + })?; + let basin = BasinName::from_str(basin)?; + + let stream = uri.path().trim_start_matches('/'); + let stream = if stream.is_empty() { + None + } else { + Some(stream.to_string()) + }; + + Ok((basin, stream)) + } + } +} + +pub type BasinNameOnlyUri = BasinNameOrUri<()>; + +impl FromStr for BasinNameOnlyUri { + type Err = BasinNameOrUriParseError; + + fn from_str(s: &str) -> Result { + let (basin, stream) = parse_maybe_basin_or_uri(s)?; + if stream.is_none() { + Ok(Self { basin, stream: () }) + } else { + Err(BasinNameOrUriParseError::InvalidUri(miette::miette!( + help = "Try providing the basin name directly or URI like 's2://basin-name'", + "Must not contain stream name (URI path)" + ))) + } + } +} + +pub type BasinNameAndMaybeStreamUri = BasinNameOrUri>; + +impl FromStr for BasinNameAndMaybeStreamUri { + type Err = BasinNameOrUriParseError; + + fn from_str(s: &str) -> Result { + let (basin, stream) = parse_maybe_basin_or_uri(s)?; + Ok(Self { basin, stream }) + } +} + +#[derive(Parser, Debug, Clone)] +pub struct BasinNameAndStreamArgs { + /// Name of the basin to manage or S2 URI with basin and stream. + #[arg(value_name = "BASIN|S2_URI")] + uri: BasinNameAndMaybeStreamUri, + /// Name of the stream. + stream: Option, +} + +impl BasinNameAndStreamArgs { + pub fn try_into_parts(self) -> Result<(BasinName, String), S2CliError> { + let stream = match (self.stream, self.uri.stream) { + (Some(_), Some(_)) => return Err(S2CliError::InvalidArgs(miette::miette!( + help = "Make sure to provide the stream name once either in URI or as argument", + "Multiple stream names provided" + ))), + (None, None) => return Err(S2CliError::InvalidArgs(miette::miette!( + help = "Try providing the stream name as another argument or in URI like 's2://basin-name/stream/name'", + "Missing stream name" + ))), + (Some(s), None) | (None, Some(s)) => s, + }; + Ok((self.uri.basin, stream)) + } +} + #[derive(Parser, Debug, Clone, Serialize)] pub struct BasinConfig { #[clap(flatten)] @@ -126,3 +246,45 @@ impl From for StreamConfig { } } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use crate::types::BasinNameOnlyUri; + + use super::BasinNameAndMaybeStreamUri; + + #[test] + fn test_basin_name_or_uri_parse() { + let test_cases = vec![ + ("valid-basin", Some(("valid-basin", None))), + ("s2://valid-basin", Some(("valid-basin", None))), + ("s2://valid-basin/", Some(("valid-basin", None))), + ( + "s2://valid-basin/stream/name", + Some(("valid-basin", Some("stream/name"))), + ), + ("-invalid-basin", None), + ("http://valid-basin", None), + ("s2://-invalid-basin", None), + ("s2:///stream/name", None), + ("random:::string", None), + ]; + + for (s, expected) in test_cases { + let b = BasinNameAndMaybeStreamUri::from_str(s); + if let Some((expected_basin, expected_stream)) = expected { + let b = b.unwrap(); + assert_eq!(b.basin.as_ref(), expected_basin); + assert_eq!(b.stream.as_deref(), expected_stream); + assert_eq!( + expected_stream.is_some(), + BasinNameOnlyUri::from_str(s).is_err() + ); + } else { + assert!(b.is_err()); + } + } + } +}