From 26637d6e9cae5a74c608c19003c762dcc59ee71f Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Sat, 7 Dec 2024 00:10:56 +0530
Subject: [PATCH 01/10] feat: Pingtest (#48)

Resolves: #48

Signed-off-by: Vaibhav Rabber
---
 Cargo.lock    |   1 +
 Cargo.toml    |   2 +-
 src/error.rs  |  42 ++++++++-----
 src/main.rs   | 161 +++++++++++++++++++++++++++++++++++++++++++++++---
 src/stream.rs |  11 +++-
 5 files changed, 190 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 14347f6..17bc1aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1889,6 +1889,7 @@ dependencies = [
  "futures-core",
  "pin-project-lite",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 6e21132..efaef0e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,7 @@ serde = { version = "1.0.214", features = ["derive"] }
 serde_json = "1.0.132"
 thiserror = "1.0.67"
 tokio = { version = "1.41.1", features = ["full"] }
-tokio-stream = { version = "0.1.16", features = ["io-util"] }
+tokio-stream = { version = "0.1.16", features = ["io-util", "sync"] }
 toml = "0.8.19"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
diff --git a/src/error.rs b/src/error.rs
index cde23f2..e3cf8f4 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -43,6 +43,9 @@ pub enum S2CliError {
     #[error("Failed to initialize a `Record Reader`! {0}")]
     RecordReaderInit(String),
 
+    #[error("Stream mutated concurrently during pingtest")]
+    PingtestStreamMutated,
+
     #[error("Failed to write records: {0}")]
     RecordWrite(String),
 
@@ -73,21 +76,21 @@ pub enum ServiceErrorContext {
 impl std::fmt::Display for ServiceErrorContext {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            ServiceErrorContext::ListBasins => write!(f, "Failed to list basins"),
-            ServiceErrorContext::CreateBasin => write!(f, "Failed to create basin"),
-            ServiceErrorContext::DeleteBasin => write!(f, "Failed to delete basin"),
-            ServiceErrorContext::GetBasinConfig => write!(f, "Failed to get basin config"),
-            ServiceErrorContext::ReconfigureBasin => write!(f, "Failed to reconfigure basin"),
-            ServiceErrorContext::ListStreams => write!(f, "Failed to list streams"),
-            ServiceErrorContext::CreateStream => write!(f, "Failed to create stream"),
-            ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"),
-            ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"),
-            ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"),
-            ServiceErrorContext::Trim => write!(f, "Failed to trim"),
-            ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"),
-            ServiceErrorContext::AppendSession => write!(f, "Failed to append session"),
-            ServiceErrorContext::ReadSession => write!(f, "Failed to read session"),
-            ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
+            Self::ListBasins => write!(f, "Failed to list basins"),
+            Self::CreateBasin => write!(f, "Failed to create basin"),
+            Self::DeleteBasin => write!(f, "Failed to delete basin"),
+            Self::GetBasinConfig => write!(f, "Failed to get basin config"),
+            Self::ReconfigureBasin => write!(f, "Failed to reconfigure basin"),
+            Self::ListStreams => write!(f, "Failed to list streams"),
+            Self::CreateStream => write!(f, "Failed to create stream"),
+            Self::DeleteStream => write!(f, "Failed to delete stream"),
+            Self::GetStreamConfig => write!(f, "Failed to get stream config"),
+            Self::CheckTail => write!(f, "Failed to check tail"),
+            Self::Trim => write!(f, "Failed to trim"),
+            Self::Fence => write!(f, "Failed to set fencing token"),
+            Self::AppendSession => write!(f, "Failed to append session"),
+            Self::ReadSession => write!(f, "Failed to read session"),
+            Self::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
         }
     }
 }
@@ -100,6 +103,15 @@ pub struct ServiceStatus {
     pub status: String,
 }
 
+impl ServiceStatus {
+    pub fn new(status: impl Into<String>, message: impl Into<String>) -> Self {
+        Self {
+            message: message.into(),
+            status: status.into(),
+        }
+    }
+}
+
 impl From<ClientError> for ServiceStatus {
     fn from(error: ClientError) -> Self {
         match error {
diff --git a/src/main.rs b/src/main.rs
index c576c6b..ae26139 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
 use std::{
     path::PathBuf,
-    time::{Duration, UNIX_EPOCH},
+    time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
 use account::AccountService;
@@ -13,18 +13,18 @@ use stream::{RecordStream, StreamService};
 use streamstore::{
     client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
     types::{
-        BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _, ReadOutput,
-        StreamInfo,
+        AppendRecord, BasinInfo, BasinName, CommandRecord, FencingToken, Header, MeteredBytes as _,
+        ReadOutput, SequencedRecordBatch, StreamInfo,
     },
     HeaderValue,
 };
-use tokio::signal;
 use tokio::{
     fs::{File, OpenOptions},
     io::{self, AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
     select,
     time::Instant,
 };
+use tokio::{signal, sync::mpsc};
 use tokio_stream::StreamExt;
 use tracing::trace;
 use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
@@ -270,6 +270,23 @@ enum Commands {
         #[arg(short = 'b', long)]
         limit_bytes: Option<u64>,
     },
+
+    /// Pingtest
+    Pingtest {
+        /// Name of the basin.
+        basin: BasinName,
+
+        /// Name of the stream.
+        stream: String,
+
+        /// Number of records.
+        #[arg(short = 'n', long, default_value_t = 1000)]
+        record_count: usize,
+
+        /// Size of the record in bytes.
+        #[arg(short = 'b', long, default_value_t = 16 * 1000)]
+        record_bytes: u64,
+    },
 }
 
 #[derive(Subcommand, Debug)]
@@ -619,7 +636,7 @@ async fn run() -> Result<(), S2CliError> {
             );
 
             let mut append_output_stream = StreamService::new(stream_client)
-                .append_session(append_input_stream, fencing_token, match_seq_num)
+                .append_session(append_input_stream, fencing_token, match_seq_num, None)
                 .await?;
             loop {
                 select! {
@@ -682,9 +699,9 @@ async fn run() -> Result<(), S2CliError> {
                 maybe_read_result = read_output_stream.next() => {
                     match maybe_read_result {
                         Some(read_result) => {
-                            if start.is_none() {
-                                start = Some(Instant::now());
-                            }
+                        if start.is_none() {
+                            start = Some(Instant::now());
+                        }
                             match read_result {
                                 Ok(ReadOutput::Batch(sequenced_record_batch)) => {
                                     let num_records = sequenced_record_batch.records.len();
@@ -783,6 +800,134 @@ async fn run() -> Result<(), S2CliError> {
                 writer.flush().await.expect("writer flush");
             }
         }
+
+        Commands::Pingtest {
+            basin,
+            stream,
+            record_count,
+            record_bytes,
+        } => {
+            let cfg = config::load_config(&config_path)?;
+            let client_config = client_config(cfg.auth_token)?;
+            let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
+
+            let tail = stream_client.check_tail().await?;
+
+            let mut read_stream = stream_client.read_session(tail, None, None).await?;
+
+            let reads_handle = tokio::spawn(async move {
+                let mut reads = Vec::with_capacity(record_count);
+
+                for _ in 0..record_count {
+                    select! {
+                        next = read_stream.next() => {
+                            match next {
+                                None => break,
+                                Some(Err(e)) => {
+                                    return Err(ServiceError::new(ServiceErrorContext::ReadSession, e).into());
+                                }
+                                Some(Ok(output)) => {
+                                    if let ReadOutput::Batch(SequencedRecordBatch { mut records }) = output {
+                                        if records.len() != 1 {
+                                            return Err(S2CliError::PingtestStreamMutated);
+                                        }
+
+                                        let mut record = records.pop().unwrap();
+
+                                        if record.headers.len() != 1 {
+                                            return Err(S2CliError::PingtestStreamMutated);
+                                        }
+
+                                        let header = record.headers.pop().unwrap();
+
+                                        if header.name.as_ref() != b"timestamp" {
+                                            return Err(S2CliError::PingtestStreamMutated);
+                                        }
+
+                                        let append_timestamp = f64::from_be_bytes(
+                                            header
+                                                .value
+                                                .as_ref()
+                                                .try_into()
+                                                .map_err(|_| S2CliError::PingtestStreamMutated)?,
+                                        );
+                                        let append_timestamp = Duration::from_secs_f64(append_timestamp);
+
+                                        let now_timestamp = SystemTime::now()
+                                            .duration_since(UNIX_EPOCH)
+                                            .expect("valid duration");
+
+                                        reads.push(now_timestamp - append_timestamp);
+                                    } else {
+                                        return Err(S2CliError::PingtestStreamMutated);
+                                    }
+                                }
+                            }
+                        }
+                        _ = signal::ctrl_c() => {
+                            break;
+                        }
+                    }
+                }
+
+                Result::<Vec<Duration>, S2CliError>::Ok(reads)
+            });
+
+            let (tx, rx) = mpsc::channel::<AppendRecord>(1);
+
+            let append_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+            let mut append_stream = stream_client
+                .append_session(append_stream, None, None, Some(1))
+                .await?;
+
+            let mut appends = Vec::with_capacity(record_count);
+
+            for _ in 0..record_count {
+                let body = String::from_iter(std::iter::repeat_n('a', record_bytes as usize));
+
+                let start = SystemTime::now();
+                let timestamp = start
+                    .duration_since(UNIX_EPOCH)
+                    .expect("valid duration")
+                    .as_secs_f64();
+                let rec = AppendRecord::new(body)?.with_headers(vec![Header::new(
+                    b"timestamp".to_vec(),
+                    timestamp.to_be_bytes().to_vec(),
+                )])?;
+
+                select! {
+                    send = tx.send(rec) => {
+                        if send.is_ok() {
+                            // This is not a correct representation of acknowledgement latency
+                            // since the future is only polled later on.
+                            // FIXME: Figure out how to send timestamp over to calculate ack latencies.
+                            let _ = append_stream
+                                .next()
+                                .await
+                                .expect("acknowledgement")
+                                .map_err(|e| ServiceError::new(ServiceErrorContext::AppendSession, e))?;
+                            let elapsed = start.elapsed().expect("valid duration");
+                            appends.push(elapsed);
+                        } else {
+                            // Receiver closed due to some error.
+                            break;
+                        }
+                    }
+
+                    _ = signal::ctrl_c() => {
+                        eprintln!("{}", "■ [ABORTED]".red().bold());
+                        std::mem::drop(tx);
+                        break;
+                    }
+                }
+            }
+
+            let reads = reads_handle.await.expect("task panic")?;
+
+            println!("appends = {appends:#?}");
+            println!("reads = {reads:#?}");
+        }
     }
     std::process::exit(0);
 }
diff --git a/src/stream.rs b/src/stream.rs
index 40c7f3d..0494544 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -88,15 +88,20 @@ impl StreamService {
 
     pub async fn append_session(
         &self,
-        append_input_stream: RecordStream<Box<dyn AsyncBufRead + Send + Unpin>>,
+        stream: impl 'static + Send + Stream<Item = AppendRecord> + Unpin,
         fencing_token: Option<FencingToken>,
         match_seq_num: Option<u64>,
+        max_batch_records: Option<usize>,
     ) -> Result<Streaming<AppendOutput>, ServiceError> {
-        let opts = AppendRecordsBatchingOpts::default()
+        let mut opts = AppendRecordsBatchingOpts::default()
             .with_fencing_token(fencing_token)
             .with_match_seq_num(match_seq_num);
 
-        let append_record_stream = AppendRecordsBatchingStream::new(append_input_stream, opts);
+        if let Some(n) = max_batch_records {
+            opts = opts.with_max_batch_records(n);
+        }
+
+        let append_record_stream = AppendRecordsBatchingStream::new(stream, opts);
 
         self.client
             .append_session(append_record_stream)

From 4136ff6fd19c691133a618070388164733409d67 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Sat, 7 Dec 2024 02:33:45 +0530
Subject: [PATCH 02/10] ..

Signed-off-by: Vaibhav Rabber
---
 src/error.rs |   9 ---
 src/main.rs  | 191 ++++++++++++++++++++++++++------------------------
 2 files changed, 97 insertions(+), 103 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index e3cf8f4..9baef60 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -103,15 +103,6 @@ pub struct ServiceStatus {
     pub status: String,
 }
 
-impl ServiceStatus {
-    pub fn new(status: impl Into<String>, message: impl Into<String>) -> Self {
-        Self {
-            message: message.into(),
-            status: status.into(),
-        }
-    }
-}
-
 impl From<ClientError> for ServiceStatus {
     fn from(error: ClientError) -> Self {
         match error {
diff --git a/src/main.rs b/src/main.rs
index ae26139..12c2149 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,7 @@
 use std::{
+    num::NonZeroU32,
     path::PathBuf,
-    time::{Duration, SystemTime, UNIX_EPOCH},
+    time::{Duration, UNIX_EPOCH},
 };
 
 use account::AccountService;
@@ -13,7 +14,7 @@ use stream::{RecordStream, StreamService};
 use streamstore::{
     client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
     types::{
-        AppendRecord, BasinInfo, BasinName, CommandRecord, FencingToken, Header, MeteredBytes as _,
+        AppendRecord, BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _,
         ReadOutput, SequencedRecordBatch, StreamInfo,
     },
     HeaderValue,
@@ -47,7 +48,7 @@ const STYLES: styling::Styles = styling::Styles::styled()
 const GENERAL_USAGE: &str = color_print::cstr!(
     r#"
     $ s2 config set --auth-token ...
-    $ s2 list-basins --prefix "foo" --limit 100
+    $ s2 account list-basins --prefix "bar" --start-after "foo" --limit 100
     "#
 );
 
@@ -211,7 +212,7 @@ enum Commands {
         /// Name of the stream.
         stream: String,
 
-        /// Payload upto 16 bytes in hex to set as the fencing token.
+        /// Payload up to 16 bytes to set as the fencing token.
         /// An empty payload clears the token.
         fencing_token: Option<FencingToken>,
     },
@@ -226,8 +227,8 @@ enum Commands {
         /// Name of the stream.
         stream: String,
 
-        /// Enforce a fencing token specified in hex,
-        /// which must have been previously set by a `fence` command.
+        /// Enforce a fencing token which must have been previously set by a
+        /// `fence` command record.
        #[arg(short = 'f', long)]
        fencing_token: Option<FencingToken>,
 
@@ -280,8 +281,8 @@ enum Commands {
         stream: String,
 
         /// Number of records.
-        #[arg(short = 'n', long, default_value_t = 1000)]
-        record_count: usize,
+        #[arg(short = 'n', long, default_value = "1000")]
+        record_count: NonZeroU32,
 
         /// Size of the record in bytes.
         #[arg(short = 'b', long, default_value_t = 16 * 1000)]
         record_bytes: u64,
     },
 }
@@ -361,7 +362,8 @@ fn client_config(auth_token: String) -> Result<ClientConfig, S2CliError> {
     let endpoints = S2Endpoints::from_env().map_err(S2CliError::EndpointsFromEnv)?;
     let client_config = ClientConfig::new(auth_token.to_string())
         .with_user_agent("s2-cli".parse::<HeaderValue>().expect("valid user agent"))
-        .with_endpoints(endpoints);
+        .with_endpoints(endpoints)
+        .with_request_timeout(Duration::from_secs(30));
     Ok(client_config)
 }
 
@@ -807,6 +809,8 @@ async fn run() -> Result<(), S2CliError> {
             record_count,
             record_bytes,
         } => {
+            let record_count = record_count.get() as usize;
+
             let cfg = config::load_config(&config_path)?;
             let client_config = client_config(cfg.auth_token)?;
             let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
@@ -818,62 +822,34 @@ async fn run() -> Result<(), S2CliError> {
             let reads_handle = tokio::spawn(async move {
                 let mut reads = Vec::with_capacity(record_count);
 
-                for _ in 0..record_count {
-                    select! {
-                        next = read_stream.next() => {
-                            match next {
-                                None => break,
-                                Some(Err(e)) => {
-                                    return Err(ServiceError::new(ServiceErrorContext::ReadSession, e).into());
-                                }
-                                Some(Ok(output)) => {
-                                    if let ReadOutput::Batch(SequencedRecordBatch { mut records }) = output {
-                                        if records.len() != 1 {
-                                            return Err(S2CliError::PingtestStreamMutated);
-                                        }
-
-                                        let mut record = records.pop().unwrap();
-
-                                        if record.headers.len() != 1 {
-                                            return Err(S2CliError::PingtestStreamMutated);
-                                        }
-
-                                        let header = record.headers.pop().unwrap();
-
-                                        if header.name.as_ref() != b"timestamp" {
-                                            return Err(S2CliError::PingtestStreamMutated);
-                                        }
-
-                                        let append_timestamp = f64::from_be_bytes(
-                                            header
-                                                .value
-                                                .as_ref()
-                                                .try_into()
-                                                .map_err(|_| S2CliError::PingtestStreamMutated)?,
-                                        );
-                                        let append_timestamp = Duration::from_secs_f64(append_timestamp);
-
-                                        let now_timestamp = SystemTime::now()
-                                            .duration_since(UNIX_EPOCH)
-                                            .expect("valid duration");
-
-                                        reads.push(now_timestamp - append_timestamp);
-                                    } else {
-                                        return Err(S2CliError::PingtestStreamMutated);
-                                    }
+                while let Some(next) = read_stream.next().await {
+                    match next {
+                        Err(e) => {
+                            return Err(
+                                ServiceError::new(ServiceErrorContext::ReadSession, e).into()
+                            );
+                        }
+                        Ok(output) => {
+                            if let ReadOutput::Batch(SequencedRecordBatch { records }) = output {
+                                let recv = Instant::now();
+                                for record in records {
+                                    reads.push((recv, record));
                                 }
+                            } else {
+                                return Err(S2CliError::PingtestStreamMutated);
                             }
                         }
-                        _ = signal::ctrl_c() => {
-                            break;
-                        }
+                    }
+
+                    if reads.len() >= record_count {
+                        break;
                     }
                 }
 
-                Result::<Vec<Duration>, S2CliError>::Ok(reads)
+                Ok(reads)
             });
 
-            let (tx, rx) = mpsc::channel::<AppendRecord>(1);
+            let (tx, rx) = mpsc::channel(1);
 
             let append_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
 
             let mut append_stream = stream_client
                 .append_session(append_stream, None, None, Some(1))
                 .await?;
 
-            let mut appends = Vec::with_capacity(record_count);
+            let appends_handle = tokio::spawn(async move {
+                let mut appends = Vec::with_capacity(record_count);
+
+                while let Some(next) = append_stream.next().await {
+                    appends.push(Instant::now());
+
+                    if let Err(e) = next {
+                        return Err(ServiceError::new(ServiceErrorContext::AppendSession, e));
+                    }
+                }
+
+                Ok(appends)
+            });
+
+            let mut sends = Vec::with_capacity(record_count);
 
             for _ in 0..record_count {
+                // TODO: Add jitter
                 let body = String::from_iter(std::iter::repeat_n('a', record_bytes as usize));
+                let rec = AppendRecord::new(body)?;
 
-                let start = SystemTime::now();
-                let timestamp = start
-                    .duration_since(UNIX_EPOCH)
-                    .expect("valid duration")
-                    .as_secs_f64();
-                let rec = AppendRecord::new(body)?.with_headers(vec![Header::new(
-                    b"timestamp".to_vec(),
-                    timestamp.to_be_bytes().to_vec(),
-                )])?;
-
-                select! {
-                    send = tx.send(rec) => {
-                        if send.is_ok() {
-                            // This is not a correct representation of acknowledgement latency
-                            // since the future is only polled later on.
-                            // FIXME: Figure out how to send timestamp over to calculate ack latencies.
-                            let _ = append_stream
-                                .next()
-                                .await
-                                .expect("acknowledgement")
-                                .map_err(|e| ServiceError::new(ServiceErrorContext::AppendSession, e))?;
-                            let elapsed = start.elapsed().expect("valid duration");
-                            appends.push(elapsed);
-                        } else {
-                            // Receiver closed due to some error.
-                            break;
-                        }
-                    }
-
-                    _ = signal::ctrl_c() => {
-                        eprintln!("{}", "■ [ABORTED]".red().bold());
-                        std::mem::drop(tx);
-                        break;
-                    }
+                if tx.send(rec).await.is_ok() {
+                    sends.push(Instant::now());
+                } else {
+                    // Receiver closed.
+                    break;
                 }
             }
 
-            let reads = reads_handle.await.expect("task panic")?;
+            // Close the stream.
+            std::mem::drop(tx);
 
-            println!("appends = {appends:#?}");
-            println!("reads = {reads:#?}");
+            let reads = reads_handle.await.expect("reads task panic")?;
+            let (reads, records): (Vec<_>, Vec<_>) = reads.into_iter().unzip();
+
+            // Verify records
+            //
+            // Doing this later on since bigger batch sizes might cause huge
+            // deflection in actual latencies.
+            for record in records {
+                if !record.headers.is_empty() {
+                    return Err(S2CliError::PingtestStreamMutated);
+                }
+
+                if record.body.iter().any(|b| *b != b'a') {
+                    return Err(S2CliError::PingtestStreamMutated);
+                }
+            }
+
+            let appends = appends_handle.await.expect("appends task panic")?;
+
+            let ack = appends
+                .iter()
+                .zip(sends.iter())
+                .map(|(a, s)| *a - *s)
+                .collect::<Vec<_>>();
+
+            eprintln!(
+                "mean ack: {:?}",
+                ack.into_iter().sum::<Duration>() / record_count as u32
+            );
+
+            let e2e = reads
+                .iter()
+                .zip(sends.iter())
+                .map(|(r, s)| *r - *s)
+                .collect::<Vec<_>>();
+
+            eprintln!(
+                "mean e2e: {:?}",
+                e2e.into_iter().sum::<Duration>() / record_count as u32
+            );
         }
-    }
+    };
+
     std::process::exit(0);
 }

From 7a8f1bfb19616bbc0f597efc1a805184fa148e95 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Sat, 7 Dec 2024 02:34:54 +0530
Subject: [PATCH 03/10] ..
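
`ReceiverStream` does not need the tokio-stream `sync` feature, so drop it
along with the `tokio-util` entry it pulled into the lockfile.
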
Signed-off-by: Vaibhav Rabber
---
 Cargo.lock | 1 -
 Cargo.toml | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 17bc1aa..14347f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1889,7 +1889,6 @@ dependencies = [
  "futures-core",
  "pin-project-lite",
  "tokio",
- "tokio-util",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index efaef0e..6e21132 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,7 @@ serde = { version = "1.0.214", features = ["derive"] }
 serde_json = "1.0.132"
 thiserror = "1.0.67"
 tokio = { version = "1.41.1", features = ["full"] }
-tokio-stream = { version = "0.1.16", features = ["io-util", "sync"] }
+tokio-stream = { version = "0.1.16", features = ["io-util"] }
 toml = "0.8.19"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

From ae83887cf2a08dab384d7a7b32ba9b3346f30cb3 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 18:24:32 +0530
Subject: [PATCH 04/10] add jitter

Signed-off-by: Vaibhav Rabber
---
 Cargo.lock  |  1 +
 Cargo.toml  |  1 +
 src/main.rs | 20 +++++++++++++++-----
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 14347f6..658d9bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1523,6 +1523,7 @@ dependencies = [
  "miette",
  "pin-project-lite",
  "pin-utils",
+ "rand",
  "serde",
  "serde_json",
  "streamstore",
diff --git a/Cargo.toml b/Cargo.toml
index 6e21132..f1b0c76 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ humantime = "2.1.0"
 miette = { version = "7.2.0", features = ["fancy"] }
 pin-project-lite = "0.2"
 pin-utils = "0.1.0"
+rand = "0.8.5"
 serde = { version = "1.0.214", features = ["derive"] }
 serde_json = "1.0.132"
 thiserror = "1.0.67"
diff --git a/src/main.rs b/src/main.rs
index 12c2149..61d2ab4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,12 +10,13 @@ use clap::{builder::styling, Parser, Subcommand};
 use colored::*;
 use config::{config_path, create_config};
 use error::{S2CliError, ServiceError, ServiceErrorContext};
+use rand::Rng;
 use stream::{RecordStream, StreamService};
 use streamstore::{
     client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
     types::{
-        AppendRecord, BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _,
-        ReadOutput, SequencedRecordBatch, StreamInfo,
+        AppendRecord, AppendRecordBatch, BasinInfo, BasinName, CommandRecord, FencingToken,
+        MeteredBytes as _, ReadOutput, SequencedRecordBatch, StreamInfo,
     },
     HeaderValue,
 };
@@ -874,9 +875,18 @@ async fn run() -> Result<(), S2CliError> {
             let mut sends = Vec::with_capacity(record_count);
 
             for _ in 0..record_count {
-                // TODO: Add jitter
-                let body = String::from_iter(std::iter::repeat_n('a', record_bytes as usize));
-                let rec = AppendRecord::new(body)?;
+                let jitter_op = if rand::random() {
+                    u64::saturating_add
+                } else {
+                    u64::saturating_sub
+                };
+
+                let record_bytes = jitter_op(record_bytes, rand::thread_rng().gen_range(0..=10))
+                    .min(AppendRecordBatch::MAX_BYTES);
+
+                let body: String = std::iter::repeat_n('a', record_bytes as usize).collect();
+
+                let rec = AppendRecord::new(body).expect("pre validated append record bytes");
 
                 if tx.send(rec).await.is_ok() {
                     sends.push(Instant::now());

From 3fb7fe4112ef486964915bf3820cfd1e4f41c5d2 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 18:27:20 +0530
Subject: [PATCH 05/10] undo

---
 src/main.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main.rs b/src/main.rs
index 3a0c3c8..3096f44 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -49,7 +49,7 @@ const STYLES: styling::Styles = styling::Styles::styled()
 const GENERAL_USAGE: &str = color_print::cstr!(
     r#"
     $ s2 config set --auth-token ...
-    $ s2 account list-basins --prefix "bar" --start-after "foo" --limit 100
+    $ s2 list-basins --prefix "foo" --limit 100
     "#
 );

From 950f38c9c8b567cd599e71cefa7662248de6d02b Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 19:10:08 +0530
Subject: [PATCH 06/10] report

Signed-off-by: Vaibhav Rabber
---
 src/main.rs | 89 +++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 80 insertions(+), 9 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 3096f44..c286242 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -305,7 +305,7 @@ enum Commands {
         record_count: NonZeroU32,
 
         /// Size of the record in bytes.
-        #[arg(short = 'b', long, default_value_t = 16 * 1000)]
+        #[arg(short = 'b', long, default_value_t = 16 * 1024)]
         record_bytes: u64,
     },
 }
@@ -971,10 +971,9 @@ async fn run() -> Result<(), S2CliError> {
                 .map(|(a, s)| *a - *s)
                 .collect::<Vec<_>>();
 
-            eprintln!(
-                "mean ack: {:?}",
-                ack.into_iter().sum::<Duration>() / record_count as u32
-            );
+            LatencyStatsReport::generate(ack).print("Append acknowledgement");
+
+            eprintln!(); // Empty line
 
             let e2e = reads
                 .iter()
                 .zip(sends.iter())
                 .map(|(r, s)| *r - *s)
                 .collect::<Vec<_>>();
 
-            eprintln!(
-                "mean e2e: {:?}",
-                e2e.into_iter().sum::<Duration>() / record_count as u32
-            );
+            LatencyStatsReport::generate(e2e).print("End to end");
         }
     };
 
     std::process::exit(0);
 }
+
+struct LatencyStatsReport {
+    pub mean: Duration,
+    pub median: Duration,
+    pub p95: Duration,
+    pub p99: Duration,
+    pub max: Duration,
+    pub min: Duration,
+    pub stddev: Duration,
+}
+
+impl LatencyStatsReport {
+    pub fn generate(mut data: Vec<Duration>) -> Self {
+        data.sort_unstable();
+
+        let n = data.len();
+
+        let mean = data.iter().sum::<Duration>() / n as u32;
+
+        let median = if data.len() / 2 == 0 {
+            (data[n / 2 - 1] + data[n / 2]) / 2
+        } else {
+            data[n / 2]
+        };
+
+        let p_idx = |p: f64| ((n as f64) * p).ceil() as usize - 1;
+
+        let variance = data
+            .iter()
+            .map(|d| (d.as_secs_f64() - mean.as_secs_f64()).powi(2))
+            .sum::<f64>()
+            / n as f64;
+        let stddev = Duration::from_secs_f64(variance.sqrt());
+
+        Self {
+            mean,
+            median,
+            p95: data[p_idx(0.95)],
+            p99: data[p_idx(0.99)],
+            max: data[n - 1],
+            min: data[0],
+            stddev,
+        }
+    }
+
+    pub fn print(self, name: &str) {
+        eprintln!("{}", format!("{name} latency report").yellow().bold());
+
+        fn stat(key: &str, val: Duration) {
+            eprintln!(
+                "{}\t {}",
+                key,
+                humantime::format_duration(val).to_string().green()
+            );
+        }
+
+        let LatencyStatsReport {
+            mean,
+            median,
+            p95,
+            p99,
+            max,
+            min,
+            stddev,
+        } = self;
+
+        stat("Mean", mean);
+        stat("Median", median);
+        stat("P95", p95);
+        stat("P99", p99);
+        stat("Max", max);
+        stat("Min", min);
+        stat("Std Dev", stddev);
+    }
+}

From 9f5671b0472b828db3674ad433b2375918fd45f9 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 22:30:08 +0530
Subject: [PATCH 07/10] ..
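
Rework the pingtest around batched appends: replace the record count/size
flags with a single `--total-bytes` budget, pass `AppendRecordsBatchingOpts`
through to the append session, send a warm-up record before measuring, track
sends/appends/reads with `indicatif` progress bars, fix the even-length
median case, and print latencies in milliseconds.
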
Signed-off-by: Vaibhav Rabber
---
 Cargo.lock    |  72 ++++++++++++++++++++-
 Cargo.toml    |   2 +
 src/main.rs   | 170 ++++++++++++++++++++++++++++++--------------------
 src/stream.rs |  12 +---
 4 files changed, 177 insertions(+), 79 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 14ad4fe..c598cb7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -397,6 +397,19 @@ dependencies = [
  "yaml-rust2",
 ]
 
+[[package]]
+name = "console"
+version = "0.15.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb"
+dependencies = [
+ "encode_unicode",
+ "lazy_static",
+ "libc",
+ "unicode-width 0.1.14",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "const-random"
 version = "0.1.18"
@@ -451,6 +464,12 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "defer"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "930c7171c8df9fb1782bdf9b918ed9ed2d33d1d22300abb754f9085bc48bf8e8"
+
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -497,6 +516,12 @@ version = "1.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
 
+[[package]]
+name = "encode_unicode"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.35"
@@ -855,6 +880,19 @@ dependencies = [
  "hashbrown 0.15.1",
 ]
 
+[[package]]
+name = "indicatif"
+version = "0.17.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cbf675b85ed934d3c67b5c5469701eec7db22689d0a2139d856e0925fa28b281"
+dependencies = [
+ "console",
+ "number_prefix",
+ "portable-atomic",
+ "unicode-width 0.2.0",
+ "web-time",
+]
+
 [[package]]
 name = "is_ci"
 version = "1.2.0"
@@ -984,7 +1022,7 @@ dependencies = [
  "terminal_size",
  "textwrap",
  "thiserror",
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -1057,6 +1095,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
 [[package]]
 name = "object"
 version = "0.36.5"
@@ -1222,6 +1266,12 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "portable-atomic"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.20"
@@ -1525,8 +1575,10 @@ dependencies = [
  "color-print",
  "colored",
  "config",
+ "defer",
  "dirs",
  "humantime",
+ "indicatif",
  "miette",
  "pin-project-lite",
  "pin-utils",
@@ -1796,7 +1848,7 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9"
 dependencies = [
  "smawk",
  "unicode-linebreak",
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -2154,6 +2206,12 @@ version = "0.1.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
 
+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
 [[package]]
 name = "untrusted"
 version = "0.9.0"
@@ -2258,6 +2316,16 @@ version = "0.2.95"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
 
+[[package]]
+name = "web-time"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "webpki-roots"
 version = "0.26.6"
diff --git a/Cargo.toml b/Cargo.toml
index c1c23d7..7c712c2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,8 +11,10 @@ clap = { version = "4.5.20", features = ["derive"] }
 color-print = "0.3.6"
 colored = "2.1.0"
 config = "0.14.1"
+defer = "0.2.1"
 dirs = "5.0.1"
 humantime = "2.1.0"
+indicatif = "0.17.9"
 miette = { version = "7.2.0", features = ["fancy"] }
 pin-project-lite = "0.2"
 pin-utils = "0.1.0"
diff --git a/src/main.rs b/src/main.rs
index c286242..20afda3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
 use std::{
-    num::NonZeroU32,
     path::PathBuf,
     time::{Duration, UNIX_EPOCH},
 };
@@ -9,13 +8,16 @@ use basin::BasinService;
 use clap::{builder::styling, Parser, Subcommand};
 use colored::*;
 use config::{config_path, create_config};
+use defer::defer;
 use error::{S2CliError, ServiceError, ServiceErrorContext};
+use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
+use rand::{distributions::Alphanumeric, Rng};
 use stream::{RecordStream, StreamService};
 use streamstore::{
+    batching::AppendRecordsBatchingOpts,
     client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
     types::{
-        AppendRecord, AppendRecordBatch, BasinInfo, BasinName, CommandRecord, ConvertError,
+        AppendOutput, AppendRecord, BasinInfo, BasinName, CommandRecord, ConvertError,
         FencingToken, MeteredBytes as _, ReadOutput, SequencedRecordBatch, StreamInfo,
     },
     HeaderValue,
 };
@@ -300,13 +302,11 @@ enum Commands {
         /// Name of the stream.
         stream: String,
 
-        /// Number of records.
-        #[arg(short = 'n', long, default_value = "1000")]
-        record_count: NonZeroU32,
-
-        /// Size of the record in bytes.
-        #[arg(short = 'b', long, default_value_t = 16 * 1024)]
-        record_bytes: u64,
+        /// Bytes to send.
+        ///
+        /// Will be truncated to a maximum of 100 MiB.
+        #[arg(short = 'b', long, default_value_t = 100 * 1024 * 1024)]
+        total_bytes: u64,
     },
 }
 
@@ -686,7 +686,12 @@ async fn run() -> Result<(), S2CliError> {
             );
 
             let mut append_output_stream = StreamService::new(stream_client)
-                .append_session(append_input_stream, fencing_token, match_seq_num, None)
+                .append_session(
+                    append_input_stream,
+                    AppendRecordsBatchingOpts::new()
+                        .with_fencing_token(fencing_token)
+                        .with_match_seq_num(match_seq_num),
+                )
                 .await?;
             loop {
                 select! {
@@ -854,20 +859,87 @@ async fn run() -> Result<(), S2CliError> {
 
         Commands::Pingtest {
             basin,
             stream,
-            record_count,
-            record_bytes,
+            total_bytes,
         } => {
-            let record_count = record_count.get() as usize;
+            const RECORD_BYTES: u64 = 2 * 1024;
+            const RECORD_COUNT_IN_BATCH: usize = 10;
+
+            let total_bytes = total_bytes.min(100 * 1024 * 1024);
+
+            let record_count = (total_bytes as f64 / RECORD_BYTES as f64).ceil() as usize;
 
             let cfg = config::load_config(&config_path)?;
             let client_config = client_config(cfg.auth_token)?;
             let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
 
-            let tail = stream_client.check_tail().await?;
+            let progress_bar = MultiProgress::with_draw_target(ProgressDrawTarget::stderr());
+
+            let sends_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
+            let appends_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
+            let reads_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
+
+            let mut tail = stream_client.check_tail().await?;
+
+            let (tx, rx) = mpsc::channel(RECORD_COUNT_IN_BATCH);
+
+            let append_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+            let mut append_stream = stream_client
+                .append_session(
+                    append_stream,
+                    AppendRecordsBatchingOpts::new()
+                        .with_max_batch_records(RECORD_COUNT_IN_BATCH)
+                        .with_linger(Duration::from_millis(1))
+                        .with_match_seq_num(Some(tail)),
+                )
+                .await?;
+
+            // Send in a "warm up" which we're going to ignore.
+            tx.send(AppendRecord::new("warm up").expect("valid record"))
+                .await
+                .expect("channel open");
+
+            match append_stream.next().await.expect("stream should receive") {
+                Ok(AppendOutput { next_seq_num, .. }) => {
+                    // Update the tail so we start reading after warm up batch.
+                    tail = next_seq_num;
+                }
+                Err(e) => {
+                    return Err(ServiceError::new(ServiceErrorContext::AppendSession, e).into())
+                }
+            };
+
+            let appends_handle = tokio::spawn(async move {
+                defer!({ appends_progress_bar.finish() });
+
+                let mut appends = Vec::with_capacity(record_count);
+
+                while let Some(next) = append_stream.next().await {
+                    match next {
+                        Ok(AppendOutput {
+                            start_seq_num,
+                            end_seq_num,
+                            ..
+                        }) => {
+                            let append = Instant::now();
+                            let records = end_seq_num - start_seq_num;
+                            appends.extend(std::iter::repeat_n(append, records as usize));
+                            appends_progress_bar.inc(records);
+                        }
+                        Err(e) => {
+                            return Err(ServiceError::new(ServiceErrorContext::AppendSession, e))
+                        }
+                    }
+                }
+
+                Ok(appends)
+            });
 
             let mut read_stream = stream_client.read_session(tail, None, None).await?;
 
             let reads_handle = tokio::spawn(async move {
+                defer!({ reads_progress_bar.finish() });
+
                 let mut reads = Vec::with_capacity(record_count);
 
                 while let Some(next) = read_stream.next().await {
                     match next {
                         Err(e) => {
                             return Err(
                                 ServiceError::new(ServiceErrorContext::ReadSession, e).into()
                             );
                         }
                         Ok(output) => {
                             if let ReadOutput::Batch(SequencedRecordBatch { records }) = output {
                                 let recv = Instant::now();
-                                for record in records {
-                                    reads.push((recv, record));
-                                }
+                                reads.extend(std::iter::repeat_n(recv, records.len()));
+                                reads_progress_bar.inc(records.len() as u64);
                             } else {
                                 return Err(S2CliError::PingtestStreamMutated);
                             }
                         }
                     }
 
                     if reads.len() >= record_count {
                         break;
                     }
                 }
 
                 Ok(reads)
             });
 
-            let (tx, rx) = mpsc::channel(1);
-
-            let append_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
-
-            let mut append_stream = stream_client
-                .append_session(append_stream, None, None, Some(1))
-                .await?;
-
-            let appends_handle = tokio::spawn(async move {
-                let mut appends = Vec::with_capacity(record_count);
-
-                while let Some(next) = append_stream.next().await {
-                    appends.push(Instant::now());
-
-                    if let Err(e) = next {
-                        return Err(ServiceError::new(ServiceErrorContext::AppendSession, e));
-                    }
-                }
-
-                Ok(appends)
-            });
-
             let mut sends = Vec::with_capacity(record_count);
 
+            // Send in an extra batch for warm-up, which is going to be ignored.
             for _ in 0..record_count {
                 let jitter_op = if rand::random() {
                     u64::saturating_add
                 } else {
                     u64::saturating_sub
                 };
 
-                let record_bytes = jitter_op(record_bytes, rand::thread_rng().gen_range(0..=10))
-                    .min(AppendRecordBatch::MAX_BYTES);
+                let record_bytes = jitter_op(RECORD_BYTES, rand::thread_rng().gen_range(0..=10));
 
-                let body: String = std::iter::repeat_n('a', record_bytes as usize).collect();
+                let body = rand::thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(record_bytes as usize)
+                    .collect::<Vec<_>>();
 
                 let rec = AppendRecord::new(body).expect("pre validated append record bytes");
 
                 if tx.send(rec).await.is_ok() {
                     sends.push(Instant::now());
+                    sends_progress_bar.inc(1);
                 } else {
                     // Receiver closed.
                     break;
                 }
             }
 
+            sends_progress_bar.finish();
+
             // Close the stream.
             std::mem::drop(tx);
 
             let reads = reads_handle.await.expect("reads task panic")?;
-            let (reads, records): (Vec<_>, Vec<_>) = reads.into_iter().unzip();
-
-            // Verify records
-            //
-            // Doing this later on since bigger batch sizes might cause huge
-            // deflection in actual latencies.
-            for record in records {
-                if !record.headers.is_empty() {
-                    return Err(S2CliError::PingtestStreamMutated);
-                }
-
-                if record.body.iter().any(|b| *b != b'a') {
-                    return Err(S2CliError::PingtestStreamMutated);
-                }
-            }
 
             let appends = appends_handle.await.expect("appends task panic")?;
 
             let ack = appends
                 .iter()
                 .zip(sends.iter())
                 .map(|(a, s)| *a - *s)
                 .collect::<Vec<_>>();
 
-            LatencyStatsReport::generate(ack).print("Append acknowledgement");
+            eprintln!(/* Empty line */);
 
-            eprintln!(); // Empty line
+            LatencyStatsReport::generate(ack).print("Append acknowledgement");
 
             let e2e = reads
                 .iter()
                 .zip(sends.iter())
                 .map(|(r, s)| *r - *s)
                 .collect::<Vec<_>>();
 
+            eprintln!(/* Empty line */);
+
             LatencyStatsReport::generate(e2e).print("End to end");
         }
     };
 
     std::process::exit(0);
 }
@@ -1006,7 +1048,7 @@ impl LatencyStatsReport {
 
         let mean = data.iter().sum::<Duration>() / n as u32;
 
-        let median = if data.len() / 2 == 0 {
+        let median = if n % 2 == 0 {
             (data[n / 2 - 1] + data[n / 2]) / 2
         } else {
             data[n / 2]
         };
@@ -1036,11 +1078,7 @@ impl LatencyStatsReport {
         eprintln!("{}", format!("{name} latency report").yellow().bold());
 
         fn stat(key: &str, val: Duration) {
-            eprintln!(
-                "{}\t {}",
-                key,
-                humantime::format_duration(val).to_string().green()
-            );
+            eprintln!("{}\t {}", key, format!("{} ms", val.as_millis()).green());
         }
 
         let LatencyStatsReport {
diff --git a/src/stream.rs b/src/stream.rs
index 27619a9..023295f 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -96,18 +96,8 @@ impl StreamService {
     pub async fn append_session(
         &self,
         stream: impl 'static + Send + Stream<Item = AppendRecord> + Unpin,
-        fencing_token: Option<FencingToken>,
-        match_seq_num: Option<u64>,
-        max_batch_records: Option<usize>,
+        opts: AppendRecordsBatchingOpts,
     ) -> Result<Streaming<AppendOutput>, ServiceError> {
-        let mut opts = AppendRecordsBatchingOpts::default()
-            .with_fencing_token(fencing_token)
-            .with_match_seq_num(match_seq_num);
-
-        if let Some(n) = max_batch_records {
-            opts = opts.with_max_batch_records(n);
-        }
-
         let append_record_stream = AppendRecordsBatchingStream::new(stream, opts);
 
         self.client
             .append_session(append_record_stream)

From fc1d0d948c11804a28a9baad218b3886f2080868 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 22:37:47 +0530
Subject: [PATCH 08/10] ..
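
Sample record bodies uniformly over all byte values instead of only
alphanumeric characters.
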
Signed-off-by: Vaibhav Rabber
---
 src/main.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 20afda3..8113cf8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,7 +11,7 @@ use config::{config_path, create_config};
 use defer::defer;
 use error::{S2CliError, ServiceError, ServiceErrorContext};
 use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
-use rand::{distributions::Alphanumeric, Rng};
+use rand::{distributions::Uniform, Rng};
 use stream::{RecordStream, StreamService};
 use streamstore::{
     batching::AppendRecordsBatchingOpts,
@@ -981,7 +981,7 @@ async fn run() -> Result<(), S2CliError> {
                 let record_bytes = jitter_op(RECORD_BYTES, rand::thread_rng().gen_range(0..=10));
 
                 let body = rand::thread_rng()
-                    .sample_iter(&Alphanumeric)
+                    .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
                     .take(record_bytes as usize)
                     .collect::<Vec<_>>();

From 88d1da3caceba705131acc39fb9bfa03987c873e Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Mon, 9 Dec 2024 23:48:01 +0530
Subject: [PATCH 09/10] style

Signed-off-by: Vaibhav Rabber
---
 src/main.rs | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 8113cf8..1910fe3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,7 +10,7 @@ use colored::*;
 use config::{config_path, create_config};
 use defer::defer;
 use error::{S2CliError, ServiceError, ServiceErrorContext};
-use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget};
+use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
 use rand::{distributions::Uniform, Rng};
 use stream::{RecordStream, StreamService};
 use streamstore::{
@@ -874,9 +874,22 @@ async fn run() -> Result<(), S2CliError> {
 
             let progress_bar = MultiProgress::with_draw_target(ProgressDrawTarget::stderr());
 
-            let sends_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
-            let appends_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
-            let reads_progress_bar = progress_bar.add(ProgressBar::new(record_count as u64));
+            let progress_bar_style = ProgressStyle::default_bar()
+                .template("{prefix:>10.bold} [{bar:<72.cyan/blue}] {percent:>3}%")
+                .expect("valid template")
+                .progress_chars("#>-");
+
+            let add_progress_bar = |prefix: &'static str| {
+                progress_bar.add(
+                    ProgressBar::new(record_count as u64)
+                        .with_style(progress_bar_style.clone())
+                        .with_prefix(prefix),
+                )
+            };
+
+            let sends_progress_bar = add_progress_bar("Sends");
+            let appends_progress_bar = add_progress_bar("Appends");
+            let reads_progress_bar = add_progress_bar("Reads");
 
             let mut tail = stream_client.check_tail().await?;
 
@@ -1011,7 +1024,7 @@ async fn run() -> Result<(), S2CliError> {
                 .map(|(a, s)| *a - *s)
                 .collect::<Vec<_>>();
 
-            eprintln!(/* Empty line */);
+            eprintln!("\n");
 
             LatencyStatsReport::generate(ack).print("Append acknowledgement");

From 9b8720a15437310a22b7794646d394f407d92ac4 Mon Sep 17 00:00:00 2001
From: Vaibhav Rabber
Date: Tue, 10 Dec 2024 20:33:11 +0530
Subject: [PATCH 10/10] ..
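
Rename pingtest to speedtest, swap the `defer` crate for `scopeguard`,
pre-generate records tagged with a `record-id` header so reads can be
validated against what was sent, and extend the report with throughput.
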
Signed-off-by: Vaibhav Rabber
---
 Cargo.lock   |   8 +-
 Cargo.toml   |   2 +-
 src/error.rs |   4 +-
 src/main.rs  | 264 +++++++++++++++++++++++++++++++++------------------
 4 files changed, 174 insertions(+), 104 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c598cb7..3a8db44 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -464,12 +464,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "defer"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "930c7171c8df9fb1782bdf9b918ed9ed2d33d1d22300abb754f9085bc48bf8e8"
-
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -1575,7 +1569,6 @@ dependencies = [
  "color-print",
  "colored",
  "config",
- "defer",
  "dirs",
  "humantime",
  "indicatif",
@@ -1583,6 +1576,7 @@ dependencies = [
  "pin-project-lite",
  "pin-utils",
  "rand",
+ "scopeguard",
  "serde",
  "serde_json",
  "streamstore",
diff --git a/Cargo.toml b/Cargo.toml
index 7c712c2..f5de333 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,6 @@ clap = { version = "4.5.20", features = ["derive"] }
 color-print = "0.3.6"
 colored = "2.1.0"
 config = "0.14.1"
-defer = "0.2.1"
 dirs = "5.0.1"
 humantime = "2.1.0"
 indicatif = "0.17.9"
@@ -19,6 +18,7 @@ miette = { version = "7.2.0", features = ["fancy"] }
 pin-project-lite = "0.2"
 pin-utils = "0.1.0"
 rand = "0.8.5"
+scopeguard = "1.2.0"
 serde = { version = "1.0.214", features = ["derive"] }
 serde_json = "1.0.132"
 thiserror = "1.0.67"
diff --git a/src/error.rs b/src/error.rs
index c839dc9..15ef00b 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -43,8 +43,8 @@ pub enum S2CliError {
     #[error("Failed to initialize a `Record Reader`! {0}")]
     RecordReaderInit(String),
 
-    #[error("Stream mutated concurrently during pingtest")]
-    PingtestStreamMutated,
+    #[error("Stream mutated concurrently during speedtest")]
+    SpeedtestStreamMutated,
 
     #[error("Failed to write records: {0}")]
     RecordWrite(String),
 
diff --git a/src/main.rs b/src/main.rs
index 1910fe3..68d78cd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,17 +8,20 @@ use basin::BasinService;
 use clap::{builder::styling, Parser, Subcommand};
 use colored::*;
 use config::{config_path, create_config};
-use defer::defer;
 use error::{S2CliError, ServiceError, ServiceErrorContext};
 use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
-use rand::{distributions::Uniform, Rng};
+use rand::{
+    distributions::{Alphanumeric, Uniform},
+    Rng,
+};
+use scopeguard::defer;
 use stream::{RecordStream, StreamService};
 use streamstore::{
     batching::AppendRecordsBatchingOpts,
     client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
     types::{
         AppendOutput, AppendRecord, BasinInfo, BasinName, CommandRecord, ConvertError,
-        FencingToken, MeteredBytes as _, ReadOutput, SequencedRecordBatch, StreamInfo,
+        FencingToken, Header, MeteredBytes as _, ReadOutput, SequencedRecordBatch, StreamInfo,
     },
     HeaderValue,
 };
@@ -294,8 +297,8 @@ enum Commands {
         limit_bytes: Option<u64>,
     },
 
-    /// Pingtest
-    Pingtest {
+    /// Run a speed test.
+    Speedtest {
         /// Name of the basin.
         basin: BasinName,
 
         /// Name of the stream.
         stream: String,
 
@@ -856,7 +859,7 @@ async fn run() -> Result<(), S2CliError> {
             }
         }
 
-        Commands::Pingtest {
+        Commands::Speedtest {
             basin,
             stream,
             total_bytes,
         } => {
             const RECORD_BYTES: u64 = 2 * 1024;
             const RECORD_COUNT_IN_BATCH: usize = 10;
 
+            const WARM_UP_BATCH_BODY: &str = "warm up";
+            const RECORD_ID_HEADER: &[u8] = b"record-id";
+
             let total_bytes = total_bytes.min(100 * 1024 * 1024);
 
             let record_count = (total_bytes as f64 / RECORD_BYTES as f64).ceil() as usize;
 
+            let total_bytes = record_count as u64 * RECORD_BYTES;
+
             let cfg = config::load_config(&config_path)?;
             let client_config = client_config(cfg.auth_token)?;
             let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
 
             let progress_bar = MultiProgress::with_draw_target(ProgressDrawTarget::stderr());
 
@@ -874,11 +895,115 @@ async fn run() -> Result<(), S2CliError> {
                 )
             };
 
+            let prepare_progress_bar = add_progress_bar("Prepare");
             let sends_progress_bar = add_progress_bar("Sends");
             let appends_progress_bar = add_progress_bar("Appends");
             let reads_progress_bar = add_progress_bar("Reads");
 
+            let (records, record_ids): (Vec<_>, Vec<_>) = (0..record_count)
+                .map(|_| {
+                    let jitter_op = if rand::random() {
+                        u64::saturating_add
+                    } else {
+                        u64::saturating_sub
+                    };
+
+                    let record_bytes =
+                        jitter_op(RECORD_BYTES, rand::thread_rng().gen_range(0..=10));
+
+                    let body = rand::thread_rng()
+                        .sample_iter(&Uniform::new_inclusive(0, u8::MAX))
+                        .take(record_bytes as usize)
+                        .collect::<Vec<_>>();
+
+                    let rec_id = rand::thread_rng()
+                        .sample_iter(&Alphanumeric)
+                        .take(16)
+                        .collect::<Vec<_>>();
+
+                    let rec = AppendRecord::new(body)
+                        .expect("pre validated append record bytes")
+                        .with_headers(vec![Header::new(RECORD_ID_HEADER, rec_id.clone())])
+                        .expect("pre validated append record header");
+
+                    prepare_progress_bar.inc(1);
+
+                    (rec, rec_id)
+                })
+                .unzip();
+
+            let tail = stream_client.check_tail().await?;
+
+            prepare_progress_bar.finish();
+
+            let mut read_stream = stream_client.read_session(tail, None, None).await?;
+
+            let reads_handle = tokio::spawn(async move {
+                defer!(reads_progress_bar.finish());
+
+                let mut reads = Vec::with_capacity(record_count);
+
+                let mut received_warmup = false;
+
+                let mut reads_start = Instant::now();
+                let mut record_ids = record_ids.into_iter();
+
+                while let Some(next) = read_stream.next().await {
+                    match next {
+                        Err(e) => {
+                            return Err(
+                                ServiceError::new(ServiceErrorContext::ReadSession, e).into()
+                            );
+                        }
+                        Ok(output) => {
+                            if let ReadOutput::Batch(SequencedRecordBatch { records }) = output {
+                                let recv = Instant::now();
+                                let records = if !received_warmup {
+                                    // First batch should be "warm up"
+                                    let first = records.first().expect("empty batch");
+                                    if first.body.as_ref() != WARM_UP_BATCH_BODY.as_bytes()
+                                        || first.seq_num != tail
+                                    {
+                                        return Err(S2CliError::SpeedtestStreamMutated);
+                                    }
+                                    received_warmup = true;
+                                    // Start read now.
+                                    reads_start = recv;
+                                    &records[1..]
+                                } else {
+                                    &records
+                                };
+
+                                // Validate records
+                                for rec in records {
+                                    if rec.headers.len() != 1 {
+                                        return Err(S2CliError::SpeedtestStreamMutated);
+                                    }
+                                    let header = rec.headers.first().expect("validated length");
+                                    if header.name.as_ref() != RECORD_ID_HEADER {
+                                        return Err(S2CliError::SpeedtestStreamMutated);
+                                    }
+                                    let rec_id = record_ids
+                                        .next()
+                                        .ok_or(S2CliError::SpeedtestStreamMutated)?;
+                                    if header.value.as_ref() != rec_id.as_slice() {
+                                        return Err(S2CliError::SpeedtestStreamMutated);
+                                    }
+                                }
+
                                 reads.extend(std::iter::repeat_n(recv, records.len()));
                                 reads_progress_bar.inc(records.len() as u64);
                             } else {
-                                return Err(S2CliError::PingtestStreamMutated);
+                                return Err(S2CliError::SpeedtestStreamMutated);
                             }
                         }
                     }
 
                     if reads.len() >= record_count {
                         break;
                     }
                 }
 
-                Ok(reads)
+                Ok((reads_start, reads))
             });
 
             let (tx, rx) = mpsc::channel(RECORD_COUNT_IN_BATCH);
 
             let append_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
 
             let mut append_stream = stream_client
                 .append_session(
                     append_stream,
                     AppendRecordsBatchingOpts::new()
                         .with_max_batch_records(RECORD_COUNT_IN_BATCH)
                         .with_linger(Duration::from_millis(1))
                         .with_match_seq_num(Some(tail)),
                 )
                 .await?;
 
             // Send in a "warm up" which we're going to ignore.
-            tx.send(AppendRecord::new("warm up").expect("valid record"))
+            tx.send(AppendRecord::new(WARM_UP_BATCH_BODY).expect("valid record"))
                 .await
                 .expect("channel open");
 
             match append_stream.next().await.expect("stream should receive") {
-                Ok(AppendOutput { next_seq_num, .. }) => {
-                    // Update the tail so we start reading after warm up batch.
-                    tail = next_seq_num;
-                }
+                Ok(AppendOutput { start_seq_num, .. }) if start_seq_num == tail => (),
+                Ok(_) => return Err(S2CliError::SpeedtestStreamMutated),
                 Err(e) => {
                     return Err(ServiceError::new(ServiceErrorContext::AppendSession, e).into())
                 }
             };
 
             let appends_handle = tokio::spawn(async move {
-                defer!({ appends_progress_bar.finish() });
+                defer!(appends_progress_bar.finish());
 
                 let mut appends = Vec::with_capacity(record_count);
 
+                let appends_start = Instant::now();
+
                 while let Some(next) = append_stream.next().await {
                     match next {
                         Ok(AppendOutput {
                             start_seq_num,
                             end_seq_num,
                             ..
                         }) => {
                             let append = Instant::now();
                             let records = end_seq_num - start_seq_num;
                             appends.extend(std::iter::repeat_n(append, records as usize));
                             appends_progress_bar.inc(records);
                         }
                         Err(e) => {
                             return Err(ServiceError::new(ServiceErrorContext::AppendSession, e))
                         }
                     }
                 }
 
-                Ok(appends)
+                Ok((appends_start, appends))
             });
 
             let mut sends = Vec::with_capacity(record_count);
 
-            // Send in an extra batch for warm-up, which is going to be ignored.
             for rec in records {
                 if tx.send(rec).await.is_ok() {
                     sends.push(Instant::now());
                     sends_progress_bar.inc(1);
                 } else {
                     // Receiver closed.
                     break;
                 }
             }
 
             sends_progress_bar.finish();
 
             // Close the stream.
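             // Dropping the sender ends the input stream, so the append session can flush and complete.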
             std::mem::drop(tx);
 
-            let reads = reads_handle.await.expect("reads task panic")?;
+            let (reads_start, reads) = reads_handle.await.expect("reads task panic")?;
+            let (appends_start, appends) = appends_handle.await.expect("appends task panic")?;
 
             eprintln!("\n");
 
-            LatencyStatsReport::generate(ack).print("Append acknowledgement");
+            StatsReport::generate(&sends, appends_start, &appends, total_bytes)
+                .print("Append acknowledgement");
 
             eprintln!(/* Empty line */);
 
-            LatencyStatsReport::generate(e2e).print("End to end");
+            StatsReport::generate(&sends, reads_start, &reads, total_bytes).print("End to end");
         }
     };
 
     std::process::exit(0);
 }
 
-struct LatencyStatsReport {
+struct StatsReport {
     pub mean: Duration,
     pub median: Duration,
     pub p95: Duration,
     pub p99: Duration,
     pub max: Duration,
     pub min: Duration,
     pub stddev: Duration,
+    // In bytes per second
+    pub throughput: u128,
 }
 
-impl LatencyStatsReport {
-    pub fn generate(mut data: Vec<Duration>) -> Self {
+impl StatsReport {
+    pub fn generate(
+        sends: &[Instant],
+        op_start: Instant,
+        op: &[Instant],
+        total_bytes: u64,
+    ) -> Self {
+        let op_duration = *op.last().unwrap() - op_start;
+        let throughput = total_bytes as u128 * 1_000_000 / op_duration.as_micros();
+
+        let mut data = op
+            .iter()
+            .zip(sends.iter())
+            .map(|(o, s)| *o - *s)
+            .collect::<Vec<_>>();
         data.sort_unstable();
 
         let n = data.len();
 
@@ -1084,17 +1151,22 @@ impl LatencyStatsReport {
             max: data[n - 1],
             min: data[0],
             stddev,
+            throughput,
         }
     }
 
     pub fn print(self, name: &str) {
-        eprintln!("{}", format!("{name} latency report").yellow().bold());
+        eprintln!("{}", format!("{name} report").yellow().bold());
 
-        fn stat(key: &str, val: Duration) {
-            eprintln!("{}\t {}", key, format!("{} ms", val.as_millis()).green());
+        fn stat(key: &str, val: String) {
+            eprintln!("{:>12} {}", key, val.green());
         }
 
+        fn stat_duration(key: &str, val: Duration) {
+            stat(key, format!("{} ms", val.as_millis()));
+        }
+
-        let LatencyStatsReport {
+        let StatsReport {
             mean,
             median,
             p95,
             p99,
             max,
             min,
             stddev,
+            throughput,
         } = self;
 
-        stat("Mean", mean);
-        stat("Median", median);
-        stat("P95", p95);
-        stat("P99", p99);
-        stat("Max", max);
-        stat("Min", min);
-        stat("Std Dev", stddev);
+        stat_duration("Mean", mean);
+        stat_duration("Median", median);
+        stat_duration("P95", p95);
+        stat_duration("P99", p99);
+        stat_duration("Max", max);
+        stat_duration("Min", min);
+        stat_duration("Std Dev", stddev);
+
+        let mibps = throughput as f64 / (1024.0 * 1024.0);
+        stat("Throughput", format!("{mibps:.2} MiB/s"));
     }
 }