From a4dbe62cfb5c0fdb8e02b75b6078bc43e0044c65 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Tue, 21 May 2024 14:57:43 +0300 Subject: [PATCH] Rust metric instrumentation helper for system and current process. --- .zetch.lock | 23 +- opencollector.yaml | 2 +- rust/Cargo.lock | 64 ++++ rust/Cargo.toml | 4 + rust/bitbazaar/log/global_log/global_fns.rs | 17 +- rust/bitbazaar/log/mod.rs | 25 ++ .../log/system_and_process_metrics.rs | 321 ++++++++++++++++++ 7 files changed, 440 insertions(+), 16 deletions(-) create mode 100644 rust/bitbazaar/log/system_and_process_metrics.rs diff --git a/.zetch.lock b/.zetch.lock index 8bf37fb8..cae5661e 100644 --- a/.zetch.lock +++ b/.zetch.lock @@ -1,23 +1,24 @@ { "version": "0.0.10", "files": { + "CODE_OF_CONDUCT.zetch.md": "bf106326ffc75f5167cfde27c997c77c6b97c843a9e392b564355d0e70e50b97", + "rust/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", + "rust/pkg/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", + "js/tsconfig.zetch.json": "fb5d57b825bb3c2f6dd4254bf939f2444e52946622a7f93b91e3acb75876ebbc", "py/README.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", - "LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", + "opencollector.yaml.zetch": "33aa6a1e9e386b9e01c3d4c010b70260654c06850fb1433afb15545bd85f2f2d", "README.zetch.md": "04b4682145ff67da8e6e9f16289a77ff76f45d7fa9373b891c664048c07a5cd0", - "docs/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", - "js/README.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", - "docs/index.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", + "LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", "py_rust/README.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", "rust/README.zetch.md": 
"9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", - "opencollector.yaml.zetch": "678a691ae64d7f9893e8799ea657842fe051b3fcce4da568969d8de070a29393", - "CODE_OF_CONDUCT.zetch.md": "bf106326ffc75f5167cfde27c997c77c6b97c843a9e392b564355d0e70e50b97", - "py/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", - "rust/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", "docs/CONTRIBUTING.zetch.md": "bace46dc064746b54cf472eba960d934d705c2f83120b865a4b47032ff1552c5", - "CONTRIBUTING.zetch.md": "bace46dc064746b54cf472eba960d934d705c2f83120b865a4b47032ff1552c5", "docs/CODE_OF_CONDUCT.zetch.md": "bf106326ffc75f5167cfde27c997c77c6b97c843a9e392b564355d0e70e50b97", + "js/README.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", + "CONTRIBUTING.zetch.md": "bace46dc064746b54cf472eba960d934d705c2f83120b865a4b47032ff1552c5", + "js/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", + "py/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", "py_rust/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b", - "js/tsconfig.zetch.json": "fb5d57b825bb3c2f6dd4254bf939f2444e52946622a7f93b91e3acb75876ebbc", - "js/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b" + "docs/index.zetch.md": "9c98c0dccb947318f46cd8f5319143e8481fbf869225a9111574d690d0fb9daf", + "docs/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b" } } \ No newline at end of file diff --git a/opencollector.yaml b/opencollector.yaml index 2024bff1..7ee2e638 100644 --- a/opencollector.yaml +++ b/opencollector.yaml @@ -32,7 +32,7 @@ exporters: stream-name: default # Writes all opentelemetry logs, traces, metrics to a file, useful for testing: file/debug_file_writing: - path: /home/runner/work/bitbazaar/bitbazaar/logs/otlp_telemetry_out.log + path: 
/Users/zak/z/code/bitbazaar/logs/otlp_telemetry_out.log rotation: max_megabytes: 10 max_days: 3 diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c8d68c91..ad6804eb 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -179,6 +179,7 @@ dependencies = [ "serde_json", "sha1_smol", "strum", + "sysinfo", "tempfile", "time", "tokio", @@ -325,6 +326,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -1086,6 +1106,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1434,6 +1463,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redis" version = "0.25.3" @@ -1785,6 +1834,21 @@ version = "0.1.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sysinfo" +version = "0.30.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "system-configuration" version = "0.5.1" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index e5f91de6..84da68cd 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -24,6 +24,7 @@ log-filter = ["dep:regex"] chrono = ['dep:chrono', 'dep:chrono-humanize'] timing = ['dep:comfy-table', 'chrono'] cli = ['dep:normpath', 'dep:conch-parser', 'dep:homedir', 'chrono', 'dep:strum'] +system = ['dep:sysinfo'] redis = [ 'dep:tokio', 'dep:deadpool-redis', @@ -118,6 +119,9 @@ opentelemetry-otlp = { version = "0.14", default-features = false, features = [ ], optional = true } opentelemetry-semantic-conventions = { version = "0.13.0", optional = true } +# FEAT: system: +sysinfo = { version = "0.30", optional = true } + [target.'cfg(target_arch = "wasm32")'.dependencies] tracing-subscriber-wasm = "0.1.0" diff --git a/rust/bitbazaar/log/global_log/global_fns.rs b/rust/bitbazaar/log/global_log/global_fns.rs index 7ac30e81..d6fd690c 100644 --- a/rust/bitbazaar/log/global_log/global_fns.rs +++ b/rust/bitbazaar/log/global_log/global_fns.rs @@ -23,10 +23,19 @@ pub fn record_exception(message: impl Into, stacktrace: impl Into>, -) -> Result { - get_global()?.meter(name) +pub fn meter(name: impl Into>) -> opentelemetry::metrics::Meter { + opentelemetry::global::meter(name) +} + +#[cfg(any(feature = "opentelemetry-grpc", feature = "opentelemetry-http"))] +/// Returns the default [`opentelemetry::metrics::Meter`] for the app, labelled "default". 
+pub fn global_meter() -> &'static opentelemetry::metrics::Meter { + use once_cell::sync::Lazy; + + static GLOBAL_METER: Lazy = + Lazy::new(|| opentelemetry::global::meter("default")); + + &GLOBAL_METER } #[cfg(any(feature = "opentelemetry-grpc", feature = "opentelemetry-http"))] diff --git a/rust/bitbazaar/log/mod.rs b/rust/bitbazaar/log/mod.rs index 94d10896..0804dda6 100644 --- a/rust/bitbazaar/log/mod.rs +++ b/rust/bitbazaar/log/mod.rs @@ -5,7 +5,32 @@ mod macros; #[cfg(any(feature = "opentelemetry-grpc", feature = "opentelemetry-http"))] mod ot_tracing_bridge; +#[cfg(all( + feature = "system", + any(feature = "opentelemetry-grpc", feature = "opentelemetry-http") +))] +mod system_and_process_metrics; pub use global_log::{global_fns::*, GlobalLog, GlobalLogBuilder}; +#[cfg(all( + feature = "system", + any(feature = "opentelemetry-grpc", feature = "opentelemetry-http") +))] +pub use system_and_process_metrics::*; + +#[cfg(any(feature = "opentelemetry-grpc", feature = "opentelemetry-http"))] +/// Opentelemetry types that might be needed downstream. +/// The aim is to avoid the user having to depend on opentelemetry crates directly. +pub mod otlp { + pub use opentelemetry::{Key, KeyValue, StringValue, Value}; + + /// Otlp metric types. 
+ pub mod metrics { + pub use opentelemetry::metrics::{ + Counter, Histogram, ObservableCounter, ObservableGauge, ObservableUpDownCounter, + SyncCounter, SyncHistogram, SyncUpDownCounter, Unit, UpDownCounter, + }; + } +} #[cfg(test)] mod tests { diff --git a/rust/bitbazaar/log/system_and_process_metrics.rs b/rust/bitbazaar/log/system_and_process_metrics.rs new file mode 100644 index 00000000..859443e7 --- /dev/null +++ b/rust/bitbazaar/log/system_and_process_metrics.rs @@ -0,0 +1,321 @@ +use opentelemetry::{ + metrics::{Meter, Unit}, + Key, +}; +use parking_lot::Mutex; +use sysinfo::{ + get_current_pid, CpuRefreshKind, Disks, MemoryRefreshKind, Networks, RefreshKind, System, +}; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemcpuutilization +const SYSTEM_CPU_UTILIZATION: &str = "system.cpu.utilization"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemcpuphysicalcount +const SYSTEM_CPU_PHYSICAL_COUNT: &str = "system.cpu.physical.count"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemcpulogicalcount +const SYSTEM_CPU_LOGICAL_COUNT: &str = "system.cpu.logical.count"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemcpufrequency +const SYSTEM_CPU_FREQUENCY: &str = "system.cpu.frequency"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemmemoryusage +const SYSTEM_MEMORY_USAGE: &str = "system.memory.usage"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemmemoryutilization +const SYSTEM_MEMORY_UTILIZATION: &str = "system.memory.utilization"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemnetworkio +const SYSTEM_NETWORK_IO: &str = "system.network.io"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemfilesystemusage +const SYSTEM_FILESYSTEM_USAGE: &str =
"system.filesystem.usage"; + +// https://opentelemetry.io/docs/specs/semconv/system/system-metrics/#metric-systemfilesystemutilization +const SYSTEM_FILESYSTEM_UTILIZATION: &str = "system.filesystem.utilization"; + +// Used to specify which CPU the metric is for: +const SYSTEM_CPU_LOGICAL_NUMBER: Key = Key::from_static_str("system.cpu.logical_number"); + +// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/#metric-processcpuutilization +const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization"; + +// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/#metric-processmemoryusage +const PROCESS_MEMORY_USAGE: &str = "process.memory.usage"; + +// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/#metric-processmemoryvirtual +const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual"; + +// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/#metric-processdiskio +const PROCESS_DISK_IO: &str = "process.disk.io"; + +// Used to specify disk info: +const SYSTEM_FILESYSTEM_MOUNTPOINT: Key = Key::from_static_str("system.filesystem.mountpoint"); +const SYSTEM_FILESYSTEM_TYPE: Key = Key::from_static_str("system.filesystem.type"); + +// read/write for disk io, receive/transmit for network io: +const DIRECTION: Key = Key::from_static_str("direction"); + +// Device name for filesystem and network: +const SYSTEM_DEVICE: Key = Key::from_static_str("system.device"); + +use super::record_exception; +use crate::prelude::*; + +/// Automatically record system metrics: +/// SYSTEM WIDE: +/// - system.cpu.physical.count +/// - system.cpu.logical.count +/// - system.cpu.utilization +/// - system.cpu.frequency +/// - system.memory.usage +/// - system.memory.utilization +/// - system.network.io +/// - system.filesystem.usage +/// - system.filesystem.utilization +/// +/// CURRENT PROCESS: +/// - process.cpu.utilization +/// - process.memory.usage +/// - process.memory.virtual +/// - process.disk.io +pub fn 
init_system_and_process_metrics(meter: &Meter) -> Result<(), AnyErr> { + let system_cpu_physical_count = meter + .u64_observable_gauge(SYSTEM_CPU_PHYSICAL_COUNT) + .with_description("The number of physical CPUs.") + .init(); + + let system_cpu_logical_count = meter + .u64_observable_gauge(SYSTEM_CPU_LOGICAL_COUNT) + .with_description("The number of logical CPUs.") + .init(); + + let system_cpu_utilisation = meter + .f64_observable_gauge(SYSTEM_CPU_UTILIZATION) + .with_description("The total CPU usage.") + .with_unit(Unit::new("1")) + .init(); + + let system_cpu_frequency = meter + .u64_observable_gauge(SYSTEM_CPU_FREQUENCY) + .with_description("The frequency of the CPU.") + .with_unit(Unit::new("Hz")) + .init(); + + let system_memory_usage = meter + .u64_observable_gauge(SYSTEM_MEMORY_USAGE) + .with_description("The total amount of memory in use.") + .with_unit(Unit::new("Bytes")) + .init(); + + let system_memory_utilisation = meter + .f64_observable_gauge(SYSTEM_MEMORY_UTILIZATION) + .with_description("The total memory usage ratio.") + .with_unit(Unit::new("1")) + .init(); + + let system_network_io = meter + .u64_observable_counter(SYSTEM_NETWORK_IO) + .with_description("Network bytes transferred.") + .with_unit(Unit::new("Bytes")) + .init(); + + let system_filesystem_usage = meter + .u64_observable_gauge(SYSTEM_FILESYSTEM_USAGE) + .with_description("The total data on disk.") + .with_unit(Unit::new("Bytes")) + .init(); + + let system_filesystem_utilization = meter + .f64_observable_gauge(SYSTEM_FILESYSTEM_UTILIZATION) + .with_description("The total disk usage ratio.") + .with_unit(Unit::new("1")) + .init(); + + let process_cpu_utilization = meter + .f64_observable_gauge(PROCESS_CPU_UTILIZATION) + .with_description("The process' CPU usage ratio.") + .with_unit(Unit::new("1")) + .init(); + let process_memory_usage = meter + .u64_observable_gauge(PROCESS_MEMORY_USAGE) + .with_description("The process' amount of memory in use.") + .with_unit(Unit::new("Bytes")) + 
.init(); + let process_memory_virtual = meter + .u64_observable_gauge(PROCESS_MEMORY_VIRTUAL) + .with_description("The amount of committed virtual memory.") + .with_unit(Unit::new("Bytes")) + .init(); + let process_disk_io = meter + .u64_observable_counter(PROCESS_DISK_IO) + .with_description("Disk bytes transferred.") + .with_unit(Unit::new("Bytes")) + .init(); + + let pid = + get_current_pid().map_err(|err| anyerr!("Couldn't get current pid. Error: {}", err))?; + + // Apparently much more efficient + accurate re-using these objects than creating each time in the callback: + let objs = Mutex::new(( + System::new_all(), + Networks::new_with_refreshed_list(), + Disks::new_with_refreshed_list(), + )); + meter + .register_callback( + &[ + system_cpu_physical_count.as_any(), + system_cpu_logical_count.as_any(), + system_cpu_utilisation.as_any(), + system_cpu_frequency.as_any(), + system_memory_usage.as_any(), + system_memory_utilisation.as_any(), + system_network_io.as_any(), + system_filesystem_usage.as_any(), + system_filesystem_utilization.as_any(), + process_cpu_utilization.as_any(), + process_memory_usage.as_any(), + process_memory_virtual.as_any(), + process_disk_io.as_any(), + ], + move |context| { + let mut objs = objs.lock(); + let (sys, networks, disks) = &mut *objs; + + // Refresh everything needed for system metrics: + sys.refresh_specifics( + RefreshKind::new() + .with_cpu(CpuRefreshKind::new().with_cpu_usage()) + .with_memory(MemoryRefreshKind::everything()), + ); + + let physical_core_count = if let Some(count) = sys.physical_core_count() { + count as u64 + } else { + record_exception("Could not get physical core count. 
Defaulting to 2.", ""); + 2 + }; + let cpus = sys.cpus(); + let logical_core_count = cpus.len() as u64; + + // system.cpu.physical.count + context.observe_u64(&system_cpu_physical_count, physical_core_count, &[]); + + // system.cpu.logical.count + context.observe_u64(&system_cpu_logical_count, logical_core_count, &[]); + + for (cpu_index, cpu) in cpus.iter().enumerate() { + // system.cpu.utilization + context.observe_f64( + &system_cpu_utilisation, + (cpu.cpu_usage() as f64) / 100.0, + &[SYSTEM_CPU_LOGICAL_NUMBER.i64(cpu_index as i64)], + ); + + // system.cpu.frequency + context.observe_u64( + &system_cpu_frequency, + cpu.frequency(), + &[SYSTEM_CPU_LOGICAL_NUMBER.i64(cpu_index as i64)], + ); + } + + let used_memory = sys.used_memory(); + + // system.memory.usage + context.observe_u64(&system_memory_usage, used_memory, &[]); + + // system.memory.utilization + context.observe_f64( + &system_memory_utilisation, + used_memory as f64 / sys.total_memory() as f64, + &[], + ); + + // system.network.io + networks.refresh_list(); + for (interface_name, data) in networks.iter() { + let net_common_attributes = [SYSTEM_DEVICE.string(interface_name.to_string())]; + context.observe_u64( + &system_network_io, + data.received(), + &[ + net_common_attributes.as_slice(), + &[DIRECTION.string("receive")], + ] + .concat(), + ); + context.observe_u64( + &system_network_io, + data.transmitted(), + &[ + net_common_attributes.as_slice(), + &[DIRECTION.string("transmit")], + ] + .concat(), + ); + } + + // system.filesystem.usage + system.filesystem.utilization + disks.refresh_list(); + for disk in disks.iter() { + let disk_common_attributes = [ + SYSTEM_DEVICE.string(disk.name().to_string_lossy().to_string()), + SYSTEM_FILESYSTEM_MOUNTPOINT + .string(disk.mount_point().to_string_lossy().to_string()), + SYSTEM_FILESYSTEM_TYPE + .string(disk.file_system().to_string_lossy().to_string()), + ]; + let total_space = disk.total_space(); + context.observe_u64( + &system_filesystem_usage, + 
total_space, + &disk_common_attributes, + ); + context.observe_f64( + &system_filesystem_utilization, + (total_space - disk.available_space()) as f64 / total_space as f64, + &disk_common_attributes, + ); + } + + // Refresh the process for the specific metrics: + sys.refresh_process(pid); + if let Some(process) = sys.process(pid) { + // process.cpu.utilization + context.observe_f64( + &process_cpu_utilization, + ((process.cpu_usage() / 100.0) / logical_core_count as f32).into(), + &[], + ); + + // process.memory.usage + context.observe_u64(&process_memory_usage, process.memory(), &[]); + + // process.memory.virtual + context.observe_u64(&process_memory_virtual, process.virtual_memory(), &[]); + + // - process.disk.io + let disk_io = process.disk_usage(); + context.observe_u64( + &process_disk_io, + disk_io.read_bytes, + &[DIRECTION.string("read")], + ); + context.observe_u64( + &process_disk_io, + disk_io.written_bytes, + &[DIRECTION.string("write")], + ); + } else { + record_exception( + "Could not get current process for system metric collection.", + "", + ); + } + }, + ) + .change_context(AnyErr)?; + Ok(()) +}