diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b17edf..c5a6f87 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,6 +78,8 @@ jobs: - name: Run rustfmt run: | cargo fmt --all -- --check + cargo fmt --all --manifest-path ./test/rust_grpc/Cargo.toml -- --check - name: Run clippy run: | cargo clippy --all -- -D warnings + cargo clippy --all --manifest-path ./test/rust_grpc/Cargo.toml -- -D warnings diff --git a/test/rust_grpc/Cargo.lock b/test/rust_grpc/Cargo.lock index 8a903b7..b65b8e6 100644 --- a/test/rust_grpc/Cargo.lock +++ b/test/rust_grpc/Cargo.lock @@ -113,6 +113,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-counter" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019" + [[package]] name = "autocfg" version = "1.1.0" @@ -364,8 +370,11 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" name = "grpc-tester" version = "0.1.0" dependencies = [ + "atomic-counter", "clap", + "portpicker", "prost", + "prost-types", "tokio", "tokio-shutdown", "tonic", @@ -664,6 +673,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portpicker" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be97d76faf1bfab666e1375477b23fde79eccf0276e9b63b92a39d676a889ba9" +dependencies = [ + "rand", +] + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/test/rust_grpc/Cargo.toml b/test/rust_grpc/Cargo.toml index 5f7adec..8f93f09 100644 --- a/test/rust_grpc/Cargo.toml +++ b/test/rust_grpc/Cargo.toml @@ -19,9 +19,15 @@ path="src/server.rs" [dependencies] tonic = "0.10.2" prost = "0.12.3" +prost-types = "0.12.3" tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread"] } tokio-shutdown = "0.1.4" clap = { version= "4.4.11", features = ["derive"] } +portpicker = "0.1.1" +atomic-counter = "1.0.1" [build-dependencies] -tonic-build = { version = "0.10.2", features = ["prost"] } \ No newline at end of file +tonic-build = { version = "0.10.2", features = ["prost"] } + +[profile.release] +debug = true diff --git a/test/rust_grpc/build.rs b/test/rust_grpc/build.rs index d188817..d3b637f 100644 --- a/test/rust_grpc/build.rs +++ b/test/rust_grpc/build.rs @@ -1,4 +1,5 @@ -fn main() -> Result<(), Box> { +fn main() -> Result<(), Box> +{ tonic_build::compile_protos("proto/rust_grpc.proto")?; Ok(()) } diff --git a/test/rust_grpc/proto/rust_grpc.proto b/test/rust_grpc/proto/rust_grpc.proto index 8d9e292..ba7457c 100644 --- a/test/rust_grpc/proto/rust_grpc.proto +++ b/test/rust_grpc/proto/rust_grpc.proto @@ -23,6 +23,8 @@ option java_multiple_files = true; option java_package = "io.grpc.examples.rust_grpc"; option java_outer_classname = "RustGrpcProto"; +import "google/protobuf/duration.proto"; + /// Test server service TestService { @@ -49,6 +51,12 @@ message DiagnosticsRequest { } message DiagnosticsResponse { + + /// Uptime of the server. + google.protobuf.Duration uptime = 1; + + /// Number of times SendMessage has been called since the server was started. + uint64 send_message_calls = 2; } message WaitForFirstMessageRequest { diff --git a/test/rust_grpc/src/generator.rs b/test/rust_grpc/src/generator.rs index 5a97091..7bbc2f2 100644 --- a/test/rust_grpc/src/generator.rs +++ b/test/rust_grpc/src/generator.rs @@ -1,16 +1,21 @@ -use clap::Parser; +use clap::{arg, Parser}; +use rust_grpc_private::DiagnosticsRequest; use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest}; -use std::thread; +use std::{thread, time}; use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinSet; +use tonic::transport::Channel; -mod rust_grpc_private { +mod rust_grpc_private +{ tonic::include_proto!("rust_grpc"); } /// Simple program to greet a person. -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] #[command(author, version)] -pub struct Args { +pub struct Args +{ /// Name of the person to greet. #[arg(short, long, default_value = "http://[::1]:50051")] pub address: String, @@ -18,19 +23,26 @@ pub struct Args { /// Period / delay between the messages sent to the server. #[arg(short, long, value_parser = parse_period)] pub period: std::time::Duration, + + /// The number of asynchronous tasks used to send the messages in parallel. + #[arg(short, long, default_value_t = 1)] + pub tasks: u16, } /// A gRPC message generator that periodically sends messages to the target server. -pub struct GrpcGenerator { +pub struct GrpcGenerator +{ generator: Option>, stop: UnboundedSender<()>, } -impl GrpcGenerator { +impl GrpcGenerator +{ /// Stars a new gRPC generator. - pub async fn start(address: &str) -> Result> { + pub async fn start(args: Args) -> Result> + { // Start the generator in a separate tokio runtime to ensure its tasks won't interfere with the tests. - let address_clone = address.to_string(); + let address_clone = args.address.to_string(); let (generator_started_send, mut generator_started_recv) = tokio::sync::mpsc::unbounded_channel(); let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel(); @@ -42,8 +54,9 @@ impl GrpcGenerator { .expect("Starting runtime for message generator failed."); rt.block_on(async move { spawn(Args { - address: address_clone, - period: std::time::Duration::from_millis(100), + address: args.address, + period: args.period, + tasks: args.tasks, }) .expect("Starting generator failed."); generator_started_send @@ -60,10 +73,9 @@ impl GrpcGenerator { // Wait for the first message to reach the server. // This improve the robustness of the tests utilizing the generator as the environment is guaranteed to work after this. { - let mut client = rust_grpc_private::test_service_client::TestServiceClient::connect( - address.to_string(), - ) - .await?; + let mut client = + rust_grpc_private::test_service_client::TestServiceClient::connect(address_clone) + .await?; let _ = client .wait_for_first_message(WaitForFirstMessageRequest {}) .await; @@ -76,10 +88,11 @@ impl GrpcGenerator { } /// Stops the gRPC message generation. - pub fn stop(&mut self) -> Result<(), Box> { + pub fn stop(&mut self) -> Result<(), Box> + { let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. if let Some(generator) = self.generator.take() { - if let Err(_) = generator.join() { + if generator.join().is_err() { return Err(Box::::from( "Waiting for the generator to stop failed.", )); @@ -89,14 +102,17 @@ impl GrpcGenerator { } } -impl Drop for GrpcGenerator { - fn drop(&mut self) { +impl Drop for GrpcGenerator +{ + fn drop(&mut self) + { self.stop().expect("Dropping the generator failed. "); } } /// Spawns a new asynchronous message generation tasks. -fn spawn(args: Args) -> Result<(), Box> { +fn spawn(args: Args) -> Result<(), Box> +{ tokio::spawn(async move { generate_messages(args) .await @@ -106,36 +122,88 @@ fn spawn(args: Args) -> Result<(), Box> { } /// Starts sending messages to the server, -async fn generate_messages(args: Args) -> Result<(), Box> { - let mut client = - rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?; +async fn generate_messages(args: Args) -> Result<(), Box> +{ + // Start the requested number of tasks. + // Each task is given a unique client as the generator did not scale properly when the channel was shared + // between the clients. The number of requests sent to the peaked at around <6 tasks. (Very rough approximation.) + // TODO: Investigate further. + let mut tasks: JoinSet>> = JoinSet::new(); + for _t in 0..args.tasks { + let client = rust_grpc_private::test_service_client::TestServiceClient::connect( + args.address.to_string(), + ) + .await?; + tasks.spawn(async move { + generate_messages_task(client, args.period).await?; + Ok(()) + }); + } + while let Some(result) = tasks.join_next().await { + match result { + Ok(_) => {} + Err(error) if error.is_cancelled() => {} + Err(error) => return Err(Box::new(error)), + } + } + Ok(()) +} + +/// An asynchronous function which sends messages to the server. +async fn generate_messages_task( + mut client: rust_grpc_private::test_service_client::TestServiceClient, + period: std::time::Duration, +) -> Result<(), Box> +{ loop { let request = tonic::Request::new(SendMessageRequest {}); - tokio::select! { chosen = client.send_message(request) => { chosen?; }, _chosen = tokio::signal::ctrl_c() => { break; } } - if args.period.is_zero() == false { - tokio::time::sleep(args.period).await; + #[allow(clippy::bool_comparison)] + if period.is_zero() == false { + tokio::time::sleep(period).await; } } - Ok(()) } #[tokio::main] #[allow(dead_code)] -async fn main() -> Result<(), Box> { - spawn(Args::parse())?; - tokio::signal::ctrl_c().await?; +async fn main() -> Result<(), Box> +{ + let args = Args::parse(); + spawn(args.clone())?; + tokio::select! { + result = report_statistics( args ) => result?, + result = tokio::signal::ctrl_c() => result?, + } Ok(()) } /// Reads period from the command line and converts it into duration. -fn parse_period(arg: &str) -> Result { +fn parse_period(arg: &str) -> Result +{ let seconds = arg.parse()?; Ok(std::time::Duration::from_millis(seconds)) } + +/// Reports server statistics to the console. +async fn report_statistics(args: Args) -> Result<(), Box> +{ + let mut client = + rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?; + + loop { + let response = client.get_diagnostics(DiagnosticsRequest {}).await?; + let diagnostics = response.get_ref(); + let server_uptime = time::Duration::try_from(diagnostics.uptime.clone().unwrap())?; + let call_rate = diagnostics.send_message_calls as u128 / server_uptime.as_millis(); + println!("Call rate: {} calls / ms", call_rate); + + tokio::time::sleep(time::Duration::from_secs(2)).await; + } +} diff --git a/test/rust_grpc/src/lib.rs b/test/rust_grpc/src/lib.rs index 22a93b6..922f529 100644 --- a/test/rust_grpc/src/lib.rs +++ b/test/rust_grpc/src/lib.rs @@ -1,32 +1,71 @@ pub use rust_grpc::{ DiagnosticsRequest, DiagnosticsResponse, SendMessageRequest, SendMessageResponse, }; +use std::time; +use std::time::Duration; use tonic::transport::channel::Channel; mod generator; mod server; -pub mod rust_grpc { +pub mod rust_grpc +{ tonic::include_proto!("rust_grpc"); } -pub struct GrpcTester { +pub struct Args +{ + /// Period / delay between the messages sent to the server. + pub period: std::time::Duration, + + /// The number of asynchronous tasks used to send the messages in parallel. + pub tasks: u16, +} + +/// Snapshot of statistics of a test run. +pub struct Statistics +{ + /// Uptime of the server associated with the tester. + pub server_uptime: std::time::Duration, + + /// Number of "SendMessage" calls the tester has processed. + pub send_message_calls_processed: u64, +} + +pub struct GrpcTester +{ server: server::GrpcServer, generator: generator::GrpcGenerator, client: rust_grpc::test_service_client::TestServiceClient, } -impl GrpcTester { +impl GrpcTester +{ /// Gets gRPC client for communicating with the server associated with the tester. - pub fn client(&self) -> rust_grpc::test_service_client::TestServiceClient { + pub fn client(&self) -> rust_grpc::test_service_client::TestServiceClient + { self.client.clone() } + pub async fn pipe() -> Result> + { + Self::pipe_with_args(Args { + tasks: 1, + period: Duration::from_secs(1), + }) + .await + } + /// Creates a new testes which internally pipes data from client to the server. - pub async fn pipe() -> Result> { + pub async fn pipe_with_args(args: Args) -> Result> + { let server = server::GrpcServer::start().await?; - let http_address = server.http(); - let generator = generator::GrpcGenerator::start(&http_address).await?; + let generator = generator::GrpcGenerator::start(generator::Args { + address: server.http(), + period: args.period, + tasks: args.tasks, + }) + .await?; let client = rust_grpc::test_service_client::TestServiceClient::connect(server.http()).await?; Ok(GrpcTester { @@ -36,8 +75,24 @@ impl GrpcTester { }) } + pub async fn get_statistics(&self) -> Result> + { + let diagnostics = self + .client + .clone() + .get_diagnostics(DiagnosticsRequest {}) + .await?; + let diagnostics = diagnostics.get_ref(); + + Ok(Statistics { + server_uptime: time::Duration::try_from(diagnostics.uptime.clone().unwrap())?, + send_message_calls_processed: diagnostics.send_message_calls, + }) + } + /// Stops the gRPC Server - pub fn stop(&mut self) -> Result<(), Box> { + pub fn stop(&mut self) -> Result<(), Box> + { self.generator.stop()?; self.server.stop()?; @@ -45,19 +100,77 @@ impl GrpcTester { } } -impl Drop for GrpcTester { - fn drop(&mut self) { +impl Drop for GrpcTester +{ + fn drop(&mut self) + { self.stop().expect("Dropping the tester failed."); } } #[cfg(test)] -mod test { - use crate::GrpcTester; +mod test +{ + use crate::{Args, GrpcTester}; + use std::time::Duration; #[tokio::test] - async fn starting_and_stopping_tester_succeeds() { + async fn starting_and_stopping_tester_succeeds() + { let mut tester = GrpcTester::pipe().await.expect("Starting tester failed."); tester.stop().expect("Stopping tester failed."); } + + #[tokio::test] + async fn server_has_valid_uptime() + { + let mut tester = GrpcTester::pipe().await.expect("Starting tester failed."); + + let statistics = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + if statistics.server_uptime.is_zero() { + panic!("Uptime of the server cannot be zero.") + } + + tester.stop().expect("Stopping tester failed."); + } + + #[tokio::test] + async fn server_receives_messages() + { + // Ensure the generator sends messages constantly to minimize the test duration. + let mut tester = GrpcTester::pipe_with_args(Args { + tasks: 1, + period: Duration::from_secs(0), + }) + .await + .expect("Starting tester failed."); + + // Ensure the server is reporting increase in the number of processed send_message calls. + let statistics_base = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + for attempt in 0.. { + let statistics = tester + .get_statistics() + .await + .expect("Fetching tester statistics failed."); + if statistics.server_uptime <= statistics_base.server_uptime { + panic!("Server's uptime should be increasing.") + } + if statistics.send_message_calls_processed + > statistics_base.send_message_calls_processed + { + break; + } + if attempt > 100 { + panic!("Server did not report any increase in send_message calls.") + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + tester.stop().expect("Stopping tester failed."); + } } diff --git a/test/rust_grpc/src/server.rs b/test/rust_grpc/src/server.rs index 8336fa7..ed219d8 100644 --- a/test/rust_grpc/src/server.rs +++ b/test/rust_grpc/src/server.rs @@ -1,3 +1,4 @@ +use atomic_counter::AtomicCounter; use clap::Parser; use rust_grpc_private::test_service_server::{TestService, TestServiceServer}; use rust_grpc_private::{ @@ -6,38 +7,48 @@ use rust_grpc_private::{ }; use std::net::SocketAddr; use std::thread; +use std::time::Instant; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch; use tokio::sync::watch::{Receiver, Sender}; use tonic::transport::Server; use tonic::{Request, Response, Status}; -mod rust_grpc_private { +mod rust_grpc_private +{ tonic::include_proto!("rust_grpc"); } /// A simple gRPC Server for receiving messages #[derive(Parser, Debug)] #[command(author, version)] -struct Args { +struct Args +{ /// Network address to listen. #[arg(short, long, default_value = "[::1]:50051")] pub address: String, } /// A gRPC server ready to accept messages -pub struct GrpcServer { +pub struct GrpcServer +{ address: SocketAddr, server: Option>, stop: UnboundedSender<()>, } -impl GrpcServer { +impl GrpcServer +{ /// Starts a new gRPC server. - pub async fn start() -> Result> { + pub async fn start() -> Result> + { // Start the server in a separate tokio runtime to ensure its tasks won't interfere with the tests. - let address: SocketAddr = "[::1]:50051".parse()?; - let address_clone = address.clone(); + let address: SocketAddr = format!( + "[::1]:{}", + portpicker::pick_unused_port().expect("No TCP ports available.") + ) + .parse()?; + let address_clone = address; let (server_listening_send, mut server_listening_recv) = tokio::sync::mpsc::unbounded_channel(); let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel(); @@ -71,15 +82,17 @@ impl GrpcServer { } /// Gets the HTTP address of the server. - pub fn http(&self) -> String { + pub fn http(&self) -> String + { format!("http://{}", &self.address) } /// Stops the gRPC server. - pub fn stop(&mut self) -> Result<(), Box> { - let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. + pub fn stop(&mut self) -> Result<(), Box> + { + let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped. if let Some(server) = self.server.take() { - if let Err(_) = server.join() { + if server.join().is_err() { return Err(Box::::from( "Waiting for the server to stop failed.", )); @@ -89,7 +102,8 @@ impl GrpcServer { } /// Pings the server and ensures it is listening. - async fn wait_for_server_to_listen(&self) -> Result<(), Box> { + async fn wait_for_server_to_listen(&self) -> Result<(), Box> + { // Try to establish connection to the server. const MAX_ATTEMPTS: u32 = 100; for attempt in 1.. { @@ -108,7 +122,7 @@ impl GrpcServer { }; match client.ping(PingRequest {}).await { Ok(_) => { - break; // A message was sent to the server. + break; // A message was sent to the server. } Err(_) if attempt < MAX_ATTEMPTS => { tokio::time::sleep(std::time::Duration::from_millis(20)).await; @@ -123,24 +137,37 @@ impl GrpcServer { } } -impl Drop for GrpcServer { - fn drop(&mut self) { +impl Drop for GrpcServer +{ + fn drop(&mut self) + { self.stop().expect("Dropping GrpcServer failed."); } } +/// A message sink for the message generator. /// -struct LocalTestService { +/// Collects statistics about the calls it receives. +struct LocalTestService +{ /// A watcher for acknowledging that the first "SendMessage" call has been received by the server. message_received_notify: Sender, /// A watcher for checking whether the server has received "SendMessage" call, message_received_check: Receiver, + + /// Timestamp when the service was created. + started: Instant, + + /// The number of send_message calls the service has received. + send_message_calls_served: atomic_counter::RelaxedCounter, } -impl LocalTestService { +impl LocalTestService +{ /// Spawns the test service in a new asynchronous task. - fn spawn(address: SocketAddr) -> Result<(), Box> { + fn spawn(address: SocketAddr) -> Result<(), Box> + { tokio::spawn(async move { LocalTestService::run(address) .await @@ -150,14 +177,19 @@ impl LocalTestService { } /// Launches the the test service. - async fn run(address: SocketAddr) -> Result<(), Box> { + async fn run(address: SocketAddr) -> Result<(), Box> + { let (tx, rx) = watch::channel(false); let service = LocalTestService { message_received_notify: tx, message_received_check: rx, + started: Instant::now(), + send_message_calls_served: atomic_counter::RelaxedCounter::new(0), }; + #[cfg(not(test))] println!("Test server listening on {}", address); Server::builder() + .concurrency_limit_per_connection(128) .add_service(TestServiceServer::new(service)) .serve_with_shutdown(address, async { tokio::signal::ctrl_c() @@ -170,42 +202,59 @@ impl LocalTestService { } } -impl Drop for LocalTestService { - fn drop(&mut self) { +impl Drop for LocalTestService +{ + fn drop(&mut self) + { let _ = self.message_received_notify.send(true); } } #[tonic::async_trait] -impl TestService for LocalTestService { +impl TestService for LocalTestService +{ async fn send_message( &self, _request: Request, - ) -> Result, Status> { + ) -> Result, Status> + { // Avoid unnecessary notifications to reduce CPU <-> CPU communication. self.message_received_notify .send_if_modified(|value: &mut bool| { + #[allow(clippy::bool_comparison)] if *value == false { *value = true; - return true; + true } else { - return false; + false } }); + self.send_message_calls_served.inc(); Ok(Response::new(SendMessageResponse {})) } async fn get_diagnostics( &self, _request: Request, - ) -> Result, Status> { - Err(Status::unimplemented("")) + ) -> Result, Status> + { + let duration = Instant::now().duration_since(self.started); + let duration = match prost_types::Duration::try_from(duration) { + Ok(d) => d, + Err(_) => return Err(Status::internal("Calculating server uptime failed.")), + }; + + Ok(Response::new(DiagnosticsResponse { + uptime: Some(duration), + send_message_calls: self.send_message_calls_served.get() as u64, + })) } async fn wait_for_first_message( &self, _request: Request, - ) -> Result, Status> { + ) -> Result, Status> + { self.message_received_check .clone() .wait_for(|value| value == &true) @@ -214,14 +263,16 @@ impl TestService for LocalTestService { Ok(Response::new(WaitForFirstMessageResponse {})) } - async fn ping(&self, _request: Request) -> Result, Status> { + async fn ping(&self, _request: Request) -> Result, Status> + { Ok(Response::new(PingResponse {})) } } #[tokio::main] #[allow(dead_code)] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> +{ let args = Args::parse(); LocalTestService::spawn(args.address.parse()?)?; tokio::signal::ctrl_c().await?;