diff --git a/Cargo.lock b/Cargo.lock index a125dde..f60a1d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,12 +280,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" -[[package]] -name = "bytesize" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" - [[package]] name = "cc" version = "1.2.1" @@ -1695,12 +1689,11 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=88d6539cbec2165a79f07de4d3a13a9539b547d6#88d6539cbec2165a79f07de4d3a13a9539b547d6" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=d40de3df1c76ba71361f9e9f5327aee1619763cb#d40de3df1c76ba71361f9e9f5327aee1619763cb" dependencies = [ "async-stream", "backon", "bytes", - "bytesize", "futures", "http", "hyper", @@ -1719,6 +1712,7 @@ dependencies = [ "tonic-side-effect", "tower-service", "tracing", + "uuid", ] [[package]] @@ -1768,7 +1762,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=88d6539cbec2165a79f07de4d3a13a9539b547d6#88d6539cbec2165a79f07de4d3a13a9539b547d6" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=d40de3df1c76ba71361f9e9f5327aee1619763cb#d40de3df1c76ba71361f9e9f5327aee1619763cb" dependencies = [ "proc-macro2", "quote", @@ -2188,6 +2182,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 064b3a1..6e8882a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,4 +28,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } [dependencies.streamstore] git = "https://github.com/s2-streamstore/s2-sdk-rust.git" -rev = "88d6539cbec2165a79f07de4d3a13a9539b547d6" +rev = "d40de3df1c76ba71361f9e9f5327aee1619763cb" diff --git a/src/main.rs b/src/main.rs index a7346d1..a7c85ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,9 +13,8 @@ use signal_hook::consts::{SIGINT, SIGTERM, SIGTSTP}; use signal_hook_tokio::Signals; use stream::{RecordStream, StreamService}; use streamstore::{ - bytesize::ByteSize, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, - types::{BasinInfo, BasinName, FencingToken, MeteredSize as _, ReadOutput, StreamInfo}, + types::{BasinInfo, BasinName, FencingToken, MeteredBytes as _, ReadOutput, StreamInfo}, HeaderValue, }; use tokio::{ @@ -245,7 +244,7 @@ enum StreamActions { /// Limit the number of bytes returned. #[arg(short = 'b', long)] - limit_bytes: Option, + limit_bytes: Option, }, } @@ -600,7 +599,7 @@ async fn run() -> Result<(), S2CliError> { let mut writer = output.into_writer().await.unwrap(); let mut start = None; - let mut total_data_len = ByteSize::b(0); + let mut total_data_len = 0; loop { select! { @@ -613,7 +612,7 @@ async fn run() -> Result<(), S2CliError> { match read_result { Ok(ReadOutput::Batch(sequenced_record_batch)) => { let num_records = sequenced_record_batch.records.len(); - let mut batch_len = ByteSize::b(0); + let mut batch_len = 0; let seq_range = match ( sequenced_record_batch.records.first(), @@ -624,7 +623,7 @@ async fn run() -> Result<(), S2CliError> { }; for sequenced_record in sequenced_record_batch.records { let data = &sequenced_record.body; - batch_len += sequenced_record.metered_size(); + batch_len += sequenced_record.metered_bytes(); writer .write_all(data) @@ -637,7 +636,7 @@ async fn run() -> Result<(), S2CliError> { } total_data_len += batch_len; - let throughput_mibps = (total_data_len.0 as f64 + let throughput_mibps = (total_data_len as f64 / start.unwrap().elapsed().as_secs_f64()) / 1024.0 / 1024.0; @@ -683,7 +682,7 @@ async fn run() -> Result<(), S2CliError> { let total_elapsed_time = start.unwrap().elapsed().as_secs_f64(); let total_throughput_mibps = - (total_data_len.0 as f64 / total_elapsed_time) / 1024.0 / 1024.0; + (total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0; eprintln!( "{}", diff --git a/src/stream.rs b/src/stream.rs index 5550c7e..d08ba79 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -14,7 +14,6 @@ use tokio::io::Lines; use tokio_stream::Stream; use crate::error::{ServiceError, ServiceErrorContext}; -use crate::ByteSize; pin_project! { #[derive(Debug)] @@ -91,11 +90,11 @@ impl StreamService { &self, start_seq_num: u64, limit_count: Option, - limit_bytes: Option, + limit_bytes: Option, ) -> Result, ServiceError> { let read_session_req = ReadSessionRequest { start_seq_num: Some(start_seq_num), - limit: match (limit_count, limit_bytes.map(|b| b.as_u64())) { + limit: match (limit_count, limit_bytes) { (Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }), (Some(count), None) => Some(ReadLimit { count, bytes: 0 }), (None, Some(bytes)) => Some(ReadLimit { count: 0, bytes }),