Skip to content

Commit

Permalink
upgrade sdk
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Dec 19, 2024
1 parent 77e149b commit 271d344
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 43 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ miette = { version = "7.2.0", features = ["fancy"] }
rand = "0.8.5"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
streamstore = "0.4.1"
streamstore = "0.5.0"
thiserror = "2.0.6"
tokio = { version = "1.41.1", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["io-util"] }
Expand Down
2 changes: 1 addition & 1 deletion src/account.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use streamstore::{
use s2::{
client::Client,
types::{
BasinConfig, BasinInfo, BasinName, CreateBasinRequest, DeleteBasinRequest,
Expand Down
2 changes: 1 addition & 1 deletion src/basin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use streamstore::{
use s2::{
client::BasinClient,
types::{
CreateStreamRequest, DeleteStreamRequest, ListStreamsRequest, ListStreamsResponse,
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use miette::Diagnostic;
use streamstore::{client::ClientError, types::ConvertError};
use s2::{client::ClientError, types::ConvertError};
use thiserror::Error;

use crate::config::S2ConfigError;
Expand Down
12 changes: 5 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use error::{S2CliError, ServiceError, ServiceErrorContext};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use ping::{LatencyStats, PingResult, Pinger};
use rand::Rng;
use stream::{RecordStream, StreamService};
use streamstore::{
use s2::{
batching::AppendRecordsBatchingOpts,
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{
AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, MeteredBytes as _,
ReadOutput, StreamInfo,
},
};
use stream::{RecordStream, StreamService};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufWriter},
Expand Down Expand Up @@ -461,8 +461,8 @@ async fn run() -> Result<(), S2CliError> {
let BasinInfo { name, state, .. } = basin_info;

let state = match state {
streamstore::types::BasinState::Active => state.to_string().green(),
streamstore::types::BasinState::Deleting => state.to_string().red(),
s2::types::BasinState::Active => state.to_string().green(),
s2::types::BasinState::Deleting => state.to_string().red(),
_ => state.to_string().yellow(),
};
println!("{} {}", name, state);
Expand All @@ -486,9 +486,7 @@ async fn run() -> Result<(), S2CliError> {
.await?;

let message = match state {
streamstore::types::BasinState::Creating => {
"✓ Basin creation requested".yellow().bold()
}
s2::types::BasinState::Creating => "✓ Basin creation requested".yellow().bold(),
_ => "✓ Basin created".green().bold(),
};
eprintln!("{message}");
Expand Down
2 changes: 1 addition & 1 deletion src/ping.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use rand::{distributions::Uniform, Rng};
use streamstore::{
use s2::{
batching::AppendRecordsBatchingOpts,
types::{AppendOutput, AppendRecord, ReadOutput, SequencedRecord, SequencedRecordBatch},
};
Expand Down
4 changes: 2 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use streamstore::{
use s2::{
batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
client::StreamClient,
types::{
Expand All @@ -9,9 +9,9 @@ use streamstore::{
};

use futures::{Stream, StreamExt};
use s2::types::AppendRecord;
use std::pin::Pin;
use std::task::{Context, Poll};
use streamstore::types::AppendRecord;

use crate::error::{ServiceError, ServiceErrorContext};

Expand Down
53 changes: 26 additions & 27 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Types for Basin configuration that directly map to streamstore::types.
//! Types for Basin configuration that directly map to s2::types.
use clap::{Parser, ValueEnum};
use s2::types::BasinName;
use serde::Serialize;
use std::{str::FromStr, time::Duration};
use streamstore::types::BasinName;

use crate::error::{BasinNameOrUriParseError, S2CliError};

Expand Down Expand Up @@ -166,26 +166,25 @@ impl From<&str> for RetentionPolicy {
}
}

impl From<BasinConfig> for streamstore::types::BasinConfig {
impl From<BasinConfig> for s2::types::BasinConfig {
fn from(config: BasinConfig) -> Self {
let BasinConfig {
default_stream_config,
} = config;
streamstore::types::BasinConfig {
s2::types::BasinConfig {
default_stream_config: default_stream_config.map(Into::into),
}
}
}

impl From<StreamConfig> for streamstore::types::StreamConfig {
impl From<StreamConfig> for s2::types::StreamConfig {
fn from(config: StreamConfig) -> Self {
let storage_class = config
.storage_class
.map(streamstore::types::StorageClass::from)
.unwrap_or(streamstore::types::StorageClass::Unspecified);
.map(s2::types::StorageClass::from)
.unwrap_or(s2::types::StorageClass::Unspecified);
let retention_policy = config.retention_policy.map(|r| r.into());
let stream_config =
streamstore::types::StreamConfig::new().with_storage_class(storage_class);
let stream_config = s2::types::StreamConfig::new().with_storage_class(storage_class);

if let Some(retention_policy) = retention_policy {
stream_config.with_retention_policy(retention_policy)
Expand All @@ -195,52 +194,52 @@ impl From<StreamConfig> for streamstore::types::StreamConfig {
}
}

impl From<StorageClass> for streamstore::types::StorageClass {
impl From<StorageClass> for s2::types::StorageClass {
fn from(class: StorageClass) -> Self {
match class {
StorageClass::Unspecified => streamstore::types::StorageClass::Unspecified,
StorageClass::Standard => streamstore::types::StorageClass::Standard,
StorageClass::Express => streamstore::types::StorageClass::Express,
StorageClass::Unspecified => s2::types::StorageClass::Unspecified,
StorageClass::Standard => s2::types::StorageClass::Standard,
StorageClass::Express => s2::types::StorageClass::Express,
}
}
}

impl From<streamstore::types::StorageClass> for StorageClass {
fn from(class: streamstore::types::StorageClass) -> Self {
impl From<s2::types::StorageClass> for StorageClass {
fn from(class: s2::types::StorageClass) -> Self {
match class {
streamstore::types::StorageClass::Unspecified => StorageClass::Unspecified,
streamstore::types::StorageClass::Standard => StorageClass::Standard,
streamstore::types::StorageClass::Express => StorageClass::Express,
s2::types::StorageClass::Unspecified => StorageClass::Unspecified,
s2::types::StorageClass::Standard => StorageClass::Standard,
s2::types::StorageClass::Express => StorageClass::Express,
}
}
}

impl From<RetentionPolicy> for streamstore::types::RetentionPolicy {
impl From<RetentionPolicy> for s2::types::RetentionPolicy {
fn from(policy: RetentionPolicy) -> Self {
match policy {
RetentionPolicy::Age(d) => streamstore::types::RetentionPolicy::Age(d),
RetentionPolicy::Age(d) => s2::types::RetentionPolicy::Age(d),
}
}
}

impl From<streamstore::types::RetentionPolicy> for RetentionPolicy {
fn from(policy: streamstore::types::RetentionPolicy) -> Self {
impl From<s2::types::RetentionPolicy> for RetentionPolicy {
fn from(policy: s2::types::RetentionPolicy) -> Self {
match policy {
streamstore::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d),
s2::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d),
}
}
}

impl From<streamstore::types::BasinConfig> for BasinConfig {
fn from(config: streamstore::types::BasinConfig) -> Self {
impl From<s2::types::BasinConfig> for BasinConfig {
fn from(config: s2::types::BasinConfig) -> Self {
BasinConfig {
default_stream_config: config.default_stream_config.map(Into::into),
}
}
}

impl From<streamstore::types::StreamConfig> for StreamConfig {
fn from(config: streamstore::types::StreamConfig) -> Self {
impl From<s2::types::StreamConfig> for StreamConfig {
fn from(config: s2::types::StreamConfig) -> Self {
StreamConfig {
storage_class: Some(config.storage_class.into()),
retention_policy: config.retention_policy.map(|r| r.into()),
Expand Down

0 comments on commit 271d344

Please sign in to comment.