Skip to content

Commit

Permalink
Merge pull request #30 from Fluxie/topic/mt-tester
Browse files Browse the repository at this point in the history
Support for parallel message generation and basic statistics
  • Loading branch information
Fluxie authored Dec 28, 2023
2 parents 206db1d + 1b21cc3 commit efb1abb
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 76 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ jobs:
- name: Run rustfmt
run: |
cargo fmt --all -- --check
cargo fmt --all --manifest-path ./test/rust_grpc/Cargo.toml -- --check
- name: Run clippy
run: |
cargo clippy --all -- -D warnings
cargo clippy --all --manifest-path ./test/rust_grpc/Cargo.toml -- -D warnings
18 changes: 18 additions & 0 deletions test/rust_grpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion test/rust_grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ path="src/server.rs"
[dependencies]
tonic = "0.10.2"
prost = "0.12.3"
prost-types = "0.12.3"
tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread"] }
tokio-shutdown = "0.1.4"
clap = { version= "4.4.11", features = ["derive"] }
portpicker = "0.1.1"
atomic-counter = "1.0.1"

[build-dependencies]
tonic-build = { version = "0.10.2", features = ["prost"] }
tonic-build = { version = "0.10.2", features = ["prost"] }

[profile.release]
debug = true
3 changes: 2 additions & 1 deletion test/rust_grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> Result<(), Box<dyn std::error::Error>>
{
tonic_build::compile_protos("proto/rust_grpc.proto")?;
Ok(())
}
8 changes: 8 additions & 0 deletions test/rust_grpc/proto/rust_grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ option java_multiple_files = true;
option java_package = "io.grpc.examples.rust_grpc";
option java_outer_classname = "RustGrpcProto";

import "google/protobuf/duration.proto";

