Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Sep 20, 2024
1 parent 0c5d333 commit 6f8fd93
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 23 deletions.
52 changes: 37 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ s2 = { git = "ssh://[email protected]/s2-streamstore/s2.rs.git", branch = "main" }
tokio = { version = "*", features = ["full"] }
humantime = "2.1.0"
miette = { version = "7.2.0", features = ["fancy"] }
color-print = "0.3.6"
2 changes: 1 addition & 1 deletion src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl AccountService {
&self,
prefix: String,
start_after: String,
limit: u32,
limit: usize,
) -> Result<ListBasinsResponse, AccountServiceError> {
let list_basins_req = s2::types::ListBasinsRequest::builder()
.prefix(prefix)
Expand Down
39 changes: 39 additions & 0 deletions src/basin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use s2::{
client::BasinClient,
service_error::{ListStreamsError, ServiceError},
types::ListStreamsResponse,
};

pub struct BasinService {
client: BasinClient,
}

#[derive(Debug, thiserror::Error)]
pub enum BasinServiceError {
#[error("Failed to list streams: {0}")]
ListStreams(#[from] ServiceError<ListStreamsError>),
}

impl BasinService {
pub fn new(client: BasinClient) -> Self {
Self { client }
}

pub async fn list_streams(
&self,
prefix: String,
start_after: String,
limit: usize,
) -> Result<ListStreamsResponse, BasinServiceError> {
let list_streams_req = s2::types::ListStreamsRequest::builder()
.prefix(prefix)
.start_after(start_after)
.limit(limit)
.build();

self.client
.list_streams(list_streams_req)
.await
.map_err(BasinServiceError::ListStreams)
}
}
6 changes: 5 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use miette::Diagnostic;
use s2::client::ClientError;
use thiserror::Error;

use crate::{account::AccountServiceError, config::S2ConfigError};
use crate::{account::AccountServiceError, basin::BasinServiceError, config::S2ConfigError};

static HELP: OnceLock<String> = OnceLock::new();

Expand Down Expand Up @@ -37,4 +37,8 @@ pub enum S2CliError {
#[error(transparent)]
#[diagnostic(help("{}", get_help()))]
AccountService(#[from] AccountServiceError),

#[error(transparent)]
#[diagnostic(help("{}", get_help()))]
BasinService(#[from] BasinServiceError),
}
74 changes: 68 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use account::AccountService;
use clap::{Parser, Subcommand};
use basin::BasinService;
use clap::{builder::styling, Parser, Subcommand};
use colored::*;
use config::{config_path, create_config};
use error::S2CliError;
Expand All @@ -9,19 +10,33 @@ use s2::{
};

mod account;
mod basin;
mod config;
mod error;

const STYLES: styling::Styles = styling::Styles::styled()
.header(styling::AnsiColor::Green.on_default().bold())
.usage(styling::AnsiColor::Green.on_default().bold())
.literal(styling::AnsiColor::Blue.on_default().bold())
.placeholder(styling::AnsiColor::Cyan.on_default());

const USAGE: &str = color_print::cstr!(
r#"
<dim>$</dim> <bold>s2-cli config set --token ...</bold>
<dim>$</dim> <bold>s2-cli account list-basins --prefix "bar" --start-after "foo" --limit 100</bold>
"#
);

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
#[command(version, about, override_usage = USAGE, styles = STYLES)]
struct Cli {
#[command(subcommand)]
command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
/// Manage s2 configuration
/// Manage s2-cli configuration
Config {
#[command(subcommand)]
action: ConfigActions,
Expand All @@ -32,6 +47,12 @@ enum Commands {
#[command(subcommand)]
action: AccountActions,
},

/// Manage s2 basins
Basins {
#[command(subcommand)]
action: BasinActions,
},
}

#[derive(Subcommand, Debug)]
Expand All @@ -57,7 +78,7 @@ enum AccountActions {

/// Number of results, upto a maximum of 1000.
#[arg(short, long)]
limit: u32,
limit: usize,
},

/// Create a basin
Expand All @@ -66,11 +87,11 @@ enum AccountActions {
basin: String,

/// Storage class for recent writes.
#[arg(short, long, requires_all = ["retention_policy"])]
#[arg(short, long)]
storage_class: Option<StorageClass>,

/// Age threshold of oldest records in the stream, which can be automatically trimmed.
#[arg(short, long, requires_all = ["storage_class"])]
#[arg(short, long)]
retention_policy: Option<humantime::Duration>,
},

Expand All @@ -81,6 +102,27 @@ enum AccountActions {
},
}

#[derive(Subcommand, Debug)]
enum BasinActions {
/// List Streams
ListStreams {
/// Name of the basin to list streams from.
basin: String,

/// List stream names that begin with this prefix.
#[arg(short, long)]
prefix: String,

/// List stream names that lexicographically start after this name.
#[arg(short, long)]
start_after: String,

/// Number of results, upto a maximum of 1000.
#[arg(short, long)]
limit: u32,
},
}

async fn s2_client(token: String) -> Result<Client, S2CliError> {
let config = ClientConfig::builder()
.host_uri(HostCloud::Local)
Expand Down Expand Up @@ -147,6 +189,26 @@ async fn run() -> Result<(), S2CliError> {
}
}
}
Commands::Basins { action } => {
let cfg = config::load_config(&config_path)?;
let client = s2_client(cfg.token).await?;
match action {
BasinActions::ListStreams {
basin,
prefix,
start_after,
limit,
} => {
let basin_client = client.basin_client(basin).await?;
let response = BasinService::new(basin_client)
.list_streams(prefix, start_after, limit as usize)
.await?;
for stream in response.streams {
println!("{}", stream);
}
}
}
}
}

Ok(())
Expand Down

0 comments on commit 6f8fd93

Please sign in to comment.