diff --git a/Cargo.lock b/Cargo.lock index 12056e7..d59a541 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1634,6 +1634,7 @@ dependencies = [ "miette", "pin-project-lite", "pin-utils", + "rand", "serde", "serde_json", "streamstore", diff --git a/Cargo.toml b/Cargo.toml index ad2f154..f5cb5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ humantime = "2.1.0" miette = { version = "7.2.0", features = ["fancy"] } pin-project-lite = "0.2" pin-utils = "0.1.0" +rand = "0.8.5" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" streamstore = "0.3.0" diff --git a/src/error.rs b/src/error.rs index ddf9b49..1e7beec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,6 +43,9 @@ pub enum S2CliError { #[error("Failed to initialize a `Record Reader`! {0}")] RecordReaderInit(String), + #[error("Stream mutated concurrently during ping")] + PingStreamMutated, + #[error("Failed to write records: {0}")] RecordWrite(String), @@ -73,21 +76,21 @@ pub enum ServiceErrorContext { impl std::fmt::Display for ServiceErrorContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ServiceErrorContext::ListBasins => write!(f, "Failed to list basins"), - ServiceErrorContext::CreateBasin => write!(f, "Failed to create basin"), - ServiceErrorContext::DeleteBasin => write!(f, "Failed to delete basin"), - ServiceErrorContext::GetBasinConfig => write!(f, "Failed to get basin config"), - ServiceErrorContext::ReconfigureBasin => write!(f, "Failed to reconfigure basin"), - ServiceErrorContext::ListStreams => write!(f, "Failed to list streams"), - ServiceErrorContext::CreateStream => write!(f, "Failed to create stream"), - ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"), - ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"), - ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"), - ServiceErrorContext::Trim => write!(f, "Failed to trim"), - ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"), - ServiceErrorContext::AppendSession => write!(f, "Failed to append session"), - ServiceErrorContext::ReadSession => write!(f, "Failed to read session"), - ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"), + Self::ListBasins => write!(f, "Failed to list basins"), + Self::CreateBasin => write!(f, "Failed to create basin"), + Self::DeleteBasin => write!(f, "Failed to delete basin"), + Self::GetBasinConfig => write!(f, "Failed to get basin config"), + Self::ReconfigureBasin => write!(f, "Failed to reconfigure basin"), + Self::ListStreams => write!(f, "Failed to list streams"), + Self::CreateStream => write!(f, "Failed to create stream"), + Self::DeleteStream => write!(f, "Failed to delete stream"), + Self::GetStreamConfig => write!(f, "Failed to get stream config"), + Self::CheckTail => write!(f, "Failed to check tail"), + Self::Trim => write!(f, "Failed to trim"), + Self::Fence => write!(f, "Failed to set fencing token"), + Self::AppendSession => write!(f, "Failed to append session"), + Self::ReadSession => write!(f, "Failed to read session"), + Self::ReconfigureStream => write!(f, "Failed to reconfigure stream"), } } } diff --git a/src/main.rs b/src/main.rs index 6a5b898..0af6a07 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,8 +9,11 @@ use clap::{builder::styling, Parser, Subcommand}; use colored::*; use config::{config_path, create_config}; use error::{S2CliError, ServiceError, ServiceErrorContext}; +use ping::{LatencyStats, PingResult, Pinger}; +use rand::Rng; use stream::{RecordStream, StreamService}; use streamstore::{ + batching::AppendRecordsBatchingOpts, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ BasinInfo, BasinName, CommandRecord, ConvertError, FencingToken, MeteredBytes as _, @@ -36,6 +39,7 @@ mod stream; mod config; mod error; +mod ping; mod types; const STYLES: styling::Styles = styling::Styles::styled() @@ -289,6 +293,31 @@ enum Commands { #[arg(short = 'b', long)] limit_bytes: Option, }, + + /// Ping the stream to get append acknowledgement and end-to-end latencies. + Ping { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + + /// Send a batch after this interval. + /// + /// Will be set to a minimum of 100ms. + #[arg(short = 'i', long, default_value = "500ms")] + interval: humantime::Duration, + + /// Batch size in bytes. A jitter (+/- 25%) will be added. + /// + /// Truncated to a maximum of 128 KiB. + #[arg(short = 'b', long, default_value_t = 32 * 1024)] + batch_bytes: u64, + + /// Stop after sending this number of batches. + #[arg(short = 'n', long)] + num_batches: Option, + }, } #[derive(Subcommand, Debug)] @@ -369,7 +398,8 @@ fn client_config(auth_token: String) -> Result { let endpoints = S2Endpoints::from_env().map_err(S2CliError::EndpointsFromEnv)?; let client_config = ClientConfig::new(auth_token.to_string()) .with_user_agent("s2-cli".parse::().expect("valid user agent")) - .with_endpoints(endpoints); + .with_endpoints(endpoints) + .with_request_timeout(Duration::from_secs(30)); Ok(client_config) } @@ -666,8 +696,14 @@ async fn run() -> Result<(), S2CliError> { ); let mut append_output_stream = StreamService::new(stream_client) - .append_session(append_input_stream, fencing_token, match_seq_num) + .append_session( + append_input_stream, + AppendRecordsBatchingOpts::new() + .with_fencing_token(fencing_token) + .with_match_seq_num(match_seq_num), + ) .await?; + loop { select! { maybe_append_result = append_output_stream.next() => { @@ -697,10 +733,10 @@ async fn run() -> Result<(), S2CliError> { } _ = signal::ctrl_c() => { - drop(append_output_stream); - eprintln!("{}", "■ [ABORTED]".red().bold()); - break; - } + drop(append_output_stream); + eprintln!("{}", "■ [ABORTED]".red().bold()); + break; + } } } } @@ -729,9 +765,9 @@ async fn run() -> Result<(), S2CliError> { maybe_read_result = read_output_stream.next() => { match maybe_read_result { Some(read_result) => { - if start.is_none() { - start = Some(Instant::now()); - } + if start.is_none() { + start = Some(Instant::now()); + } match read_result { Ok(ReadOutput::Batch(sequenced_record_batch)) => { let num_records = sequenced_record_batch.records.len(); @@ -830,6 +866,120 @@ async fn run() -> Result<(), S2CliError> { writer.flush().await.expect("writer flush"); } } - } + + Commands::Ping { + basin, + stream, + interval, + batch_bytes, + num_batches, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream)); + + let interval = interval.max(Duration::from_millis(100)); + let batch_bytes = batch_bytes.min(128 * 1024); + + eprintln!("Preparing..."); + + let mut pinger = Pinger::init(&stream_client).await?; + + let mut pings = Vec::new(); + + async fn ping_next( + pinger: &mut Pinger, + pings: &mut Vec, + interval: Duration, + batch_bytes: u64, + ) -> Result<(), S2CliError> { + let jitter_op = if rand::random() { + u64::saturating_add + } else { + u64::saturating_sub + }; + + let max_jitter = batch_bytes / 4; + + let record_bytes = + jitter_op(batch_bytes, rand::thread_rng().gen_range(0..=max_jitter)); + + let Some(res) = pinger.ping(record_bytes).await? else { + return Ok(()); + }; + + eprintln!( + "{:<5} bytes: ack = {:<7} e2e = {:<7}", + res.bytes.to_string().blue(), + format!("{} ms", res.ack.as_millis()).blue(), + format!("{} ms", res.e2e.as_millis()).blue(), + ); + + pings.push(res); + + tokio::time::sleep(interval).await; + Ok(()) + } + + while Some(pings.len()) != num_batches { + select! { + _ = ping_next(&mut pinger, &mut pings, interval, batch_bytes) => (), + _ = signal::ctrl_c() => break, + } + } + + // Close the pinger. + std::mem::drop(pinger); + + let total_batches = pings.len(); + let (bytes, (acks, e2es)): (Vec<_>, (Vec<_>, Vec<_>)) = pings + .into_iter() + .map(|PingResult { bytes, ack, e2e }| (bytes, (ack, e2e))) + .unzip(); + let total_bytes = bytes.into_iter().sum::(); + + eprintln!(/* Empty line */); + eprintln!("Round-tripped {total_bytes} bytes in {total_batches} batches"); + + pub fn print_stats(stats: LatencyStats, name: &str) { + eprintln!( + "{:-^60}", + format!(" {name} Latency Statistics ").yellow().bold() + ); + + fn stat(key: &str, val: String) { + eprintln!("{:>9} {}", key, val.green()); + } + + fn stat_duration(key: &str, val: Duration) { + stat(key, format!("{} ms", val.as_millis())); + } + + let LatencyStats { + mean, + median, + p95, + p99, + max, + min, + stddev, + } = stats; + + stat_duration("Mean", mean); + stat_duration("Median", median); + stat_duration("P95", p95); + stat_duration("P99", p99); + stat_duration("Max", max); + stat_duration("Min", min); + stat_duration("Std Dev", stddev); + } + + eprintln!(/* Empty line */); + print_stats(LatencyStats::generate(acks), "Append Acknowledgement"); + eprintln!(/* Empty line */); + print_stats(LatencyStats::generate(e2es), "End-to-End"); + } + }; + std::process::exit(0); } diff --git a/src/ping.rs b/src/ping.rs new file mode 100644 index 0000000..f1344fa --- /dev/null +++ b/src/ping.rs @@ -0,0 +1,235 @@ +use std::time::Duration; + +use rand::{distributions::Uniform, Rng}; +use streamstore::{ + batching::AppendRecordsBatchingOpts, + types::{AppendOutput, AppendRecord, ReadOutput, SequencedRecord, SequencedRecordBatch}, +}; +use tokio::{join, select, signal, sync::mpsc, task::JoinHandle, time::Instant}; +use tokio_stream::StreamExt; + +use crate::{ + error::{S2CliError, ServiceError, ServiceErrorContext}, + stream::StreamService, +}; + +pub struct PingResult { + pub bytes: u64, + pub ack: Duration, + pub e2e: Duration, +} + +pub struct Pinger { + records_tx: mpsc::UnboundedSender, + appends_handle: JoinHandle<()>, + reads_handle: JoinHandle<()>, + appends_rx: mpsc::UnboundedReceiver>, + reads_rx: mpsc::UnboundedReceiver>, +} + +impl Pinger { + pub async fn init(stream_client: &StreamService) -> Result { + let tail = stream_client.check_tail().await?; + + let mut read_stream = stream_client.read_session(tail, None, None).await?; + + let (records_tx, records_rx) = mpsc::unbounded_channel(); + let mut append_stream = stream_client + .append_session( + tokio_stream::wrappers::UnboundedReceiverStream::new(records_rx), + AppendRecordsBatchingOpts::new() + .with_max_batch_records(1) + .with_match_seq_num(Some(tail)), + ) + .await?; + + let warmup_record = AppendRecord::new("warmup").expect("valid record"); + records_tx + .send(warmup_record.clone()) + .expect("stream channel open"); + + match append_stream.next().await.expect("warmup batch ack") { + Ok(AppendOutput { start_seq_num, .. }) if start_seq_num == tail => (), + Ok(_) => return Err(S2CliError::PingStreamMutated), + Err(e) => return Err(ServiceError::new(ServiceErrorContext::AppendSession, e).into()), + }; + + match read_stream.next().await.expect("warmup batch e2e") { + Ok(ReadOutput::Batch(SequencedRecordBatch { records })) + if records.len() == 1 + && records[0].headers.is_empty() + && records[0].body.as_ref() == warmup_record.body() => {} + Ok(_) => return Err(S2CliError::PingStreamMutated), + Err(e) => return Err(ServiceError::new(ServiceErrorContext::ReadSession, e).into()), + }; + + let (reads_tx, reads_rx) = mpsc::unbounded_channel(); + let reads_handle = tokio::spawn(async move { + loop { + select! { + next = read_stream.next() => match next { + Some(Err(e)) => { + reads_tx.send(Err( + ServiceError::new(ServiceErrorContext::ReadSession, e).into() + )).expect("open reads channel"); + return; + } + Some(Ok(output)) => { + if let ReadOutput::Batch(SequencedRecordBatch { mut records }) = output { + let read = Instant::now(); + if records.len() != 1 { + reads_tx.send(Err( + S2CliError::PingStreamMutated + )).expect("reads channel open"); + return; + } + let record = records.pop().expect("pre validated length"); + reads_tx.send(Ok((read, record))).expect("reads channel open"); + } else { + reads_tx.send(Err( + S2CliError::PingStreamMutated + )).expect("reads channel open"); + return; + } + } + None => break, + }, + _ = signal::ctrl_c() => break, + }; + } + }); + + let (appends_tx, appends_rx) = mpsc::unbounded_channel(); + let appends_handle = tokio::spawn(async move { + while let Some(next) = append_stream.next().await { + match next { + Ok(AppendOutput { + start_seq_num, + end_seq_num, + .. + }) => { + let append = Instant::now(); + let records = end_seq_num - start_seq_num; + if records != 1 { + appends_tx + .send(Err(S2CliError::PingStreamMutated)) + .expect("appends channel open"); + return; + } + appends_tx.send(Ok(append)).expect("appends channel open"); + } + Err(e) => { + appends_tx + .send(Err(S2CliError::from(ServiceError::new( + ServiceErrorContext::AppendSession, + e, + )))) + .expect("appends channel open"); + } + } + } + }); + + Ok(Self { + records_tx, + appends_handle, + reads_handle, + appends_rx, + reads_rx, + }) + } + + pub async fn ping(&mut self, bytes: u64) -> Result, S2CliError> { + let body = rand::thread_rng() + .sample_iter(&Uniform::new_inclusive(0, u8::MAX)) + .take(bytes as usize) + .collect::>(); + + let record = AppendRecord::new(body.clone()).expect("pre validated append record bytes"); + + self.records_tx.send(record).expect("stream channel open"); + + let send = Instant::now(); + + let (append, read, record) = match join!(self.appends_rx.recv(), self.reads_rx.recv()) { + (None, _) | (_, None) => return Ok(None), + (Some(Err(e)), _) | (_, Some(Err(e))) => return Err(e), + (Some(Ok(append)), Some(Ok((read, record)))) => (append, read, record), + }; + + // Validate the received record + if body != record.body || !record.headers.is_empty() { + return Err(S2CliError::PingStreamMutated); + } + + Ok(Some(PingResult { + bytes, + ack: append - send, + e2e: read - send, + })) + } +} + +impl Drop for Pinger { + fn drop(&mut self) { + self.appends_handle.abort(); + self.reads_handle.abort(); + } +} + +pub struct LatencyStats { + pub mean: Duration, + pub median: Duration, + pub p95: Duration, + pub p99: Duration, + pub max: Duration, + pub min: Duration, + pub stddev: Duration, +} + +impl LatencyStats { + pub fn generate(mut data: Vec) -> Self { + data.sort_unstable(); + + let n = data.len(); + + if n == 0 { + return Self { + mean: Duration::ZERO, + median: Duration::ZERO, + p95: Duration::ZERO, + p99: Duration::ZERO, + max: Duration::ZERO, + min: Duration::ZERO, + stddev: Duration::ZERO, + }; + } + + let mean = data.iter().sum::() / n as u32; + + let median = if n % 2 == 0 { + (data[n / 2 - 1] + data[n / 2]) / 2 + } else { + data[n / 2] + }; + + let p_idx = |p: f64| ((n as f64) * p).ceil() as usize - 1; + + let variance = data + .iter() + .map(|d| (d.as_secs_f64() - mean.as_secs_f64()).powi(2)) + .sum::() + / n as f64; + let stddev = Duration::from_secs_f64(variance.sqrt()); + + Self { + mean, + median, + p95: data[p_idx(0.95)], + p99: data[p_idx(0.99)], + max: data[n - 1], + min: data[0], + stddev, + } + } +} diff --git a/src/stream.rs b/src/stream.rs index 2794c32..023295f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -95,15 +95,10 @@ impl StreamService { pub async fn append_session( &self, - append_input_stream: RecordStream>, - fencing_token: Option, - match_seq_num: Option, + stream: impl 'static + Send + Stream + Unpin, + opts: AppendRecordsBatchingOpts, ) -> Result, ServiceError> { - let opts = AppendRecordsBatchingOpts::default() - .with_fencing_token(fencing_token) - .with_match_seq_num(match_seq_num); - - let append_record_stream = AppendRecordsBatchingStream::new(append_input_stream, opts); + let append_record_stream = AppendRecordsBatchingStream::new(stream, opts); self.client .append_session(append_record_stream)