/// Test server
service TestService {

Expand All @@ -49,6 +51,12 @@ message DiagnosticsRequest {
}

message DiagnosticsResponse {

/// Uptime of the server.
google.protobuf.Duration uptime = 1;

/// Number of times SendMessage has been called since the server was started.
uint64 send_message_calls = 2;
}

message WaitForFirstMessageRequest {
Expand Down
130 changes: 99 additions & 31 deletions test/rust_grpc/src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
use clap::Parser;
use clap::{arg, Parser};
use rust_grpc_private::DiagnosticsRequest;
use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest};
use std::thread;
use std::{thread, time};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinSet;
use tonic::transport::Channel;

mod rust_grpc_private {
mod rust_grpc_private
{
tonic::include_proto!("rust_grpc");
}

/// Simple program to greet a person.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(author, version)]
pub struct Args {
pub struct Args
{
/// Name of the person to greet.
#[arg(short, long, default_value = "http://[::1]:50051")]
pub address: String,

/// Period / delay between the messages sent to the server.
#[arg(short, long, value_parser = parse_period)]
pub period: std::time::Duration,

/// The number of asynchronous tasks used to send the messages in parallel.
#[arg(short, long, default_value_t = 1)]
pub tasks: u16,
}

/// A gRPC message generator that periodically sends messages to the target server.
pub struct GrpcGenerator {
pub struct GrpcGenerator
{
generator: Option<thread::JoinHandle<()>>,
stop: UnboundedSender<()>,
}

impl GrpcGenerator {
impl GrpcGenerator
{
/// Stars a new gRPC generator.
pub async fn start(address: &str) -> Result<Self, Box<dyn std::error::Error>> {
pub async fn start(args: Args) -> Result<Self, Box<dyn std::error::Error>>
{
// Start the generator in a separate tokio runtime to ensure its tasks won't interfere with the tests.
let address_clone = address.to_string();
let address_clone = args.address.to_string();
let (generator_started_send, mut generator_started_recv) =
tokio::sync::mpsc::unbounded_channel();
let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -42,8 +54,9 @@ impl GrpcGenerator {
.expect("Starting runtime for message generator failed.");
rt.block_on(async move {
spawn(Args {
address: address_clone,
period: std::time::Duration::from_millis(100),
address: args.address,
period: args.period,
tasks: args.tasks,
})
.expect("Starting generator failed.");
generator_started_send
Expand All @@ -60,10 +73,9 @@ impl GrpcGenerator {
// Wait for the first message to reach the server.
// This improve the robustness of the tests utilizing the generator as the environment is guaranteed to work after this.
{
let mut client = rust_grpc_private::test_service_client::TestServiceClient::connect(
address.to_string(),
)
.await?;
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(address_clone)
.await?;
let _ = client
.wait_for_first_message(WaitForFirstMessageRequest {})
.await;
Expand All @@ -76,10 +88,11 @@ impl GrpcGenerator {
}

/// Stops the gRPC message generation.
pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>>
{
let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped.
if let Some(generator) = self.generator.take() {
if let Err(_) = generator.join() {
if generator.join().is_err() {
return Err(Box::<dyn std::error::Error + Send + Sync>::from(
"Waiting for the generator to stop failed.",
));
Expand All @@ -89,14 +102,17 @@ impl GrpcGenerator {
}
}

impl Drop for GrpcGenerator {
fn drop(&mut self) {
impl Drop for GrpcGenerator
{
fn drop(&mut self)
{
self.stop().expect("Dropping the generator failed. ");
}
}

/// Spawns a new asynchronous message generation tasks.
fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error>> {
fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
tokio::spawn(async move {
generate_messages(args)
.await
Expand All @@ -106,36 +122,88 @@ fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error>> {
}

/// Starts sending messages to the server,
async fn generate_messages(args: Args) -> Result<(), Box<dyn std::error::Error>> {
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?;
async fn generate_messages(args: Args) -> Result<(), Box<dyn std::error::Error>>
{
// Start the requested number of tasks.
// Each task is given a unique client as the generator did not scale properly when the channel was shared
// between the clients. The number of requests sent to the peaked at around <6 tasks. (Very rough approximation.)
// TODO: Investigate further.
let mut tasks: JoinSet<Result<(), Box<dyn std::error::Error + Send + Sync>>> = JoinSet::new();
for _t in 0..args.tasks {
let client = rust_grpc_private::test_service_client::TestServiceClient::connect(
args.address.to_string(),
)
.await?;
tasks.spawn(async move {
generate_messages_task(client, args.period).await?;
Ok(())
});
}
while let Some(result) = tasks.join_next().await {
match result {
Ok(_) => {}
Err(error) if error.is_cancelled() => {}
Err(error) => return Err(Box::new(error)),
}
}

Ok(())
}

/// An asynchronous function which sends messages to the server.
async fn generate_messages_task(
mut client: rust_grpc_private::test_service_client::TestServiceClient<Channel>,
period: std::time::Duration,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
loop {
let request = tonic::Request::new(SendMessageRequest {});

tokio::select! {
chosen = client.send_message(request) => { chosen?; },
_chosen = tokio::signal::ctrl_c() => { break; }
}

if args.period.is_zero() == false {
tokio::time::sleep(args.period).await;
#[allow(clippy::bool_comparison)]
if period.is_zero() == false {
tokio::time::sleep(period).await;
}
}

Ok(())
}

#[tokio::main]
#[allow(dead_code)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
spawn(Args::parse())?;
tokio::signal::ctrl_c().await?;
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
let args = Args::parse();
spawn(args.clone())?;
tokio::select! {
result = report_statistics( args ) => result?,
result = tokio::signal::ctrl_c() => result?,
}
Ok(())
}

/// Reads period from the command line and converts it into duration.
fn parse_period(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
fn parse_period(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError>
{
let seconds = arg.parse()?;
Ok(std::time::Duration::from_millis(seconds))
}

/// Reports server statistics to the console.
async fn report_statistics(args: Args) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?;

loop {
let response = client.get_diagnostics(DiagnosticsRequest {}).await?;
let diagnostics = response.get_ref();
let server_uptime = time::Duration::try_from(diagnostics.uptime.clone().unwrap())?;
let call_rate = diagnostics.send_message_calls as u128 / server_uptime.as_millis();
println!("Call rate: {} calls / ms", call_rate);

tokio::time::sleep(time::Duration::from_secs(2)).await;
}
}
Loading

0 comments on commit efb1abb

Please sign in to comment.