Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ping (#48) #63

Merged
merged 18 commits into from
Dec 11, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ humantime = "2.1.0"
miette = { version = "7.2.0", features = ["fancy"] }
pin-project-lite = "0.2"
pin-utils = "0.1.0"
rand = "0.8.5"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
thiserror = "1.0.67"
Expand Down
33 changes: 18 additions & 15 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub enum S2CliError {
#[error("Failed to initialize a `Record Reader`! {0}")]
RecordReaderInit(String),

#[error("Stream mutated concurrently during ping")]
PingStreamMutated,

#[error("Failed to write records: {0}")]
RecordWrite(String),

Expand Down Expand Up @@ -73,21 +76,21 @@ pub enum ServiceErrorContext {
impl std::fmt::Display for ServiceErrorContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ServiceErrorContext::ListBasins => write!(f, "Failed to list basins"),
ServiceErrorContext::CreateBasin => write!(f, "Failed to create basin"),
ServiceErrorContext::DeleteBasin => write!(f, "Failed to delete basin"),
ServiceErrorContext::GetBasinConfig => write!(f, "Failed to get basin config"),
ServiceErrorContext::ReconfigureBasin => write!(f, "Failed to reconfigure basin"),
ServiceErrorContext::ListStreams => write!(f, "Failed to list streams"),
ServiceErrorContext::CreateStream => write!(f, "Failed to create stream"),
ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"),
ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"),
ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"),
ServiceErrorContext::Trim => write!(f, "Failed to trim"),
ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"),
ServiceErrorContext::AppendSession => write!(f, "Failed to append session"),
ServiceErrorContext::ReadSession => write!(f, "Failed to read session"),
ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
Self::ListBasins => write!(f, "Failed to list basins"),
Self::CreateBasin => write!(f, "Failed to create basin"),
Self::DeleteBasin => write!(f, "Failed to delete basin"),
Self::GetBasinConfig => write!(f, "Failed to get basin config"),
Self::ReconfigureBasin => write!(f, "Failed to reconfigure basin"),
Self::ListStreams => write!(f, "Failed to list streams"),
Self::CreateStream => write!(f, "Failed to create stream"),
Self::DeleteStream => write!(f, "Failed to delete stream"),
Self::GetStreamConfig => write!(f, "Failed to get stream config"),
Self::CheckTail => write!(f, "Failed to check tail"),
Self::Trim => write!(f, "Failed to trim"),
Self::Fence => write!(f, "Failed to set fencing token"),
Self::AppendSession => write!(f, "Failed to append session"),
Self::ReadSession => write!(f, "Failed to read session"),
Self::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
}
}
}
Expand Down
158 changes: 152 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use clap::{builder::styling, Parser, Subcommand};
use colored::*;
use config::{config_path, create_config};
use error::{S2CliError, ServiceError, ServiceErrorContext};
use ping::{LatencyStats, PingResult, Pinger};
use rand::Rng;
use stream::{RecordStream, StreamService};
use streamstore::{
batching::AppendRecordsBatchingOpts,
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{
BasinInfo, BasinName, CommandRecord, ConvertError, FencingToken, MeteredBytes as _,
Expand All @@ -36,6 +39,7 @@ mod stream;

mod config;
mod error;
mod ping;
mod types;

const STYLES: styling::Styles = styling::Styles::styled()
Expand Down Expand Up @@ -289,6 +293,31 @@ enum Commands {
#[arg(short = 'b', long)]
limit_bytes: Option<u64>,
},

/// Ping the stream to get append acknowledgement and end-to-end latencies.
Ping {
/// Name of the basin.
basin: BasinName,

/// Name of the stream.
stream: String,

/// Send a batch after this interval.
///
/// Will be set to a minimum of 100ms.
#[arg(short = 'i', long, default_value = "500ms")]
interval: humantime::Duration,

/// Batch size in bytes.
///
/// Truncated to a maximum of 50 KiB.
#[arg(short = 'b', long, default_value_t = 20 * 1024)]
batch_bytes: u64,

/// Stop after sending this number of batches.
#[arg(short = 'n', long)]
num_batches: Option<usize>,
},
}

#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -369,7 +398,8 @@ fn client_config(auth_token: String) -> Result<ClientConfig, S2CliError> {
let endpoints = S2Endpoints::from_env().map_err(S2CliError::EndpointsFromEnv)?;
let client_config = ClientConfig::new(auth_token.to_string())
.with_user_agent("s2-cli".parse::<HeaderValue>().expect("valid user agent"))
.with_endpoints(endpoints);
.with_endpoints(endpoints)
.with_request_timeout(Duration::from_secs(30));
Ok(client_config)
}

