From 271d34448ee151f5f11984e80a63cdeee8a6da8d Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 20 Dec 2024 01:49:05 +0530 Subject: [PATCH] upgrade sdk Signed-off-by: Vaibhav Rabber --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/account.rs | 2 +- src/basin.rs | 2 +- src/error.rs | 2 +- src/main.rs | 12 +++++------- src/ping.rs | 2 +- src/stream.rs | 4 ++-- src/types.rs | 53 +++++++++++++++++++++++++------------------------- 9 files changed, 40 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3516f5..2e5f7d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,9 +1624,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade73d3420a73c14e8560586d061b9eee83b60730b7645dd76944a986839281b" +checksum = "ed0565bd1f1642566b2f7473b8d101ed0c123b700a812d94c80febc645299974" dependencies = [ "async-stream", "backon", diff --git a/Cargo.toml b/Cargo.toml index 2a04c7d..6ab863a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ miette = { version = "7.2.0", features = ["fancy"] } rand = "0.8.5" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" -streamstore = "0.4.1" +streamstore = "0.5.0" thiserror = "2.0.6" tokio = { version = "1.41.1", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } diff --git a/src/account.rs b/src/account.rs index 82eecb6..fae5b07 100644 --- a/src/account.rs +++ b/src/account.rs @@ -1,4 +1,4 @@ -use streamstore::{ +use s2::{ client::Client, types::{ BasinConfig, BasinInfo, BasinName, CreateBasinRequest, DeleteBasinRequest, diff --git a/src/basin.rs b/src/basin.rs index a11fbf3..7ced8d2 100644 --- a/src/basin.rs +++ b/src/basin.rs @@ -1,4 +1,4 @@ -use streamstore::{ +use s2::{ client::BasinClient, types::{ CreateStreamRequest, DeleteStreamRequest, ListStreamsRequest, ListStreamsResponse, diff --git a/src/error.rs b/src/error.rs index 6e696a1..68b5e8b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,5 @@ use miette::Diagnostic; -use streamstore::{client::ClientError, types::ConvertError}; +use s2::{client::ClientError, types::ConvertError}; use thiserror::Error; use crate::config::S2ConfigError; diff --git a/src/main.rs b/src/main.rs index a735147..26c9192 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,7 @@ use error::{S2CliError, ServiceError, ServiceErrorContext}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use ping::{LatencyStats, PingResult, Pinger}; use rand::Rng; -use stream::{RecordStream, StreamService}; -use streamstore::{ +use s2::{ batching::AppendRecordsBatchingOpts, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ @@ -23,6 +22,7 @@ use streamstore::{ ReadOutput, StreamInfo, }, }; +use stream::{RecordStream, StreamService}; use tokio::{ fs::{File, OpenOptions}, io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufWriter}, @@ -461,8 +461,8 @@ async fn run() -> Result<(), S2CliError> { let BasinInfo { name, state, .. } = basin_info; let state = match state { - streamstore::types::BasinState::Active => state.to_string().green(), - streamstore::types::BasinState::Deleting => state.to_string().red(), + s2::types::BasinState::Active => state.to_string().green(), + s2::types::BasinState::Deleting => state.to_string().red(), _ => state.to_string().yellow(), }; println!("{} {}", name, state); @@ -486,9 +486,7 @@ async fn run() -> Result<(), S2CliError> { .await?; let message = match state { - streamstore::types::BasinState::Creating => { - "✓ Basin creation requested".yellow().bold() - } + s2::types::BasinState::Creating => "✓ Basin creation requested".yellow().bold(), _ => "✓ Basin created".green().bold(), }; eprintln!("{message}"); diff --git a/src/ping.rs b/src/ping.rs index 1105eaf..53be389 100644 --- a/src/ping.rs +++ b/src/ping.rs @@ -1,7 +1,7 @@ use std::time::Duration; use rand::{distributions::Uniform, Rng}; -use streamstore::{ +use s2::{ batching::AppendRecordsBatchingOpts, types::{AppendOutput, AppendRecord, ReadOutput, SequencedRecord, SequencedRecordBatch}, }; diff --git a/src/stream.rs b/src/stream.rs index 05e85b1..a34f875 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,4 +1,4 @@ -use streamstore::{ +use s2::{ batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, client::StreamClient, types::{ @@ -9,9 +9,9 @@ use streamstore::{ }; use futures::{Stream, StreamExt}; +use s2::types::AppendRecord; use std::pin::Pin; use std::task::{Context, Poll}; -use streamstore::types::AppendRecord; use crate::error::{ServiceError, ServiceErrorContext}; diff --git a/src/types.rs b/src/types.rs index f3ca8b8..14d931a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,9 +1,9 @@ -//! Types for Basin configuration that directly map to streamstore::types. +//! Types for Basin configuration that directly map to s2::types. use clap::{Parser, ValueEnum}; +use s2::types::BasinName; use serde::Serialize; use std::{str::FromStr, time::Duration}; -use streamstore::types::BasinName; use crate::error::{BasinNameOrUriParseError, S2CliError}; @@ -166,26 +166,25 @@ impl From<&str> for RetentionPolicy { } } -impl From for streamstore::types::BasinConfig { +impl From for s2::types::BasinConfig { fn from(config: BasinConfig) -> Self { let BasinConfig { default_stream_config, } = config; - streamstore::types::BasinConfig { + s2::types::BasinConfig { default_stream_config: default_stream_config.map(Into::into), } } } -impl From for streamstore::types::StreamConfig { +impl From for s2::types::StreamConfig { fn from(config: StreamConfig) -> Self { let storage_class = config .storage_class - .map(streamstore::types::StorageClass::from) - .unwrap_or(streamstore::types::StorageClass::Unspecified); + .map(s2::types::StorageClass::from) + .unwrap_or(s2::types::StorageClass::Unspecified); let retention_policy = config.retention_policy.map(|r| r.into()); - let stream_config = - streamstore::types::StreamConfig::new().with_storage_class(storage_class); + let stream_config = s2::types::StreamConfig::new().with_storage_class(storage_class); if let Some(retention_policy) = retention_policy { stream_config.with_retention_policy(retention_policy) @@ -195,52 +194,52 @@ impl From for streamstore::types::StreamConfig { } } -impl From for streamstore::types::StorageClass { +impl From for s2::types::StorageClass { fn from(class: StorageClass) -> Self { match class { - StorageClass::Unspecified => streamstore::types::StorageClass::Unspecified, - StorageClass::Standard => streamstore::types::StorageClass::Standard, - StorageClass::Express => streamstore::types::StorageClass::Express, + StorageClass::Unspecified => s2::types::StorageClass::Unspecified, + StorageClass::Standard => s2::types::StorageClass::Standard, + StorageClass::Express => s2::types::StorageClass::Express, } } } -impl From for StorageClass { - fn from(class: streamstore::types::StorageClass) -> Self { +impl From for StorageClass { + fn from(class: s2::types::StorageClass) -> Self { match class { - streamstore::types::StorageClass::Unspecified => StorageClass::Unspecified, - streamstore::types::StorageClass::Standard => StorageClass::Standard, - streamstore::types::StorageClass::Express => StorageClass::Express, + s2::types::StorageClass::Unspecified => StorageClass::Unspecified, + s2::types::StorageClass::Standard => StorageClass::Standard, + s2::types::StorageClass::Express => StorageClass::Express, } } } -impl From for streamstore::types::RetentionPolicy { +impl From for s2::types::RetentionPolicy { fn from(policy: RetentionPolicy) -> Self { match policy { - RetentionPolicy::Age(d) => streamstore::types::RetentionPolicy::Age(d), + RetentionPolicy::Age(d) => s2::types::RetentionPolicy::Age(d), } } } -impl From for RetentionPolicy { - fn from(policy: streamstore::types::RetentionPolicy) -> Self { +impl From for RetentionPolicy { + fn from(policy: s2::types::RetentionPolicy) -> Self { match policy { - streamstore::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d), + s2::types::RetentionPolicy::Age(d) => RetentionPolicy::Age(d), } } } -impl From for BasinConfig { - fn from(config: streamstore::types::BasinConfig) -> Self { +impl From for BasinConfig { + fn from(config: s2::types::BasinConfig) -> Self { BasinConfig { default_stream_config: config.default_stream_config.map(Into::into), } } } -impl From for StreamConfig { - fn from(config: streamstore::types::StreamConfig) -> Self { +impl From for StreamConfig { + fn from(config: s2::types::StreamConfig) -> Self { StreamConfig { storage_class: Some(config.storage_class.into()), retention_policy: config.retention_policy.map(|r| r.into()),