diff --git a/Cargo.lock b/Cargo.lock index 76c14f3..5a1c41f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,6 +398,39 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", +] + +[[package]] +name = "crossbeam-skiplist" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "883a5821d7d079fcf34ac55f27a833ee61678110f6b97637cc74513c0d0b42fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossterm" version = "0.25.0" @@ -670,6 +703,8 @@ version = "0.1.0" dependencies = [ "atomic-counter", "clap 4.4.11", + "crossbeam-skiplist", + "os-id", "portpicker", "prost", "prost-types", @@ -677,6 +712,7 @@ dependencies = [ "tokio-shutdown", "tonic", "tonic-build", + "windows", ] [[package]] @@ -831,7 +867,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -933,6 +969,15 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1057,6 +1102,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "os-id" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510856ec55c552d86db0d675df95c32b87f28cfe1cdc47d3eba2342c39a0a5f6" +dependencies = [ + "libc", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -2214,6 +2268,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -2223,6 +2287,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/test/rust_grpc/Cargo.lock b/test/rust_grpc/Cargo.lock index b65b8e6..a3ec707 100644 --- a/test/rust_grpc/Cargo.lock +++ b/test/rust_grpc/Cargo.lock @@ -270,6 +270,39 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "crossbeam-epoch" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", +] + +[[package]] +name = "crossbeam-skiplist" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "883a5821d7d079fcf34ac55f27a833ee61678110f6b97637cc74513c0d0b42fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +dependencies = [ + "cfg-if", +] + [[package]] name = "either" version = "1.9.0" @@ -372,6 +405,8 @@ version = "0.1.0" dependencies = [ "atomic-counter", "clap", + "crossbeam-skiplist", + "os-id", "portpicker", "prost", "prost-types", @@ -379,6 +414,7 @@ dependencies = [ "tokio-shutdown", "tonic", "tonic-build", + "windows", ] [[package]] @@ -568,6 +604,15 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -625,6 +670,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "os-id" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510856ec55c552d86db0d675df95c32b87f28cfe1cdc47d3eba2342c39a0a5f6" +dependencies = [ + "libc", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -863,6 +917,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.193" @@ -1201,6 +1261,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.0", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/test/rust_grpc/Cargo.toml b/test/rust_grpc/Cargo.toml index 8f93f09..80387a6 100644 --- a/test/rust_grpc/Cargo.toml +++ b/test/rust_grpc/Cargo.toml @@ -25,6 +25,16 @@ tokio-shutdown = "0.1.4" clap = { version= "4.4.11", features = ["derive"] } portpicker = "0.1.1" atomic-counter = "1.0.1" +os-id = "3.0.1" +crossbeam-skiplist = "0.1.1" + +[dependencies.windows] +version = "0.52.0" +features = [ + "Win32_Foundation", + "Win32_System_Threading", +] + [build-dependencies] tonic-build = { version = "0.10.2", features = ["prost"] } diff --git a/test/rust_grpc/proto/rust_grpc.proto b/test/rust_grpc/proto/rust_grpc.proto index ba7457c..74cb944 100644 --- a/test/rust_grpc/proto/rust_grpc.proto +++ b/test/rust_grpc/proto/rust_grpc.proto @@ -50,6 +50,16 @@ message SendMessageResponse { message DiagnosticsRequest { } + +message ClientProcess { + + /// Identifies the process in the system. + uint32 id = 1; + + /// Identifies the threads that have called the server from this process. + repeated uint64 threads = 2; +} + message DiagnosticsResponse { /// Uptime of the server. @@ -57,6 +67,9 @@ message DiagnosticsResponse { /// Number of times SendMessage has been called since the server was started. uint64 send_message_calls = 2; + + /// Information about the clients that have called the server. + repeated ClientProcess clients = 3; } message WaitForFirstMessageRequest { diff --git a/test/rust_grpc/src/generator.rs b/test/rust_grpc/src/generator.rs index 7bbc2f2..fa3d26d 100644 --- a/test/rust_grpc/src/generator.rs +++ b/test/rust_grpc/src/generator.rs @@ -4,7 +4,10 @@ use rust_grpc_private::{SendMessageRequest, WaitForFirstMessageRequest}; use std::{thread, time}; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinSet; +use tonic::metadata::MetadataValue; use tonic::transport::Channel; +#[cfg(target_os = "windows")] +use windows; mod rust_grpc_private { @@ -157,7 +160,15 @@ async fn generate_messages_task( ) -> Result<(), Box> { loop { - let request = tonic::Request::new(SendMessageRequest {}); + let mut request = tonic::Request::new(SendMessageRequest {}); + request.metadata_mut().append( + "proxide-client-process-id", + MetadataValue::from(std::process::id()), + ); + request.metadata_mut().append( + "proxide-client-thread-id", + MetadataValue::from(get_current_native_thread_id()), + ); tokio::select! { chosen = client.send_message(request) => { chosen?; }, _chosen = tokio::signal::ctrl_c() => { break; } @@ -202,8 +213,27 @@ async fn report_statistics(args: Args) -> Result<(), Box() + ); tokio::time::sleep(time::Duration::from_secs(2)).await; } } + +/// Gets the current native thread id. +pub fn get_current_native_thread_id() -> i64 +{ + #[cfg(not(target_os = "windows"))] + return os_id::thread::get_raw_id() as i64; + + #[cfg(target_os = "windows")] + unsafe {return windows::Win32::System::Threading::GetCurrentThreadId() as i64; } +} diff --git a/test/rust_grpc/src/lib.rs b/test/rust_grpc/src/lib.rs index 35e4bc6..9e6dad5 100644 --- a/test/rust_grpc/src/lib.rs +++ b/test/rust_grpc/src/lib.rs @@ -1,6 +1,6 @@ use crate::server::GrpcServer; pub use rust_grpc::{ - DiagnosticsRequest, DiagnosticsResponse, SendMessageRequest, SendMessageResponse, + ClientProcess, DiagnosticsRequest, DiagnosticsResponse, SendMessageRequest, SendMessageResponse, }; use std::time; use std::time::Duration; @@ -31,6 +31,9 @@ pub struct Statistics /// Number of "SendMessage" calls the tester has processed. pub send_message_calls_processed: u64, + + /// Information about the clients that have contacted the server. + pub clients: Vec, } pub struct GrpcTester @@ -110,6 +113,15 @@ impl GrpcTester Ok(Statistics { server_uptime: time::Duration::try_from(diagnostics.uptime.clone().unwrap())?, send_message_calls_processed: diagnostics.send_message_calls, + clients: diagnostics + .clients + .clone() + .into_iter() + .map(|c| ClientProcess { + id: c.id, + threads: c.threads, + }) + .collect(), }) } @@ -195,4 +207,25 @@ mod test } tester.stop().expect("Stopping tester failed."); } + + #[tokio::test] + async fn server_collects_generator_thread_id() + { + // Ensure the generator sends messages constantly to minimize the test duration. + let tester = GrpcTester::pipe_with_args(Args { + tasks: 1, + period: Duration::from_secs(0), + }) + .await + .expect("Starting tester failed."); + + // The server should have now received the first send_message call as the tester waits for it before returning. + let statistics = tester + .get_statistics() + .await + .expect("Retrieving statistics failed."); + assert_eq!(statistics.clients.len(), 1); + assert_eq!(statistics.clients[0].id, std::process::id()); + assert!(statistics.clients[0].threads.len() > 0); + } } diff --git a/test/rust_grpc/src/server.rs b/test/rust_grpc/src/server.rs index 3575569..e605bbb 100644 --- a/test/rust_grpc/src/server.rs +++ b/test/rust_grpc/src/server.rs @@ -2,15 +2,18 @@ use atomic_counter::AtomicCounter; use clap::Parser; use rust_grpc_private::test_service_server::{TestService, TestServiceServer}; use rust_grpc_private::{ - DiagnosticsRequest, DiagnosticsResponse, PingRequest, PingResponse, SendMessageRequest, - SendMessageResponse, WaitForFirstMessageRequest, WaitForFirstMessageResponse, + ClientProcess, DiagnosticsRequest, DiagnosticsResponse, PingRequest, PingResponse, + SendMessageRequest, SendMessageResponse, WaitForFirstMessageRequest, + WaitForFirstMessageResponse, }; use std::net::SocketAddr; +use std::str::FromStr; use std::thread; use std::time::Instant; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch; use tokio::sync::watch::{Receiver, Sender}; +use tonic::metadata::{Ascii, MetadataValue}; use tonic::transport::Server; use tonic::{Request, Response, Status}; @@ -167,6 +170,10 @@ struct LocalTestService /// The number of send_message calls the service has received. send_message_calls_served: atomic_counter::RelaxedCounter, + + /// Collection of client processes and threads as reported with + /// the proxide-client-process-id and proxide-client-thread-id headers + clients: crossbeam_skiplist::SkipMap>, } impl LocalTestService @@ -191,6 +198,7 @@ impl LocalTestService message_received_check: rx, started: Instant::now(), send_message_calls_served: atomic_counter::RelaxedCounter::new(0), + clients: crossbeam_skiplist::SkipMap::new(), }; #[cfg(not(test))] println!("Test server listening on {}", address); @@ -221,7 +229,7 @@ impl TestService for LocalTestService { async fn send_message( &self, - _request: Request, + request: Request, ) -> Result, Status> { // Avoid unnecessary notifications to reduce CPU <-> CPU communication. @@ -236,6 +244,19 @@ impl TestService for LocalTestService } }); self.send_message_calls_served.inc(); + + // Collect the client info. + let process_id = request.metadata().get("proxide-client-process-id"); + let thread_id = request.metadata().get("proxide-client-thread-id"); + if process_id.is_some() && thread_id.is_some() { + let threads = self.clients.get_or_insert( + number_from_client(process_id.unwrap())?, + crossbeam_skiplist::SkipSet::new(), + ); + threads + .value() + .insert(number_from_client(thread_id.unwrap())?); + } Ok(Response::new(SendMessageResponse {})) } @@ -250,9 +271,19 @@ impl TestService for LocalTestService Err(_) => return Err(Status::internal("Calculating server uptime failed.")), }; + let clients = self + .clients + .iter() + .map(|c| ClientProcess { + id: *c.key(), + threads: c.value().iter().map(|e| *e.value()).collect(), + }) + .collect(); + Ok(Response::new(DiagnosticsResponse { uptime: Some(duration), send_message_calls: self.send_message_calls_served.get() as u64, + clients, })) } @@ -285,3 +316,17 @@ async fn main() -> Result<(), Box> tokio::signal::ctrl_c().await?; Ok(()) } + +fn number_from_client(value: &MetadataValue) -> Result +where + N: FromStr, +{ + let value = match value.to_str() { + Ok(v) => v, + Err(_) => return Err(Status::invalid_argument("Header was not a string.")), + }; + match N::from_str(value) { + Ok(numbert) => Ok(numbert), + Err(_) => Err(Status::invalid_argument("Expected number")), + } +}