Expand Down Expand Up @@ -666,7 +696,12 @@ async fn run() -> Result<(), S2CliError> {
);

let mut append_output_stream = StreamService::new(stream_client)
.append_session(append_input_stream, fencing_token, match_seq_num)
.append_session(
append_input_stream,
AppendRecordsBatchingOpts::new()
.with_fencing_token(fencing_token)
.with_match_seq_num(match_seq_num),
)
.await?;
loop {
select! {
Expand Down Expand Up @@ -729,9 +764,9 @@ async fn run() -> Result<(), S2CliError> {
maybe_read_result = read_output_stream.next() => {
match maybe_read_result {
Some(read_result) => {
if start.is_none() {
start = Some(Instant::now());
}
if start.is_none() {
start = Some(Instant::now());
}
match read_result {
Ok(ReadOutput::Batch(sequenced_record_batch)) => {
let num_records = sequenced_record_batch.records.len();
Expand Down Expand Up @@ -830,6 +865,117 @@ async fn run() -> Result<(), S2CliError> {
writer.flush().await.expect("writer flush");
}
}
}

Commands::Ping {
basin,
stream,
interval,
batch_bytes,
num_batches,
} => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));

let interval = interval.max(Duration::from_millis(100));
let batch_bytes = batch_bytes.min(50 * 1024);

eprintln!("Preparing...");

let mut pinger = Pinger::init(&stream_client).await?;

let mut pings = Vec::new();

async fn ping_next(
pinger: &mut Pinger,
pings: &mut Vec<PingResult>,
interval: Duration,
batch_bytes: u64,
) -> Result<(), S2CliError> {
let jitter_op = if rand::random() {
u64::saturating_add
} else {
u64::saturating_sub
};

let record_bytes = jitter_op(batch_bytes, rand::thread_rng().gen_range(0..10));
vrongmeal marked this conversation as resolved.
Show resolved Hide resolved

let Some(res) = pinger.ping(record_bytes).await? else {
return Ok(());
};

eprintln!(
"{:<5} bytes: ack = {:<7} e2e = {:<7}",
res.bytes.to_string().blue(),
format!("{} ms", res.ack.as_millis()).blue(),
format!("{} ms", res.e2e.as_millis()).blue(),
);

pings.push(res);

tokio::time::sleep(interval).await;
Ok(())
}

while Some(pings.len()) != num_batches {
select! {
_ = ping_next(&mut pinger, &mut pings, interval, batch_bytes) => (),
_ = signal::ctrl_c() => break,
}
}

// Close the pinger.
std::mem::drop(pinger);

let total_batches = pings.len();
let (bytes, (acks, e2es)): (Vec<_>, (Vec<_>, Vec<_>)) = pings
.into_iter()
.map(|PingResult { bytes, ack, e2e }| (bytes, (ack, e2e)))
.unzip();
let total_bytes = bytes.into_iter().sum::<u64>();

eprintln!(/* Empty line */);
eprintln!("Sent {} batches with {} bytes", total_batches, total_bytes);
vrongmeal marked this conversation as resolved.
Show resolved Hide resolved

pub fn print_stats(stats: LatencyStats, name: &str) {
eprintln!(
"{:-^60}",
format!(" {name} Latency Statistics ").yellow().bold()
);

fn stat(key: &str, val: String) {
eprintln!("{:>9} {}", key, val.green());
}

fn stat_duration(key: &str, val: Duration) {
stat(key, format!("{} ms", val.as_millis()));
}

let LatencyStats {
mean,
median,
p95,
p99,
max,
min,
stddev,
} = stats;

stat_duration("Mean", mean);
stat_duration("Median", median);
stat_duration("P95", p95);
stat_duration("P99", p99);
stat_duration("Max", max);
stat_duration("Min", min);
stat_duration("Std Dev", stddev);
}

eprintln!(/* Empty line */);
print_stats(LatencyStats::generate(acks), "Append Acknowledgement");
eprintln!(/* Empty line */);
print_stats(LatencyStats::generate(e2es), "End to End");
vrongmeal marked this conversation as resolved.
Show resolved Hide resolved
}
};

std::process::exit(0);
}
Loading
Loading