Skip to content

Commit

Permalink
Support for parallel message generation and basic statistics
Browse files Browse the repository at this point in the history
The generator now takes "tasks" parameter which tells it how many tasks it should launch. The server counts the number of send_message calls it has processed and the generator periodically retrieves this value and reports how many such calls have been processed in one millisecond.

The goal of this is to enable performance assessment of the proxide proxy.

Includes linting support in the pipeline.
  • Loading branch information
Fluxie committed Dec 18, 2023
1 parent 09e4fb7 commit 1b21cc3
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 76 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ jobs:
- name: Run rustfmt
run: |
cargo fmt --all -- --check
cargo fmt --all --manifest-path ./test/rust_grpc/Cargo.toml -- --check
- name: Run clippy
run: |
cargo clippy --all -- -D warnings
cargo clippy --all --manifest-path ./test/rust_grpc/Cargo.toml -- -D warnings
18 changes: 18 additions & 0 deletions test/rust_grpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion test/rust_grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ path="src/server.rs"
[dependencies]
tonic = "0.10.2"
prost = "0.12.3"
prost-types = "0.12.3"
tokio = { version = "1.35.0", features = ["macros", "rt-multi-thread"] }
tokio-shutdown = "0.1.4"
clap = { version= "4.4.11", features = ["derive"] }
portpicker = "0.1.1"
atomic-counter = "1.0.1"

[build-dependencies]
tonic-build = { version = "0.10.2", features = ["prost"] }
tonic-build = { version = "0.10.2", features = ["prost"] }

[profile.release]
debug = true
3 changes: 2 additions & 1 deletion test/rust_grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> Result<(), Box<dyn std::error::Error>>
{
tonic_build::compile_protos("proto/rust_grpc.proto")?;
Ok(())
}
8 changes: 8 additions & 0 deletions test/rust_grpc/proto/rust_grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ option java_multiple_files = true;
option java_package = "io.grpc.examples.rust_grpc";
option java_outer_classname = "RustGrpcProto";

import "google/protobuf/duration.proto";

