Skip to content

Commit

Permalink
..
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Sep 30, 2024
1 parent b035467 commit 93062bf
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 127 deletions.
13 changes: 0 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ color-print = "0.3.6"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
serde_json = "1.0.128"
json_dotpath = "1.1.0"
dialoguer = "0.11.0"
84 changes: 78 additions & 6 deletions src/basin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use s2::{
client::BasinClient,
service_error::{ListStreamsError, ServiceError},
types::ListStreamsResponse,
service_error::{
CreateStreamError, DeleteStreamError, GetStreamConfigError, ListStreamsError,
ReconfigureStreamError, ServiceError,
},
types::{GetStreamConfigResponse, ListStreamsResponse, StreamConfig},
};

pub struct BasinService {
Expand All @@ -12,6 +15,18 @@ pub struct BasinService {
pub enum BasinServiceError {
#[error("Failed to list streams: {0}")]
ListStreams(#[from] ServiceError<ListStreamsError>),

#[error("Failed to create stream")]
CreateStream(#[from] ServiceError<CreateStreamError>),

#[error("Failed to delete stream")]
DeleteStream(#[from] ServiceError<DeleteStreamError>),

#[error("Failed to get stream config")]
GetStreamConfig(#[from] ServiceError<GetStreamConfigError>),

#[error("Failed to reconfigure stream")]
ReconfigureStream(#[from] ServiceError<ReconfigureStreamError>),
}

impl BasinService {
Expand All @@ -24,16 +39,73 @@ impl BasinService {
prefix: String,
start_after: String,
limit: usize,
) -> Result<ListStreamsResponse, BasinServiceError> {
) -> Result<Vec<String>, BasinServiceError> {
let list_streams_req = s2::types::ListStreamsRequest::builder()
.prefix(prefix)
.start_after(start_after)
.limit(limit)
.build();

let ListStreamsResponse { streams, .. } =
self.client.list_streams(list_streams_req).await?;

Ok(streams)
}

pub async fn create_stream(
&self,
stream_name: String,
config: Option<StreamConfig>,
) -> Result<(), BasinServiceError> {
let create_stream_req = s2::types::CreateStreamRequest::builder()
.stream(stream_name)
.config(config)
.build();

self.client.create_stream(create_stream_req).await?;

Ok(())
}

pub async fn delete_stream(&self, stream_name: String) -> Result<(), BasinServiceError> {
let delete_stream_req = s2::types::DeleteStreamRequest::builder()
.stream(stream_name)
.build();

self.client.delete_stream(delete_stream_req).await?;

Ok(())
}

pub async fn get_stream_config(
&self,
stream: String,
) -> Result<StreamConfig, BasinServiceError> {
let get_stream_config_req = s2::types::GetStreamConfigRequest::builder()
.stream(stream)
.build();

let GetStreamConfigResponse { config } =
self.client.get_stream_config(get_stream_config_req).await?;
Ok(config)
}

pub async fn reconfigure_stream(
&self,
stream: String,
config: StreamConfig,
mask: Vec<String>,
) -> Result<(), BasinServiceError> {
let reconfigure_stream_req = s2::types::ReconfigureStreamRequest::builder()
.stream(stream)
.config(config)
.mask(mask)
.build();

self.client
.list_streams(list_streams_req)
.await
.map_err(BasinServiceError::ListStreams)
.reconfigure_stream(reconfigure_stream_req)
.await?;

Ok(())
}
}
24 changes: 0 additions & 24 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,6 @@ fn get_help() -> &'static str {
})
}

const INVALID_PATH_HELP: &str = color_print::cstr!(
r#"
A valid basin configuration looks like this:
<yellow>
{
"basin": "my-basin-1",
"config": {
"defaultStreamConfig": {
"storageClass": "STORAGE_CLASS_STANDARD",
}
}
}
</yellow>
And a path like <red>default_stream_config.storage_class</red> would be valid.
"#
);

#[derive(Error, Debug, Diagnostic)]
pub enum S2CliError {
#[error(transparent)]
Expand All @@ -59,16 +42,9 @@ pub enum S2CliError {
#[diagnostic(help("{}", get_help()))]
BasinService(#[from] BasinServiceError),

#[error(transparent)]
InvalidConfigSubPath(#[from] json_dotpath::Error),

#[error(transparent)]
InvalidConfig(#[from] serde_json::Error),

#[error("Path: {0} not found!")]
#[diagnostic(help("{}", INVALID_PATH_HELP))]
PathKeyNotFound(String),

#[error("Failed to interact for confirmation!")]
#[diagnostic(help("{}", get_help()))]
ConfirmationError(#[from] dialoguer::Error),
Expand Down
Loading

0 comments on commit 93062bf

Please sign in to comment.