diff --git a/Cargo.lock b/Cargo.lock index eb6c8d8..7aef720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -525,6 +525,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -532,6 +547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -540,6 +556,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -558,10 +602,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1392,13 +1442,11 @@ name = "s2" version = "0.1.0" dependencies = [ "backon", + "futures", "http", "prost", "prost-types", "secrecy", - "serde", - "serde_derive", - "serde_json", "thiserror", "tonic", "tonic-build", diff --git a/src/account.rs b/src/account.rs index 2b30b60..7078f01 100644 --- a/src/account.rs +++ b/src/account.rs @@ -5,8 +5,7 @@ use s2::{ ServiceError, }, types::{ - BasinConfig, CreateBasinResponse, GetBasinConfigResponse, ListBasinsResponse, - RetentionPolicy, StorageClass, StreamConfig, + BasinConfig, BasinMetadata, ListBasinsResponse, RetentionPolicy, StorageClass, StreamConfig, }, }; @@ -60,7 +59,7 @@ impl AccountService { name: String, storage_class: Option, retention_policy: Option, - ) -> Result { + ) -> Result { let basin_config = match (&storage_class, retention_policy) { (Some(storage_class), Some(retention_policy)) => { let stream_config = StreamConfig::builder() @@ -98,9 +97,8 @@ impl AccountService { let get_basin_config_req = s2::types::GetBasinConfigRequest::builder() .basin(name) .build(); - let GetBasinConfigResponse { config } = - self.client.get_basin_config(get_basin_config_req).await?; - Ok(config) + + Ok(self.client.get_basin_config(get_basin_config_req).await?) } pub async fn reconfigure_basin( diff --git a/src/basin.rs b/src/basin.rs index bd49550..c9c1fd5 100644 --- a/src/basin.rs +++ b/src/basin.rs @@ -4,7 +4,7 @@ use s2::{ CreateStreamError, DeleteStreamError, GetStreamConfigError, ListStreamsError, ReconfigureStreamError, ServiceError, }, - types::{GetStreamConfigResponse, ListStreamsResponse, StreamConfig}, + types::{ListStreamsResponse, StreamConfig}, }; pub struct BasinService { @@ -85,9 +85,7 @@ impl BasinService { .stream(stream) .build(); - let GetStreamConfigResponse { config } = - self.client.get_stream_config(get_stream_config_req).await?; - Ok(config) + Ok(self.client.get_stream_config(get_stream_config_req).await?) } pub async fn reconfigure_stream( diff --git a/src/error.rs b/src/error.rs index 184866e..33fd449 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,28 +1,21 @@ -use std::sync::OnceLock; - -use colored::*; use miette::Diagnostic; use s2::client::ClientError; use thiserror::Error; -use crate::{account::AccountServiceError, basin::BasinServiceError, config::S2ConfigError}; - -static HELP: OnceLock = OnceLock::new(); - -fn get_help() -> &'static str { - HELP.get_or_init(|| { - format!( - "\n{}\n\n ► {}\n{}\n\n ► {}\n{}\n\n ► {}\n{}", - "Notice something wrong?".cyan().bold(), - "Open an issue:".green(), - "https://github.com/s2-cli/issues".bold(), - "Reach out to us:".green(), - "hi@s2.dev".bold(), - "Join our community:".green(), - "Discord: https://discord.gg/s2".bold(), - ) - }) -} +use crate::{ + account::AccountServiceError, basin::BasinServiceError, config::S2ConfigError, + stream::StreamServiceError, +}; + +const HELP: &str = color_print::cstr!( + "\nNotice something wrong?\n\n\ + ► Open an issue:\n\ + https://github.com/s2-cli/issues\n\n\ + ► Reach out to us:\n\ + hi@s2.dev\n\n\ + ► Join our community:\n\ + Discord: https://discord.gg/s2" +); #[derive(Error, Debug, Diagnostic)] pub enum S2CliError { @@ -35,17 +28,17 @@ pub enum S2CliError { Connection(#[from] ClientError), #[error(transparent)] - #[diagnostic(help("{}", get_help()))] + #[diagnostic(help("{}", HELP))] AccountService(#[from] AccountServiceError), #[error(transparent)] - #[diagnostic(help("{}", get_help()))] + #[diagnostic(help("{}", HELP))] BasinService(#[from] BasinServiceError), #[error(transparent)] - InvalidConfig(#[from] serde_json::Error), + #[diagnostic(help("{}", HELP))] + StreamService(#[from] StreamServiceError), - #[error("Failed to interact for confirmation!")] - #[diagnostic(help("{}", get_help()))] - ConfirmationError(#[from] dialoguer::Error), + #[error(transparent)] + InvalidConfig(#[from] serde_json::Error), } diff --git a/src/main.rs b/src/main.rs index 8415531..6024f30 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,16 +5,18 @@ use colored::*; use config::{config_path, create_config}; use error::S2CliError; use s2::{ - client::{Client, ClientConfig, HostCloud}, + client::{BasinClient, Client, ClientConfig, HostCloud, StreamClient}, types::{BasinMetadata, StorageClass}, }; +use stream::StreamService; use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt}; -use types::{BasinConfig, StreamConfig}; +use types::{BasinConfig, StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH}; mod account; mod basin; mod config; mod error; +mod stream; mod types; const STYLES: styling::Styles = styling::Styles::styled() @@ -23,7 +25,7 @@ const STYLES: styling::Styles = styling::Styles::styled() .literal(styling::AnsiColor::Blue.on_default().bold()) .placeholder(styling::AnsiColor::Cyan.on_default()); -const USAGE: &str = color_print::cstr!( +const GENERAL_USAGE: &str = color_print::cstr!( r#" $ s2-cli config set --token ... $ s2-cli account list-basins --prefix "bar" --start-after "foo" --limit 100 @@ -31,7 +33,7 @@ const USAGE: &str = color_print::cstr!( ); #[derive(Parser, Debug)] -#[command(version, about, override_usage = USAGE, styles = STYLES)] +#[command(version, about, override_usage = GENERAL_USAGE, styles = STYLES)] struct Cli { #[command(subcommand)] command: Commands, @@ -52,10 +54,16 @@ enum Commands { }, /// Manage s2 basins - Basins { + Basin { #[command(subcommand)] action: BasinActions, }, + + /// Manage s2 streams + Stream { + #[command(subcommand)] + action: StreamActions, + }, } #[derive(Subcommand, Debug)] @@ -187,14 +195,26 @@ enum BasinActions { }, } -async fn s2_client(auth_token: String) -> Result { - let config = ClientConfig::builder() +#[derive(Subcommand, Debug)] +enum StreamActions { + /// Get the next sequence number that will be assigned by a stream. + GetNextSeqNum { + /// Name of the basin to get the next sequence number from. + basin: String, + + /// Name of the stream to get the next sequence number for. + stream: String, + }, + + +} + +fn s2_config(auth_token: String) -> ClientConfig { + ClientConfig::builder() .host_uri(HostCloud::Local) .token(auth_token.to_string()) .connection_timeout(std::time::Duration::from_secs(5)) - .build(); - - Ok(Client::connect(config).await?) + .build() } #[tokio::main] @@ -232,7 +252,8 @@ async fn run() -> Result<(), S2CliError> { Commands::Account { action } => { let cfg = config::load_config(&config_path)?; - let account_service = AccountService::new(s2_client(cfg.auth_token).await?); + let account_service = + AccountService::new(Client::connect(s2_config(cfg.auth_token)).await?); match action { AccountActions::ListBasins { prefix, @@ -270,6 +291,7 @@ async fn run() -> Result<(), S2CliError> { println!("{}", "✓ Basin created successfully".green().bold()); } + AccountActions::DeleteBasin { basin } => { account_service.delete_basin(basin).await?; println!("{}", "✓ Basin deleted successfully".green().bold()); @@ -277,24 +299,20 @@ async fn run() -> Result<(), S2CliError> { AccountActions::GetBasinConfig { basin } => { let basin_config = account_service.get_basin_config(basin).await?; - println!("{:?}", basin_config); + let basin_config: BasinConfig = basin_config.into(); + println!("{:?}", serde_json::to_string_pretty(&basin_config)?); } + AccountActions::ReconfigureBasin { basin, config } => { let mut mask = Vec::new(); match &config.default_stream_config { Some(config) => { - match config.storage_class { - Some(_) => { - mask.push("default_stream_config.storage_class".to_string()); - } - None => {} + if config.storage_class.is_some() { + mask.push(STORAGE_CLASS_PATH.to_string()); } - match config.retention_policy { - Some(_) => { - mask.push("default_stream_config.retention_policy".to_string()); - } - None => {} + if config.retention_policy.is_some() { + mask.push(RETENTION_POLICY_PATH.to_string()); } } None => {} @@ -306,9 +324,10 @@ async fn run() -> Result<(), S2CliError> { } } } - Commands::Basins { action } => { + + Commands::Basin { action } => { let cfg = config::load_config(&config_path)?; - let client = s2_client(cfg.auth_token).await?; + let basin_config = s2_config(cfg.auth_token); match action { BasinActions::ListStreams { basin, @@ -316,7 +335,7 @@ async fn run() -> Result<(), S2CliError> { start_after, limit, } => { - let basin_client = client.basin_client(basin).await?; + let basin_client = BasinClient::connect(basin_config, basin).await?; let streams = BasinService::new(basin_client) .list_streams(prefix, start_after, limit as usize) .await?; @@ -324,50 +343,50 @@ async fn run() -> Result<(), S2CliError> { println!("{}", stream); } } + BasinActions::CreateStream { basin, stream, config, } => { - let basin_client = client.basin_client(basin).await?; + let basin_client = BasinClient::connect(basin_config, basin).await?; BasinService::new(basin_client) .create_stream(stream, config.map(Into::into)) .await?; println!("{}", "✓ Stream created successfully".green().bold()); } + BasinActions::DeleteStream { basin, stream } => { - let basin_client = client.basin_client(basin).await?; + let basin_client = BasinClient::connect(basin_config, basin).await?; BasinService::new(basin_client) .delete_stream(stream) .await?; println!("{}", "✓ Stream deleted successfully".green().bold()); } + BasinActions::GetStreamConfig { basin, stream } => { - let basin_client = client.basin_client(basin).await?; + let basin_client = BasinClient::connect(basin_config, basin).await?; let config = BasinService::new(basin_client) .get_stream_config(stream) .await?; + let config: StreamConfig = config.into(); println!("{:?}", serde_json::to_string_pretty(&config)?); } + BasinActions::ReconfigureStream { basin, stream, config, } => { - let basin_client = client.basin_client(basin).await?; + let basin_client = BasinClient::connect(basin_config, basin).await?; let mut mask = Vec::new(); - match config.storage_class { - Some(_) => { - mask.push("storage_class".to_string()); - } - None => {} + + if config.storage_class.is_some() { + mask.push("storage_class".to_string()); }; - match config.retention_policy { - Some(_) => { - mask.push("retention_policy".to_string()); - } - None => {} + if config.retention_policy.is_some() { + mask.push("retention_policy".to_string()); }; BasinService::new(basin_client) @@ -378,6 +397,17 @@ async fn run() -> Result<(), S2CliError> { } } } + Commands::Stream { action } => { + let cfg = config::load_config(&config_path)?; + let s2_config = s2_config(cfg.auth_token); + match action { + StreamActions::GetNextSeqNum { basin, stream } => { + let stream_client = StreamClient::connect(s2_config, basin, stream).await?; + let seq_num = StreamService::new(stream_client).get_next_seq_num().await?; + println!("{}", seq_num); + } + } + } } Ok(()) diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..4a20c0f --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,24 @@ +use s2::{ + client::StreamClient, + service_error::{GetNextSeqNumError, ServiceError}, +}; + +pub struct StreamService { + client: StreamClient, +} + +#[derive(Debug, thiserror::Error)] +pub enum StreamServiceError { + #[error("Failed to get next sequence number")] + GetNextSeqNumError(#[from] ServiceError), +} + +impl StreamService { + pub fn new(client: StreamClient) -> Self { + Self { client } + } + + pub async fn get_next_seq_num(&self) -> Result { + Ok(self.client.get_next_seq_num().await?) + } +} diff --git a/src/types.rs b/src/types.rs index 5a8ce45..2929fd0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,13 +1,19 @@ +//! Types for Basin configuration that directly map to s2::types. + use clap::{Parser, ValueEnum}; +use serde::Serialize; use std::time::Duration; -#[derive(Parser, Debug, Clone)] +pub const STORAGE_CLASS_PATH: &str = "default_stream_config.storage_class"; +pub const RETENTION_POLICY_PATH: &str = "default_stream_config.retention_policy"; + +#[derive(Parser, Debug, Clone, Serialize)] pub struct BasinConfig { #[clap(flatten)] pub default_stream_config: Option, } -#[derive(Parser, Debug, Clone)] +#[derive(Parser, Debug, Clone, Serialize)] pub struct StreamConfig { #[arg(short, long)] /// Storage class for a stream. @@ -17,14 +23,14 @@ pub struct StreamConfig { pub retention_policy: Option, } -#[derive(ValueEnum, Debug, Clone)] +#[derive(ValueEnum, Debug, Clone, Serialize)] pub enum StorageClass { Unspecified, Standard, Express, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub enum RetentionPolicy { #[allow(dead_code)] Age(Duration), @@ -72,6 +78,16 @@ impl From for s2::types::StorageClass { } } +impl From for StorageClass { + fn from(class: s2::types::StorageClass) -> Self { + match class { + s2::types::StorageClass::Unspecified => StorageClass::Unspecified, + s2::types::StorageClass::Standard => StorageClass::Standard, + s2::types::StorageClass::Express => StorageClass::Express, + } + } +} + impl From for s2::types::RetentionPolicy { fn from(policy: RetentionPolicy) -> Self { match policy { @@ -79,3 +95,29 @@ impl From for s2::types::RetentionPolicy { } } } + +impl From for RetentionPolicy { + fn from(policy: s2::types::RetentionPolicy) -> Self { + match policy { + s2::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d), + } + } +} + +impl From for BasinConfig { + fn from(config: s2::types::BasinConfig) -> Self { + let default_stream_config = config.default_stream_config.map(|c| c.into()); + BasinConfig { + default_stream_config, + } + } +} + +impl From for StreamConfig { + fn from(config: s2::types::StreamConfig) -> Self { + StreamConfig { + storage_class: Some(config.storage_class.into()), + retention_policy: config.retention_policy.map(|r| r.into()), + } + } +}