/// Test server
service TestService {

Expand All @@ -49,6 +51,12 @@ message DiagnosticsRequest {
}

message DiagnosticsResponse {

/// Uptime of the server.
google.protobuf.Duration uptime = 1;

/// Number of times SendMessage has been called since the server was started.
uint64 send_message_calls = 2;
}

message WaitForFirstMessageRequest {
Expand Down
130 changes: 99 additions & 31 deletions test/rust_grpc/src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
use clap::Parser;
use clap::{arg, Parser};
use rust_grpc_private::DiagnosticsRequest;
use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest};
use std::thread;
use std::{thread, time};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinSet;
use tonic::transport::Channel;

mod rust_grpc_private {
mod rust_grpc_private
{
tonic::include_proto!("rust_grpc");
}

/// Simple program to greet a person.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(author, version)]
pub struct Args {
pub struct Args
{
/// Name of the person to greet.
#[arg(short, long, default_value = "http://[::1]:50051")]
pub address: String,

/// Period / delay between the messages sent to the server.
#[arg(short, long, value_parser = parse_period)]
pub period: std::time::Duration,

/// The number of asynchronous tasks used to send the messages in parallel.
#[arg(short, long, default_value_t = 1)]
pub tasks: u16,
}

/// A gRPC message generator that periodically sends messages to the target server.
pub struct GrpcGenerator {
pub struct GrpcGenerator
{
generator: Option<thread::JoinHandle<()>>,
stop: UnboundedSender<()>,
}

impl GrpcGenerator {
impl GrpcGenerator
{
/// Stars a new gRPC generator.
pub async fn start(address: &str) -> Result<Self, Box<dyn std::error::Error>> {
pub async fn start(args: Args) -> Result<Self, Box<dyn std::error::Error>>
{
// Start the generator in a separate tokio runtime to ensure its tasks won't interfere with the tests.
let address_clone = address.to_string();
let address_clone = args.address.to_string();
let (generator_started_send, mut generator_started_recv) =
tokio::sync::mpsc::unbounded_channel();
let (stop_requested_send, mut stop_requested_recv) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -42,8 +54,9 @@ impl GrpcGenerator {
.expect("Starting runtime for message generator failed.");
rt.block_on(async move {
spawn(Args {
address: address_clone,
period: std::time::Duration::from_millis(100),
address: args.address,
period: args.period,
tasks: args.tasks,
})
.expect("Starting generator failed.");
generator_started_send
Expand All @@ -60,10 +73,9 @@ impl GrpcGenerator {
// Wait for the first message to reach the server.
// This improve the robustness of the tests utilizing the generator as the environment is guaranteed to work after this.
{
let mut client = rust_grpc_private::test_service_client::TestServiceClient::connect(
address.to_string(),
)
.await?;
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(address_clone)
.await?;
let _ = client
.wait_for_first_message(WaitForFirstMessageRequest {})
.await;
Expand All @@ -76,10 +88,11 @@ impl GrpcGenerator {
}

/// Stops the gRPC message generation.
pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>>
{
let _ = self.stop.send(()); // Fails when called repeatedly as the channel gets dropped.
if let Some(generator) = self.generator.take() {
if let Err(_) = generator.join() {
if generator.join().is_err() {
return Err(Box::<dyn std::error::Error + Send + Sync>::from(
"Waiting for the generator to stop failed.",
));
Expand All @@ -89,14 +102,17 @@ impl GrpcGenerator {
}
}

impl Drop for GrpcGenerator {
fn drop(&mut self) {
impl Drop for GrpcGenerator
{
fn drop(&mut self)
{
self.stop().expect("Dropping the generator failed. ");
}
}

/// Spawns a new asynchronous message generation tasks.
fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error>> {
fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
tokio::spawn(async move {
generate_messages(args)
.await
Expand All @@ -106,36 +122,88 @@ fn spawn(args: Args) -> Result<(), Box<dyn std::error::Error>> {
}

/// Starts sending messages to the server,
async fn generate_messages(args: Args) -> Result<(), Box<dyn std::error::Error>> {
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?;
async fn generate_messages(args: Args) -> Result<(), Box<dyn std::error::Error>>
{
// Start the requested number of tasks.
// Each task is given a unique client as the generator did not scale properly when the channel was shared
// between the clients. The number of requests sent to the peaked at around <6 tasks. (Very rough approximation.)
// TODO: Investigate further.
let mut tasks: JoinSet<Result<(), Box<dyn std::error::Error + Send + Sync>>> = JoinSet::new();
for _t in 0..args.tasks {
let client = rust_grpc_private::test_service_client::TestServiceClient::connect(
args.address.to_string(),
)
.await?;
tasks.spawn(async move {
generate_messages_task(client, args.period).await?;
Ok(())
});
}
while let Some(result) = tasks.join_next().await {
match result {
Ok(_) => {}
Err(error) if error.is_cancelled() => {}
Err(error) => return Err(Box::new(error)),
}
}

Ok(())
}

/// An asynchronous function which sends messages to the server.
async fn generate_messages_task(
mut client: rust_grpc_private::test_service_client::TestServiceClient<Channel>,
period: std::time::Duration,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
loop {
let request = tonic::Request::new(SendMessageRequest {});

tokio::select! {
chosen = client.send_message(request) => { chosen?; },
_chosen = tokio::signal::ctrl_c() => { break; }
}

if args.period.is_zero() == false {
tokio::time::sleep(args.period).await;
#[allow(clippy::bool_comparison)]
if period.is_zero() == false {
tokio::time::sleep(period).await;
}
}

Ok(())
}

#[tokio::main]
#[allow(dead_code)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
spawn(Args::parse())?;
tokio::signal::ctrl_c().await?;
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
let args = Args::parse();
spawn(args.clone())?;
tokio::select! {
result = report_statistics( args ) => result?,
result = tokio::signal::ctrl_c() => result?,
}
Ok(())
}

/// Reads period from the command line and converts it into duration.
fn parse_period(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
fn parse_period(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError>
{
let seconds = arg.parse()?;
Ok(std::time::Duration::from_millis(seconds))
}

/// Reports server statistics to the console.
async fn report_statistics(args: Args) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
let mut client =
rust_grpc_private::test_service_client::TestServiceClient::connect(args.address).await?;

loop {
let response = client.get_diagnostics(DiagnosticsRequest {}).await?;
let diagnostics = response.get_ref();
let server_uptime = time::Duration::try_from(diagnostics.uptime.clone().unwrap())?;
let call_rate = diagnostics.send_message_calls as u128 / server_uptime.as_millis();
println!("Call rate: {} calls / ms", call_rate);

tokio::time::sleep(time::Duration::from_secs(2)).await;
}
}
Loading

0 comments on commit 1b21cc3

Please sign in to comment.