Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Oct 2, 2024
1 parent ad02f77 commit 564520f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 270 deletions.
21 changes: 0 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 26 additions & 31 deletions src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use s2::{
ServiceError,
},
types::{
BasinConfig, BasinMetadata, ListBasinsResponse, RetentionPolicy, StorageClass, StreamConfig,
BasinConfig, BasinMetadata, CreateBasinRequest, DeleteBasinRequest, ListBasinsRequest,
ListBasinsResponse, ReconfigureBasinRequest, RetentionPolicy, StorageClass, StreamConfig,
},
};

Expand Down Expand Up @@ -42,11 +43,10 @@ impl AccountService {
start_after: String,
limit: usize,
) -> Result<ListBasinsResponse, AccountServiceError> {
let list_basins_req = s2::types::ListBasinsRequest::builder()
.prefix(prefix)
.start_after(start_after)
.limit(limit)
.build();
let list_basins_req = ListBasinsRequest::new()
.with_prefix(prefix)
.with_start_after(start_after)
.with_limit(limit);

self.client
.list_basins(list_basins_req)
Expand All @@ -56,49 +56,46 @@ impl AccountService {

pub async fn create_basin(
&self,
name: String,
basin: String,
storage_class: Option<StorageClass>,
retention_policy: Option<humantime::Duration>,
) -> Result<BasinMetadata, AccountServiceError> {
let basin_config = match (&storage_class, retention_policy) {
(Some(storage_class), Some(retention_policy)) => {
let stream_config = StreamConfig::builder()
.storage_class(*storage_class)
.retention_policy(RetentionPolicy::Age(*retention_policy))
.build();
let stream_config = StreamConfig::new()
.with_storage_class(*storage_class)
.with_retention_policy(RetentionPolicy::Age(*retention_policy));

let basin_config = BasinConfig::builder()
.default_stream_config(Some(stream_config))
.build();
let basin_config = BasinConfig::with_default_stream_config(stream_config);

Some(basin_config)
}
_ => None,
};

let create_basin_req = s2::types::CreateBasinRequest::builder()
.basin(name)
.config(basin_config)
.build();
let mut create_basin_req = CreateBasinRequest::new(basin);

if let Some(basin_config) = basin_config {
create_basin_req = create_basin_req.with_config(basin_config)
};

self.client
.create_basin(create_basin_req)
.await
.map_err(AccountServiceError::CreateBasin)
}

pub async fn delete_basin(&self, name: String) -> Result<(), AccountServiceError> {
let delete_basin_req = s2::types::DeleteBasinRequest::builder().basin(name).build();
pub async fn delete_basin(&self, basin: String) -> Result<(), AccountServiceError> {
let delete_basin_req = DeleteBasinRequest::new(basin);
self.client.delete_basin(delete_basin_req).await?;
Ok(())
}

pub async fn get_basin_config(&self, name: String) -> Result<BasinConfig, AccountServiceError> {
let get_basin_config_req = s2::types::GetBasinConfigRequest::builder()
.basin(name)
.build();

Ok(self.client.get_basin_config(get_basin_config_req).await?)
pub async fn get_basin_config(
&self,
basin: String,
) -> Result<BasinConfig, AccountServiceError> {
Ok(self.client.get_basin_config(basin).await?)
}

pub async fn reconfigure_basin(
Expand All @@ -107,11 +104,9 @@ impl AccountService {
basin_config: BasinConfig,
mask: Vec<String>,
) -> Result<(), AccountServiceError> {
let reconfigure_basin_req = s2::types::ReconfigureBasinRequest::builder()
.basin(basin)
.config(basin_config)
.mask(mask)
.build();
let reconfigure_basin_req = ReconfigureBasinRequest::new(basin)
.with_config(basin_config)
.with_mask(mask);
self.client.reconfigure_basin(reconfigure_basin_req).await?;
Ok(())
}
Expand Down
52 changes: 22 additions & 30 deletions src/basin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use s2::{
CreateStreamError, DeleteStreamError, GetStreamConfigError, ListStreamsError,
ReconfigureStreamError, ServiceError,
},
types::{ListStreamsResponse, StreamConfig},
types::{
CreateStreamRequest, DeleteStreamRequest, ListStreamsRequest, ListStreamsResponse,
ReconfigureStreamRequest, StreamConfig,
},
};

pub struct BasinService {
Expand Down Expand Up @@ -40,11 +43,10 @@ impl BasinService {
start_after: String,
limit: usize,
) -> Result<Vec<String>, BasinServiceError> {
let list_streams_req = s2::types::ListStreamsRequest::builder()
.prefix(prefix)
.start_after(start_after)
.limit(limit)
.build();
let list_streams_req = ListStreamsRequest::new()
.with_prefix(prefix)
.with_start_after(start_after)
.with_limit(limit);

let ListStreamsResponse { streams, .. } =
self.client.list_streams(list_streams_req).await?;
Expand All @@ -54,38 +56,31 @@ impl BasinService {

pub async fn create_stream(
&self,
stream_name: String,
stream: String,
config: Option<StreamConfig>,
) -> Result<(), BasinServiceError> {
let create_stream_req = s2::types::CreateStreamRequest::builder()
.stream(stream_name)
.config(config)
.build();
let mut create_stream_req = CreateStreamRequest::new(stream);

self.client.create_stream(create_stream_req).await?;
if let Some(config) = config {
create_stream_req = create_stream_req.with_config(config);
};

self.client.create_stream(create_stream_req).await?;
Ok(())
}

pub async fn delete_stream(&self, stream_name: String) -> Result<(), BasinServiceError> {
let delete_stream_req = s2::types::DeleteStreamRequest::builder()
.stream(stream_name)
.build();

self.client.delete_stream(delete_stream_req).await?;

pub async fn delete_stream(&self, stream: String) -> Result<(), BasinServiceError> {
self.client
.delete_stream(DeleteStreamRequest::new(stream))
.await?;
Ok(())
}

pub async fn get_stream_config(
&self,
stream: String,
) -> Result<StreamConfig, BasinServiceError> {
let get_stream_config_req = s2::types::GetStreamConfigRequest::builder()
.stream(stream)
.build();

Ok(self.client.get_stream_config(get_stream_config_req).await?)
Ok(self.client.get_stream_config(stream).await?)
}

pub async fn reconfigure_stream(
Expand All @@ -94,16 +89,13 @@ impl BasinService {
config: StreamConfig,
mask: Vec<String>,
) -> Result<(), BasinServiceError> {
let reconfigure_stream_req = s2::types::ReconfigureStreamRequest::builder()
.stream(stream)
.config(config)
.mask(mask)
.build();
let reconfigure_stream_req = ReconfigureStreamRequest::new(stream)
.with_config(config)
.with_mask(mask);

self.client
.reconfigure_stream(reconfigure_stream_req)
.await?;

Ok(())
}
}
12 changes: 1 addition & 11 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use miette::Diagnostic;
use s2::client::ClientError;
use thiserror::Error;

use crate::{
account::AccountServiceError, basin::BasinServiceError, config::S2ConfigError,
stream::StreamServiceError,
};
use crate::{account::AccountServiceError, basin::BasinServiceError, config::S2ConfigError};

const HELP: &str = color_print::cstr!(
"\n<cyan><bold>Notice something wrong?</bold></cyan>\n\n\
Expand Down Expand Up @@ -35,13 +32,6 @@ pub enum S2CliError {
#[diagnostic(help("{}", HELP))]
BasinService(#[from] BasinServiceError),

#[error(transparent)]
#[diagnostic(help("{}", HELP))]
StreamService(#[from] StreamServiceError),

#[error(transparent)]
InvalidConfig(#[from] serde_json::Error),

#[error("Failed to record file: {0}")]
RecordFileReadError(String),
}
91 changes: 5 additions & 86 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use std::{fs::File, io::BufReader, path::PathBuf};

use account::AccountService;
use basin::BasinService;
use clap::{builder::styling, Parser, Subcommand};
use colored::*;
use config::{config_path, create_config};
use error::S2CliError;
use s2::{
client::{BasinClient, Client, ClientConfig, HostCloud, StreamClient},
client::{BasinClient, Client, ClientConfig, HostCloud},
types::{BasinMetadata, StorageClass},
};
use stream::StreamService;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{BasinConfig, StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH};

mod account;
mod basin;

mod config;
mod error;
mod stream;
mod types;

const STYLES: styling::Styles = styling::Styles::styled()
Expand Down Expand Up @@ -60,12 +57,6 @@ enum Commands {
#[command(subcommand)]
action: BasinActions,
},

/// Manage s2 streams
Stream {
#[command(subcommand)]
action: StreamActions,
},
}

#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -197,44 +188,10 @@ enum BasinActions {
},
}

#[derive(Subcommand, Debug)]
enum StreamActions {
/// Get the next sequence number that will be assigned by a stream.
GetNextSeqNum {
/// Name of the basin to get the next sequence number from.
basin: String,

/// Name of the stream to get the next sequence number for.
stream: String,
},

/// Append a batch of records to a stream.
Append {
/// Name of the basin.
basin: String,

/// Name of the stream.
stream: String,

/// Enforce that the sequence number issued to the first record matches.
#[arg(short, long)]
match_seq_num: Option<u64>,

/// Enforce a fencing token which must have been previously set by a `fence` command record.
#[arg(short, long)]
fencing_token: Option<String>,

/// Path to the file containing the records to append.
file: PathBuf,
},
}

fn s2_config(auth_token: String) -> ClientConfig {
ClientConfig::builder()
.host_uri(HostCloud::Local)
.token(auth_token.to_string())
.connection_timeout(std::time::Duration::from_secs(5))
.build()
ClientConfig::new(auth_token.to_string())
.with_host_uri(HostCloud::Local)
.with_connection_timeout(std::time::Duration::from_secs(5))
}

#[tokio::main]
Expand Down Expand Up @@ -417,44 +374,6 @@ async fn run() -> Result<(), S2CliError> {
}
}
}
Commands::Stream { action } => {
let cfg = config::load_config(&config_path)?;
let s2_config = s2_config(cfg.auth_token);
match action {
StreamActions::GetNextSeqNum { basin, stream } => {
let stream_client = StreamClient::connect(s2_config, basin, stream).await?;
let seq_num = StreamService::new(stream_client).get_next_seq_num().await?;
println!("{}", seq_num);
}

StreamActions::Append {
basin,
stream,
match_seq_num,
fencing_token,
file,
} => {
let stream_client = StreamClient::connect(s2_config, basin, stream).await?;
let stream_service = StreamService::new(stream_client);

let record_file = File::open(file.clone())
.map_err(|_| S2CliError::RecordFileReadError(file.display().to_string()))?;

let records = jsonl::read::<BufReader<File>, Vec<types::AppendRecord>>(
BufReader::new(record_file),
)
.map_err(|_| {
S2CliError::RecordFileReadError("Failed to parse records".to_string())
})?;

stream_service
.append(records, match_seq_num, fencing_token)
.await?;
println!("{}", "✓ Records appended successfully".green().bold());
}
}
}
}

Ok(())
}
Loading

0 comments on commit 564520f

Please sign in to comment.