Skip to content

Commit

Permalink
Generator now adds client thread and process ids headers
Browse files Browse the repository at this point in the history
This step prepares for the client stack capture support implementation for the proxide proxy. The goal is for proxide to capture the callstacks of the threads of client process when the traffic from the client goes through the proxide. This enables analysis of the client application behavior when unexpected calls pass through the proxy. Functionality will be available when the proxide proxy is on the same host machine as the client.
  • Loading branch information
Fluxie committed Dec 20, 2023
1 parent bec7baf commit 0ef3c7b
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 7 deletions.
75 changes: 74 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions test/rust_grpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions test/rust_grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ tokio-shutdown = "0.1.4"
clap = { version= "4.4.11", features = ["derive"] }
portpicker = "0.1.1"
atomic-counter = "1.0.1"
os-id = "3.0.1"
crossbeam-skiplist = "0.1.1"

[dependencies.windows]
version = "0.52.0"
features = [
"Win32_Foundation",
"Win32_System_Threading",
]


[build-dependencies]
tonic-build = { version = "0.10.2", features = ["prost"] }
Expand Down
13 changes: 13 additions & 0 deletions test/rust_grpc/proto/rust_grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,26 @@ message SendMessageResponse {
message DiagnosticsRequest {
}


message ClientProcess {

/// Identifies the process in the system.
uint32 id = 1;

/// Identifies the threads that have called the server from this process.
repeated uint64 threads = 2;
}

message DiagnosticsResponse {

/// Uptime of the server.
google.protobuf.Duration uptime = 1;

/// Number of times SendMessage has been called since the server was started.
uint64 send_message_calls = 2;

/// Information about the clients that have called the server.
repeated ClientProcess clients = 3;
}

message WaitForFirstMessageRequest {
Expand Down
34 changes: 32 additions & 2 deletions test/rust_grpc/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest};
use std::{thread, time};
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinSet;
use tonic::metadata::MetadataValue;
use tonic::transport::Channel;
#[cfg(target_os = "windows")]
use windows;

mod rust_grpc_private
{
Expand Down Expand Up @@ -157,7 +160,15 @@ async fn generate_messages_task(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
loop {
let request = tonic::Request::new(SendMessageRequest {});
let mut request = tonic::Request::new(SendMessageRequest {});
request.metadata_mut().append(
"proxide-client-process-id",
MetadataValue::from(std::process::id()),
);
request.metadata_mut().append(
"proxide-client-thread-id",
MetadataValue::from(get_current_native_thread_id()),
);
tokio::select! {
chosen = client.send_message(request) => { chosen?; },
_chosen = tokio::signal::ctrl_c() => { break; }
Expand Down Expand Up @@ -202,8 +213,27 @@ async fn report_statistics(args: Args) -> Result<(), Box<dyn std::error::Error +
let diagnostics = response.get_ref();
let server_uptime = time::Duration::try_from(diagnostics.uptime.clone().unwrap())?;
let call_rate = diagnostics.send_message_calls as u128 / server_uptime.as_millis();
println!("Call rate: {} calls / ms", call_rate);
println!(
"Call rate: {} calls / ms, processes: {} with {} threads",
call_rate,
diagnostics.clients.len(),
diagnostics
.clients
.iter()
.map(|c| c.threads.len() as u64)
.sum::<u64>()
);

tokio::time::sleep(time::Duration::from_secs(2)).await;
}
}

/// Gets the current native thread id.
pub fn get_current_native_thread_id() -> i64
{
#[cfg(not(target_os = "windows"))]
return os_id::thread::get_raw_id() as i64;

#[cfg(target_os = "windows")]
unsafe {return windows::Win32::System::Threading::GetCurrentThreadId() as i64; }
}
Loading

0 comments on commit 0ef3c7b

Please sign in to comment.