Skip to content

Commit

Permalink
chore: update CLI to latest sdk (#37)
Browse files Browse the repository at this point in the history
closes #36
  • Loading branch information
infiniteregrets authored Nov 26, 2024
1 parent ecd1abc commit 2d711b8
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 15 deletions.
30 changes: 28 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "63b4964b66503f705e7c73ae07ba47f81019b79a" }
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "35cf0314dbc230f8f96c860a2ca8f5217f68151e" }
thiserror = "1.0.67"
tokio = { version = "*", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["io-util"] }
Expand Down
8 changes: 5 additions & 3 deletions src/account.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use streamstore::{
client::Client,
types::{
BasinConfig, BasinMetadata, BasinName, CreateBasinRequest, DeleteBasinRequest,
BasinConfig, BasinInfo, BasinName, CreateBasinRequest, DeleteBasinRequest,
ListBasinsRequest, ListBasinsResponse, ReconfigureBasinRequest, StreamConfig,
},
};
Expand Down Expand Up @@ -39,7 +39,7 @@ impl AccountService {
basin: BasinName,
storage_class: Option<crate::types::StorageClass>,
retention_policy: Option<crate::types::RetentionPolicy>,
) -> Result<BasinMetadata, ServiceError> {
) -> Result<BasinInfo, ServiceError> {
let mut stream_config = StreamConfig::new();

if let Some(storage_class) = storage_class {
Expand Down Expand Up @@ -86,6 +86,8 @@ impl AccountService {
self.client
.reconfigure_basin(reconfigure_basin_req)
.await
.map_err(|e| ServiceError::new(ServiceErrorContext::ReconfigureBasin, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::ReconfigureBasin, e))?;

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/basin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use streamstore::{
client::BasinClient,
types::{
CreateStreamRequest, DeleteStreamRequest, ListStreamsRequest, ListStreamsResponse,
ReconfigureStreamRequest, StreamConfig,
ReconfigureStreamRequest, StreamConfig, StreamInfo,
},
};

Expand All @@ -22,7 +22,7 @@ impl BasinService {
prefix: String,
start_after: String,
limit: usize,
) -> Result<Vec<String>, ServiceError> {
) -> Result<Vec<StreamInfo>, ServiceError> {
let list_streams_req = ListStreamsRequest::new()
.with_prefix(prefix)
.with_start_after(start_after)
Expand Down
33 changes: 27 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::path::PathBuf;
use std::{
path::PathBuf,
time::{Duration, UNIX_EPOCH},
};

use account::AccountService;
use basin::BasinService;
Expand All @@ -12,7 +15,7 @@ use stream::{RecordStream, StreamService};
use streamstore::{
bytesize::ByteSize,
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{BasinMetadata, BasinName, MeteredSize as _, ReadOutput},
types::{BasinInfo, BasinName, MeteredSize as _, ReadOutput, StreamInfo},
HeaderValue,
};
use tokio::{
Expand Down Expand Up @@ -355,8 +358,8 @@ async fn run() -> Result<(), S2CliError> {
)
.await?;

for basin_metadata in response.basins {
let BasinMetadata { name, state, .. } = basin_metadata;
for basin_info in response.basins {
let BasinInfo { name, state, .. } = basin_info;

let state = match state {
streamstore::types::BasinState::Active => state.to_string().green(),
Expand Down Expand Up @@ -430,8 +433,26 @@ async fn run() -> Result<(), S2CliError> {
limit.unwrap_or_default(),
)
.await?;
for stream in streams {
println!("{}", stream);
for StreamInfo {
name,
created_at,
deleted_at,
} in streams
{
let date_time = |time: u32| {
humantime::format_rfc3339_seconds(
UNIX_EPOCH + Duration::from_secs(time as u64),
)
};

println!(
"{} {} {}",
name,
date_time(created_at).to_string().green(),
deleted_at
.map(|d| date_time(d).to_string().red())
.unwrap_or_default()
);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ impl<R: AsyncBufRead> Stream for RecordStream<R> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match this.inner.as_mut().poll_next_line(cx) {
Poll::Ready(Ok(Some(line))) => Poll::Ready(Some(AppendRecord::new(line))),
Poll::Ready(Ok(Some(line))) => match AppendRecord::new(line) {
Ok(record) => Poll::Ready(Some(record)),
Err(e) => {
eprintln!("Error parsing line: {}", e);
Poll::Ready(None)
}
},
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => {
eprintln!("Error reading line: {}", e);
Expand Down

0 comments on commit 2d711b8

Please sign in to comment.