From 1b21cc3ad558982259336da629638866ef8138d1 Mon Sep 17 00:00:00 2001 From: fluxie Date: Mon, 18 Dec 2023 10:14:50 +0200 Subject: [PATCH] Support for parallel message generation and basic statistics The generator now takes "tasks" parameter which tells it how many tasks it should launch. The server counts the number of send_message calls it has processed and the generator periodically retrieves this value and reports how many such calls have been processed in one millisecond. The goal of this is to enable performance assessment of the proxide proxy. Includes linting support in the pipeline. --- .github/workflows/ci.yml | 2 + test/rust_grpc/Cargo.lock | 18 ++++ test/rust_grpc/Cargo.toml | 8 +- test/rust_grpc/build.rs | 3 +- test/rust_grpc/proto/rust_grpc.proto | 8 ++ test/rust_grpc/src/generator.rs | 130 +++++++++++++++++++------ test/rust_grpc/src/lib.rs | 139 ++++++++++++++++++++++++--- test/rust_grpc/src/server.rs | 111 +++++++++++++++------ 8 files changed, 343 insertions(+), 76 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b17edf..c5a6f87 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,6 +78,8 @@ jobs: - name: Run rustfmt run: | cargo fmt --all -- --check + cargo fmt --all --manifest-path ./test/rust_grpc/Cargo.toml -- --check - name: Run clippy run: | cargo clippy --all -- -D warnings + cargo clippy --all --manifest-path ./test/rust_grpc/Cargo.toml -- -D warnings diff --git a/test/rust_grpc/Cargo.lock b/test/rust_grpc/Cargo.lock index 8a903b7..b65b8e6 100644 --- a/test/rust_grpc/Cargo.lock +++ b/test/rust_grpc/Cargo.lock @@ -113,6 +113,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-counter" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019" + [[package]] name = "autocfg" version = "1.1.0" @@ -364,8 +370,11 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" name = "grpc-tester" version = "0.1.0" dependencies = [ + "atomic-counter", "clap", + "portpicker", "prost", + "prost-types", "tokio", "tokio-shutdown", "tonic", @@ -664,6 +673,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portpicker" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be97d76faf1bfab666e1375477b23fde79eccf0276e9b63b92a39d676a889ba9" +dependencies = [ + "rand", +] + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/test/rust_grpc/Cargo.toml b/test/rust_grpc/Cargo.toml index 5f7adec..8f93f09 100644 --- a/test/rust_grpc/Cargo.toml +++ b/test/rust_grpc/Cargo.toml @@ -19,9 +19,15 @@ path="src/server.rs" [dependencies] tonic = "0.10.2" prost = "0.12.3" +prost-types = "0.12.3" tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread"] } tokio-shutdown = "0.1.4" clap = { version= "4.4.11", features = ["derive"] } +portpicker = "0.1.1" +atomic-counter = "1.0.1" [build-dependencies] -tonic-build = { version = "0.10.2", features = ["prost"] } \ No newline at end of file +tonic-build = { version = "0.10.2", features = ["prost"] } + +[profile.release] +debug = true diff --git a/test/rust_grpc/build.rs b/test/rust_grpc/build.rs index d188817..d3b637f 100644 --- a/test/rust_grpc/build.rs +++ b/test/rust_grpc/build.rs @@ -1,4 +1,5 @@ -fn main() -> Result<(), Box> { +fn main() -> Result<(), Box> +{ tonic_build::compile_protos("proto/rust_grpc.proto")?; Ok(()) } diff --git a/test/rust_grpc/proto/rust_grpc.proto b/test/rust_grpc/proto/rust_grpc.proto index 8d9e292..ba7457c 100644 --- a/test/rust_grpc/proto/rust_grpc.proto +++ b/test/rust_grpc/proto/rust_grpc.proto @@ -23,6 +23,8 @@ option java_multiple_files = true; option java_package = "io.grpc.examples.rust_grpc"; option java_outer_classname = "RustGrpcProto"; +import "google/protobuf/duration.proto"; + /// Test server service TestService { @@ -49,6 +51,12 @@ message DiagnosticsRequest { } message DiagnosticsResponse { + + /// Uptime of the server. + google.protobuf.Duration uptime = 1; + + /// Number of times SendMessage has been called since the server was started. + uint64 send_message_calls = 2; } message WaitForFirstMessageRequest { diff --git a/test/rust_grpc/src/generator.rs b/test/rust_grpc/src/generator.rs index 5a97091..7bbc2f2 100644 --- a/test/rust_grpc/src/generator.rs +++ b/test/rust_grpc/src/generator.rs @@ -1,16 +1,21 @@ -use clap::Parser; +use clap::{arg, Parser}; +use rust_grpc_private::DiagnosticsRequest; use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest}; -use std::thread; +use std::{thread, time}; use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinSet; +use tonic::transport::Channel; -mod rust_grpc_private { +mod rust_grpc_private +{ tonic::include_proto!("rust_grpc"); } /// Simple program to greet a person. -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] #[command(author, version)] -pub struct Args { +pub struct Args +{ /// Name of the person to greet. #[arg(short, long, default_value = "http://[::1]:50051")] pub address: String, @@ -18,19 +23,26 @@ pub struct Args { /// Period / delay between the messages sent to the server. #[arg(short, long, value_parser = parse_period)] pub period: std::time::Duration, + + /// The number of asynchronous tasks used to send the messages in parallel. + #[arg(short, long, default_value_t = 1)] + pub tasks: u16, } /// A gRPC message generator that periodically sends messages to the target server. -pub struct GrpcGenerator { +pub struct GrpcGenerator +{ generator: Option>, stop: UnboundedSender<()>, } -impl GrpcGenerator { +impl GrpcGenerator +{ /// Stars a new gRPC generator. - pub async fn start(address: &str) -> Result> { + pub async fn start(args: Args) -> Result> + { // Start the generator in a separate tokio runtime to ensure its tasks won't interfere with the tests. - let address_clone = address.to_string(); + let address_clone = args.address.to_string(); let (generator_started_send, mut generator_started_recv) = tokio::sync::mpsc::unbounded_channel(); let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel(); @@ -42,8 +54,9 @@ impl GrpcGenerator { .expect("Starting runtime for message generator failed."); rt.block_on(async move { spawn(Args { - address: address_clone, - period: std::time::Duration::from_millis(100), + address: args.address, + period: args.period, + tasks: args.tasks, }) .expect("Starting generator failed."); generator_started_send @@ -60,10 +73,9 @@ impl GrpcGenerator { // Wait for the first message to reach the server. // This improve the robustness of the tests utilizing the generator as the environment is guaranteed to work after this. { - let mut client = rust_grpc_private::test_service_client::TestServiceClient::connect( - address.to_string(), - ) - .await?; + let mut client = + rust_grpc_private::test_service_client::TestServiceClient::connect(address_clone) + .await?; let _ = client .wait_for_first_message(WaitForFirstMessageRequest {}) .await; @@ -76,10 +88,11 @@ impl GrpcGenerator { } /// Stops the gRPC message generation. - pub fn stop(&mut self) -> Result<(), Box> { + pub fn stop(&mut self) -> Result<(), Box> + { let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. if let Some(generator) = self.generator.take() { - if let Err(_) = generator.join() { + if generator.join().is_err() { return Err(Box::::from( "Waiting for the generator to stop failed.", )); @@ -89,14 +102,17 @@ impl GrpcGenerator { } } -impl Drop for GrpcGenerator { - fn drop(&mut self) { +impl Drop for GrpcGenerator +{ + fn drop(&mut self) + { self.stop().expect("Dropping the generator failed. "); } } /// Spawns a new asynchronous message generation tasks. -fn spawn(args: Args) -> Result<(), Box> { +fn spawn(args: Args) -> Result<(), Box> +{ tokio::spawn(async move { generate_messages(args) .await @@ -106,36 +122,88 @@ fn spawn(args: Args) -> Result<(), Box> { } /// Starts sending messages to the server, -async fn generate_messages(args: Args) -> Result<(), Box> { - let mut client = - rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?; +async fn generate_messages(args: Args) -> Result<(), Box> +{ + // Start the requested number of tasks. + // Each task is given a unique client as the generator did not scale properly when the channel was shared + // between the clients. The number of requests sent to the peaked at around <6 tasks. (Very rough approximation.) + // TODO: Investigate further. + let mut tasks: JoinSet>> = JoinSet::new(); + for _t in 0..args.tasks { + let client = rust_grpc_private::test_service_client::TestServiceClient::connect( + args.address.to_string(), + ) + .await?; + tasks.spawn(async move { + generate_messages_task(client, args.period).await?; + Ok(()) + }); + } + while let Some(result) = tasks.join_next().await { + match result { + Ok(_) => {} + Err(error) if error.is_cancelled() => {} + Err(error) => return Err(Box::new(error)), + } + } + Ok(()) +} + +/// An asynchronous function which sends messages to the server. +async fn generate_messages_task( + mut client: rust_grpc_private::test_service_client::TestServiceClient, + period: std::time::Duration, +) -> Result<(), Box> +{ loop { let request = tonic::Request::new(SendMessageRequest {}); - tokio::select! { chosen = client.send_message(request) => { chosen?; }, _chosen = tokio::signal::ctrl_c() => { break; } } - if args.period.is_zero() == false { - tokio::time::sleep(args.period).await; + #[allow(clippy::bool_comparison)] + if period.is_zero() == false { + tokio::time::sleep(period).await; } } - Ok(()) } #[tokio::main] #[allow(dead_code)] -async fn main() -> Result<(), Box> { - spawn(Args::parse())?; - tokio::signal::ctrl_c().await?; +async fn main() -> Result<(), Box> +{ + let args = Args::parse(); + spawn(args.clone())?; + tokio::select! { + result = report_statistics( args ) => result?, + result = tokio::signal::ctrl_c() => result?, + } Ok(()) } /// Reads period from the command line and converts it into duration. -fn parse_period(arg: &str) -> Result { +fn parse_period(arg: &str) -> Result +{ let seconds = arg.parse()?; Ok(std::time::Duration::from_millis(seconds)) } + +/// Reports server statistics to the console. +async fn report_statistics(args: Args) -> Result<(), Box> +{ + let mut client = + rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?; + + loop { + let response = client.get_diagnostics(DiagnosticsRequest {}).await?; + let diagnostics = response.get_ref(); + let server_uptime = time::Duration::try_from(diagnostics.uptime.clone().unwrap())?; + let call_rate = diagnostics.send_message_calls as u128 / server_uptime.as_millis(); + println!("Call rate: {} calls / ms", call_rate); + + tokio::time::sleep(time::Duration::from_secs(2)).await; + } +} diff --git a/test/rust_grpc/src/lib.rs b/test/rust_grpc/src/lib.rs index 22a93b6..922f529 100644 --- a/test/rust_grpc/src/lib.rs +++ b/test/rust_grpc/src/lib.rs @@ -1,32 +1,71 @@ pub use rust_grpc::{ DiagnosticsRequest, DiagnosticsResponse, SendMessageRequest, SendMessageResponse, }; +use std::time; +use std::time::Duration; use tonic::transport::channel::Channel; mod generator; mod server; -pub mod rust_grpc { +pub mod rust_grpc +{ tonic::include_proto!("rust_grpc"); } -pub struct GrpcTester { +pub struct Args +{ + /// Period / delay between the messages sent to the server. + pub period: std::time::Duration, + + /// The number of asynchronous tasks used to send the messages in parallel. + pub tasks: u16, +} + +/// Snapshot of statistics of a test run. +pub struct Statistics +{ + /// Uptime of the server associated with the tester. + pub server_uptime: std::time::Duration, + + /// Number of "SendMessage" calls the tester has processed. + pub send_message_calls_processed: u64, +} + +pub struct GrpcTester +{ server: server::GrpcServer, generator: generator::GrpcGenerator, client: rust_grpc::test_service_client::TestServiceClient, } -impl GrpcTester { +impl GrpcTester +{ /// Gets gRPC client for communicating with the server associated with the tester. - pub fn client(&self) -> rust_grpc::test_service_client::TestServiceClient { + pub fn client(&self) -> rust_grpc::test_service_client::TestServiceClient + { self.client.clone() } + pub async fn pipe() -> Result> + { + Self::pipe_with_args(Args { + tasks: 1, + period: Duration::from_secs(1), + }) + .await + } + /// Creates a new testes which internally pipes data from client to the server. - pub async fn pipe() -> Result> { + pub async fn pipe_with_args(args: Args) -> Result> + { let server = server::GrpcServer::start().await?; - let http_address = server.http(); - let generator = generator::GrpcGenerator::start(&http_address).await?; + let generator = generator::GrpcGenerator::start(generator::Args { + address: server.http(), + period: args.period, + tasks: args.tasks, + }) + .await?; let client = rust_grpc::test_service_client::TestServiceClient::connect(server.http()).await?; Ok(GrpcTester { @@ -36,8 +75,24 @@ impl GrpcTester { }) } + pub async fn get_statistics(&self) -> Result> + { + let diagnostics = self + .client + .clone() + .get_diagnostics(DiagnosticsRequest {}) + .await?; + let diagnostics = diagnostics.get_ref(); + + Ok(Statistics { + server_uptime: time::Duration::try_from(diagnostics.uptime.clone().unwrap())?, + send_message_calls_processed: diagnostics.send_message_calls, + }) + } + /// Stops the gRPC Server - pub fn stop(&mut self) -> Result<(), Box> { + pub fn stop(&mut self) -> Result<(), Box> + { self.generator.stop()?; self.server.stop()?; @@ -45,19 +100,77 @@ impl GrpcTester { } } -impl Drop for GrpcTester { - fn drop(&mut self) { +impl Drop for GrpcTester +{ + fn drop(&mut self) + { self.stop().expect("Dropping the tester failed."); } } #[cfg(test)] -mod test { - use crate::GrpcTester; +mod test +{ + use crate::{Args, GrpcTester}; + use std::time::Duration; #[tokio::test] - async fn starting_and_stopping_tester_succeeds() { + async fn starting_and_stopping_tester_succeeds() + { let mut tester = GrpcTester::pipe().await.expect("Starting tester failed."); tester.stop().expect("Stopping tester failed."); } + + #[tokio::test] + async fn server_has_valid_uptime() + { + let mut tester = GrpcTester::pipe().await.expect("Starting tester failed."); + + let statistics = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + if statistics.server_uptime.is_zero() { + panic!("Uptime of the server cannot be zero.") + } + + tester.stop().expect("Stopping tester failed."); + } + + #[tokio::test] + async fn server_receives_messages() + { + // Ensure the generator sends messages constantly to minimize the test duration. + let mut tester = GrpcTester::pipe_with_args(Args { + tasks: 1, + period: Duration::from_secs(0), + }) + .await + .expect("Starting tester failed."); + + // Ensure the server is reporting increase in the number of processed send_message calls. + let statistics_base = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + for attempt in 0.. { + let statistics = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + if statistics.server_uptime <= statistics_base.server_uptime { + panic!("Server's uptime should be increasing.") + } + if statistics.send_message_calls_processed + > statistics_base.send_message_calls_processed + { + break; + } + if attempt > 100 { + panic!("Server did not report any increase in send_message calls.") + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + tester.stop().expect("Stopping tester failed."); + } } diff --git a/test/rust_grpc/src/server.rs b/test/rust_grpc/src/server.rs index 8336fa7..ed219d8 100644 --- a/test/rust_grpc/src/server.rs +++ b/test/rust_grpc/src/server.rs @@ -1,3 +1,4 @@ +use atomic_counter::AtomicCounter; use clap::Parser; use rust_grpc_private::test_service_server::{TestService, TestServiceServer}; use rust_grpc_private::{ @@ -6,38 +7,48 @@ use rust_grpc_private::{ }; use std::net::SocketAddr; use std::thread; +use std::time::Instant; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch; use tokio::sync::watch::{Receiver, Sender}; use tonic::transport::Server; use tonic::{Request, Response, Status}; -mod rust_grpc_private { +mod rust_grpc_private +{ tonic::include_proto!("rust_grpc"); } /// A simple gRPC Server for receiving messages #[derive(Parser, Debug)] #[command(author, version)] -struct Args { +struct Args +{ /// Network address to listen. #[arg(short, long, default_value = "[::1]:50051")] pub address: String, } /// A gRPC server ready to accept messages -pub struct GrpcServer { +pub struct GrpcServer +{ address: SocketAddr, server: Option>, stop: UnboundedSender<()>, } -impl GrpcServer { +impl GrpcServer +{ /// Starts a new gRPC server. - pub async fn start() -> Result> { + pub async fn start() -> Result> + { // Start the server in a separate tokio runtime to ensure its tasks won't interfere with the tests. - let address: SocketAddr = "[::1]:50051".parse()?; - let address_clone = address.clone(); + let address: SocketAddr = format!( + "[::1]:{}", + portpicker::pick_unused_port().expect("No TCP ports available.") + ) + .parse()?; + let address_clone = address; let (server_listening_send, mut server_listening_recv) = tokio::sync::mpsc::unbounded_channel(); let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel(); @@ -71,15 +82,17 @@ impl GrpcServer { } /// Gets the HTTP address of the server. - pub fn http(&self) -> String { + pub fn http(&self) -> String + { format!("http://{}", &self.address) } /// Stops the gRPC server. - pub fn stop(&mut self) -> Result<(), Box> { - let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. + pub fn stop(&mut self) -> Result<(), Box> + { + let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. if let Some(server) = self.server.take() { - if let Err(_) = server.join() { + if server.join().is_err() { return Err(Box::::from( "Waiting for the server to stop failed.", )); @@ -89,7 +102,8 @@ impl GrpcServer { } /// Pings the server and ensures it is listening. - async fn wait_for_server_to_listen(&self) -> Result<(), Box> { + async fn wait_for_server_to_listen(&self) -> Result<(), Box> + { // Try to establish connection to the server. const MAX_ATTEMPTS: u32 = 100; for attempt in 1.. { @@ -108,7 +122,7 @@ impl GrpcServer { }; match client.ping(PingRequest {}).await { Ok(_) => { - break; // A message was sent to the server. + break; // A message was sent to the server. } Err(_) if attempt < MAX_ATTEMPTS => { tokio::time::sleep(std::time::Duration::from_millis(20)).await; @@ -123,24 +137,37 @@ impl GrpcServer { } } -impl Drop for GrpcServer { - fn drop(&mut self) { +impl Drop for GrpcServer +{ + fn drop(&mut self) + { self.stop().expect("Dropping GrpcServer failed."); } } +/// A message sink for the message generator. /// -struct LocalTestService { +/// Collects statistics about the calls it receives. +struct LocalTestService +{ /// A watcher for acknowledging that the first "SendMessage" call has been received by the server. message_received_notify: Sender, /// A watcher for checking whether the server has received "SendMessage" call, message_received_check: Receiver, + + /// Timestamp when the service was created. + started: Instant, + + /// The number of send_message calls the service has received. + send_message_calls_served: atomic_counter::RelaxedCounter, } -impl LocalTestService { +impl LocalTestService +{ /// Spawns the test service in a new asynchronous task. - fn spawn(address: SocketAddr) -> Result<(), Box> { + fn spawn(address: SocketAddr) -> Result<(), Box> + { tokio::spawn(async move { LocalTestService::run(address) .await @@ -150,14 +177,19 @@ impl LocalTestService { } /// Launches the the test service. - async fn run(address: SocketAddr) -> Result<(), Box> { + async fn run(address: SocketAddr) -> Result<(), Box> + { let (tx, rx) = watch::channel(false); let service = LocalTestService { message_received_notify: tx, message_received_check: rx, + started: Instant::now(), + send_message_calls_served: atomic_counter::RelaxedCounter::new(0), }; + #[cfg(not(test))] println!("Test server listening on {}", address); Server::builder() + .concurrency_limit_per_connection(128) .add_service(TestServiceServer::new(service)) .serve_with_shutdown(address, async { tokio::signal::ctrl_c() @@ -170,42 +202,59 @@ impl LocalTestService { } } -impl Drop for LocalTestService { - fn drop(&mut self) { +impl Drop for LocalTestService +{ + fn drop(&mut self) + { let _ = self.message_received_notify.send(true); } } #[tonic::async_trait] -impl TestService for LocalTestService { +impl TestService for LocalTestService +{ async fn send_message( &self, _request: Request, - ) -> Result, Status> { + ) -> Result, Status> + { // Avoid unnecessary notifications to reduce CPU <-> CPU communication. self.message_received_notify .send_if_modified(|value: &mut bool| { + #[allow(clippy::bool_comparison)] if *value == false { *value = true; - return true; + true } else { - return false; + false } }); + self.send_message_calls_served.inc(); Ok(Response::new(SendMessageResponse {})) } async fn get_diagnostics( &self, _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("")) + ) -> Result, Status> + { + let duration = Instant::now().duration_since(self.started); + let duration = match prost_types::Duration::try_from(duration) { + Ok(d) => d, + Err(_) => return Err(Status::internal("Calculating server uptime failed.")), + }; + + Ok(Response::new(DiagnosticsResponse { + uptime: Some(duration), + send_message_calls: self.send_message_calls_served.get() as u64, + })) } async fn wait_for_first_message( &self, _request: Request, - ) -> Result, Status> { + ) -> Result, Status> + { self.message_received_check .clone() .wait_for(|value| value == &true) @@ -214,14 +263,16 @@ impl TestService for LocalTestService { Ok(Response::new(WaitForFirstMessageResponse {})) } - async fn ping(&self, _request: Request) -> Result, Status> { + async fn ping(&self, _request: Request) -> Result, Status> + { Ok(Response::new(PingResponse {})) } } #[tokio::main] #[allow(dead_code)] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> +{ let args = Args::parse(); LocalTestService::spawn(args.address.parse()?)?; tokio::signal::ctrl_c().await?;