diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml index f6919410..76265567 100644 --- a/.github/workflows/coverage.yaml +++ b/.github/workflows/coverage.yaml @@ -27,7 +27,7 @@ jobs: uses: actions-rs/tarpaulin@v0.1 with: version: '0.15.0' - args: '--manifest-path /home/runner/work/zksync-bft/zksync-bft/node/Cargo.toml -- --test-threads 1' + args: '--manifest-path node/Cargo.toml -- --test-threads 1' env: RUST_LOG: 'network=trace,sync_blocks=trace,consensus=trace' diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 03605e00..215a84c0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -12,7 +12,7 @@ on: env: CARGO_TERM_COLOR: always CARGO_INCREMENTAL: "0" - RUSTFLAGS: "-Dwarnings -C linker=clang -C link-arg=-fuse-ld=lld" + RUSTFLAGS: "-Dwarnings -C linker=clang -C link-arg=-fuse-ld=lld -C link-arg=-Wl,-z,nostart-stop-gc" RUSTC_WRAPPER: "sccache" SCCACHE_GHA_ENABLED: "true" RUST_BACKTRACE: "1" diff --git a/grafana_dashboard.json b/grafana_dashboard.json index f11bae9a..9426d349 100644 --- a/grafana_dashboard.json +++ b/grafana_dashboard.json @@ -341,7 +341,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "avg(rate(network_rpc_message_size_sum{test_id=\"$test_id\",type=~\"req_sent|resp_sent\"}[1m])) by (type,method,submethod)", + "expr": "avg(rate(network_rpc_message_size_bytes_sum{test_id=\"$test_id\",type=~\"req_sent|resp_sent\"}[1m])) by (type,method,submethod)", "legendFormat": "__auto", "range": true, "refId": "A" @@ -352,7 +352,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "quantile(0.1, rate(concurrency_net_tcp__bytes_sent{test_id=\"$test_id\"}[1m]))", + "expr": "quantile(0.1, rate(network_tcp_sent_bytes{test_id=\"$test_id\"}[1m]))", "hide": false, "legendFormat": "total, 10th percentile", "range": true, @@ -364,7 +364,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "quantile(0.5, rate(concurrency_net_tcp__bytes_sent{test_id=\"$test_id\"}[1m]))", + "expr": "quantile(0.5, rate(network_tcp_sent_bytes{test_id=\"$test_id\"}[1m]))", "hide": false, "legendFormat": "total, 50th percentile", "range": true, @@ -376,7 +376,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "quantile(0.9, rate(concurrency_net_tcp__bytes_sent{test_id=\"$test_id\"}[1m]))", + "expr": "quantile(0.9, rate(network_tcp_sent_bytes{test_id=\"$test_id\"}[1m]))", "hide": false, "legendFormat": "total, 90th percentile", "range": true, @@ -388,7 +388,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "quantile(1., rate(concurrency_net_tcp__bytes_sent{test_id=\"$test_id\"}[1m]))", + "expr": "quantile(1., rate(network_tcp_sent_bytes{test_id=\"$test_id\"}[1m]))", "hide": false, "legendFormat": "total, max", "range": true, @@ -482,7 +482,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "rate(avg(network_rpc_latency_count{test_id=\"$test_id\",type=\"client_send_recv\"}) by (method,submethod) [1m])", + "expr": "rate(avg(network_rpc_latency_seconds_count{test_id=\"$test_id\",type=\"client_send_recv\"}) by (method,submethod) [1m])", "legendFormat": "__auto", "range": true, "refId": "A" @@ -569,7 +569,7 @@ }, "editorMode": "code", "exemplar": false, - "expr": "sum by(le) (rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"consensus\",type=\"client_send_recv\",submethod=\"LeaderPrepare\"}[1m]))", + "expr": "sum by(le) 
(rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"consensus\",type=\"client_send_recv\",submethod=\"LeaderPrepare\"}[1m]))", "format": "heatmap", "instant": false, "interval": "", @@ -659,7 +659,7 @@ }, "editorMode": "code", "exemplar": false, - "expr": "sum by(le) (rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))", + "expr": "sum by(le) (rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))", "format": "heatmap", "instant": false, "interval": "", @@ -755,7 +755,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "rate(max(executor_lib_io__finalized_block_number{test_id=\"$test_id\"})[1m])", + "expr": "rate(max(executor_finalized_block_number{test_id=\"$test_id\"})[1m])", "legendFormat": "__auto", "range": true, "refId": "A" @@ -847,7 +847,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "max(executor_lib_io__finalized_block_number{test_id=\"$test_id\"})", + "expr": "max(executor_finalized_block_number{test_id=\"$test_id\"})", "legendFormat": "__auto", "range": true, "refId": "max" @@ -858,7 +858,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "min(executor_lib_io__finalized_block_number{test_id=\"$test_id\"})", + "expr": "min(executor_finalized_block_number{test_id=\"$test_id\"})", "hide": false, "legendFormat": "__auto", "range": true, @@ -953,7 +953,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "avg(consensus_replica__view_timeout{test_id=\"$test_id\"})", + "expr": "avg(consensus_replica_view_timeout_seconds{test_id=\"$test_id\"})", "legendFormat": "avg", "range": true, "refId": "A" @@ -964,7 +964,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "max(consensus_replica__view_timeout{test_id=\"$test_id\"})", + "expr": "max(consensus_replica_view_timeout_seconds{test_id=\"$test_id\"})", "hide": false, "legendFormat": "max", "range": true, @@ -1044,7 +1044,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "sum(rate(consensus_leader__commit_phase_latency_bucket{test_id=\"$test_id\"}[1m])) by (le)", + "expr": "sum(rate(consensus_leader_commit_phase_latency_seconds_bucket{test_id=\"$test_id\"}[1m])) by (le)", "format": "heatmap", "legendFormat": "__auto", "range": true, @@ -1139,7 +1139,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "rate(sum (consensus_leader__processing_latency_sum{test_id=\"$test_id\"}) by (type)[1m])/rate(sum (consensus_leader__processing_latency_count{test_id=\"$test_id\"}) by (type)[1m])", + "expr": "rate(sum (consensus_leader_processing_latency_seconds_sum{test_id=\"$test_id\"}) by (type)[1m])/rate(sum (consensus_leader_processing_latency_seconds_count{test_id=\"$test_id\"}) by (type)[1m])", "legendFormat": "__auto", "range": true, "refId": "A" @@ -1150,7 +1150,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "rate(sum (consensus_replica__processing_latency_sum{test_id=\"$test_id\"}) by (type)[1m])/rate(sum (consensus_replica__processing_latency_count{test_id=\"$test_id\"}) by (type)[1m])", + "expr": "rate(sum (consensus_replica_processing_latency_seconds_sum{test_id=\"$test_id\"}) by (type)[1m])/rate(sum (consensus_replica_processing_latency_seconds_count{test_id=\"$test_id\"}) by (type)[1m])", "hide": false, "legendFormat": "__auto", "range": true, 
@@ -1527,7 +1527,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "sum(rate(concurrency_net_tcp__established{test_id=\"$test_id\",direction=\"inbound\"}[1m]))", + "expr": "sum(rate(network_tcp_established{test_id=\"$test_id\",direction=\"inbound\"}[1m]))", "legendFormat": "new inbound", "range": true, "refId": "A" @@ -1646,7 +1646,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "avg((min(histogram_quantile(0.8,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))) by (instance))/(min(rate(consensus_leader__commit_phase_latency_sum{test_id=\"$test_id\"}[1m])/rate(consensus_leader__commit_phase_latency_count{test_id=\"$test_id\"}[1m])) by (instance)))", + "expr": "avg((min(histogram_quantile(0.8,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))) by (instance))/(min(rate(consensus_leader_commit_phase_latency_seconds_sum{test_id=\"$test_id\"}[1m])/rate(consensus_leader_commit_phase_latency_seconds_count{test_id=\"$test_id\"}[1m])) by (instance)))", "hide": false, "legendFormat": "actual commit phase rate", "range": true, @@ -1658,7 +1658,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "2*avg(histogram_quantile(0.8,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m])))*rate(max(executor_lib_io__finalized_block_number{test_id=\"$test_id\"})[1m])", + "expr": "2*avg(histogram_quantile(0.8,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m])))*rate(max(executor_finalized_block_number{test_id=\"$test_id\"})[1m])", "hide": false, "legendFormat": "actual block rate", "range": true, @@ -1670,7 +1670,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "avg(2*histogram_quantile(0.8,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))/(3*histogram_quantile(0.66,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))))", + "expr": "avg(2*histogram_quantile(0.8,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))/(3*histogram_quantile(0.66,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))))", "hide": false, "legendFormat": "theoretical optimum for 2/3 signatures 3 roundtrips", "range": true, @@ -1694,7 +1694,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "1/(\n 1/(avg(histogram_quantile(0.8,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m])))*rate(max(executor_lib_io__finalized_block_number{test_id=\"$test_id\"})[1m]))\n-\n 1/avg((min(histogram_quantile(0.8,rate(network_rpc_latency_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))) by (instance))/(min(rate(consensus_leader__commit_phase_latency_sum{test_id=\"$test_id\"}[1m])/rate(consensus_leader__commit_phase_latency_count{test_id=\"$test_id\"}[1m])) by (instance)))\n)", + "expr": "1/(\n 1/(avg(histogram_quantile(0.8,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m])))*rate(max(executor_finalized_block_number{test_id=\"$test_id\"})[1m]))\n-\n 
1/avg((min(histogram_quantile(0.8,rate(network_rpc_latency_seconds_bucket{test_id=\"$test_id\",method=\"ping\",type=\"client_send_recv\"}[1m]))) by (instance))/(min(rate(consensus_leader_commit_phase_latency_seconds_sum{test_id=\"$test_id\"}[1m])/rate(consensus_leader_commit_phase_latency_seconds_count{test_id=\"$test_id\"}[1m])) by (instance)))\n)", "hide": false, "legendFormat": "actual prepare phase rate (approx)", "range": true, @@ -1789,7 +1789,7 @@ "uid": "ebcc7fad-20b5-44f0-a8ab-7ba2195ef2c0" }, "editorMode": "code", - "expr": "(avg(rate(network_rpc_latency_sum{test_id=\"$test_id\",type=~\"client_send_recv\"}[30s])/rate(network_rpc_latency_count{test_id=\"$test_id\",type=~\"client_send_recv\",method!=\"sync_validator_addrs\"}[30s])) by (method,submethod,type))", + "expr": "(avg(rate(network_rpc_latency_seconds_sum{test_id=\"$test_id\",type=~\"client_send_recv\"}[30s])/rate(network_rpc_latency_seconds_count{test_id=\"$test_id\",type=~\"client_send_recv\",method!=\"sync_validator_addrs\"}[30s])) by (method,submethod,type))", "legendFormat": "__auto", "range": true, "refId": "A" diff --git a/node/Cargo.lock b/node/Cargo.lock index 7aaea7fe..3755a046 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -393,7 +393,6 @@ dependencies = [ "hyper", "once_cell", "pin-project", - "prometheus", "rand", "sha2", "thiserror", @@ -401,6 +400,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "vise", ] [[package]] @@ -412,7 +412,6 @@ dependencies = [ "crypto", "network", "once_cell", - "prometheus", "rand", "roles", "schema", @@ -422,6 +421,7 @@ dependencies = [ "tokio", "tracing", "utils", + "vise", ] [[package]] @@ -532,6 +532,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "ed25519" version = "2.2.2" @@ -563,6 +569,15 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "elsa" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714f766f3556b44e7e4776ad133fcc3445a489517c25c704ace411bb14790194" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "errno" version = "0.3.2" @@ -595,7 +610,6 @@ dependencies = [ "hex", "network", "once_cell", - "prometheus", "rand", "roles", "schema", @@ -607,6 +621,8 @@ dependencies = [ "tracing", "tracing-subscriber", "utils", + "vise", + "vise-exporter", ] [[package]] @@ -980,6 +996,26 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linkme" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f948366ad5bb46b5514ba7a7a80643726eef08b06632592699676748c8bc33b" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc28438cad73dcc90ff3466fc329a9252b1b8ba668eb0d5668ba97088cf4eef0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -1072,7 +1108,6 @@ dependencies = [ "once_cell", "pin-project", "pretty_assertions", - "prometheus", "rand", "roles", "schema", @@ -1082,6 +1117,7 @@ dependencies = [ "tokio", "tracing", "utils", + "vise", ] [[package]] @@ -1321,18 +1357,26 @@ dependencies = [ ] [[package]] -name = "prometheus" -version = 
"0.13.3" +name = "prometheus-client" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", + "dtoa", + "itoa", "parking_lot", - "protobuf", - "thiserror", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", ] [[package]] @@ -1425,12 +1469,6 @@ dependencies = [ "prost", ] -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "protoc-bin-vendored" version = "3.0.0" @@ -1843,6 +1881,12 @@ dependencies = [ "der", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "storage" version = "0.1.0" @@ -2187,6 +2231,40 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vise" +version = "0.1.0" +source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +dependencies = [ + "elsa", + "linkme", + "once_cell", + "prometheus-client", + "vise-macros", +] + +[[package]] +name = "vise-exporter" +version = "0.1.0" +source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +dependencies = [ + "hyper", + "once_cell", + "tokio", + "tracing", + "vise", +] + +[[package]] +name = "vise-macros" +version = "0.1.0" +source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "want" version = "0.3.1" diff --git a/node/Cargo.toml b/node/Cargo.toml index 7ce80f2d..4ef28062 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -34,7 +34,6 @@ hyper = { version = "0.14.27", features = ["http1", "http2","server","tcp"] } im = "15.1.0" once_cell = "1.17.1" pin-project = "1.1.0" -prometheus = "0.13.3" prost = "0.11.0" prost-build = "0.11.0" prost-reflect = { version = "0.11.0", features = ["derive","serde"] } @@ -59,6 +58,8 @@ time = "0.3.23" tokio = { version = "1.28.1", features = ["full"] } tracing = { version = "0.1.37", features = ["attributes"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } +vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" } +vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" } # Note that "bench" profile inherits from "release" profile and # "test" profile inherits from "dev" profile. 
diff --git a/node/actors/consensus/Cargo.toml b/node/actors/consensus/Cargo.toml index d07953a8..beab70a8 100644 --- a/node/actors/consensus/Cargo.toml +++ b/node/actors/consensus/Cargo.toml @@ -9,11 +9,11 @@ license.workspace = true [dependencies] anyhow.workspace = true once_cell.workspace = true -prometheus.workspace = true rand.workspace = true tempfile.workspace = true thiserror.workspace = true tracing.workspace = true +vise.workspace = true concurrency = { path = "../../libs/concurrency" } crypto = { path = "../../libs/crypto" } diff --git a/node/actors/consensus/src/leader/replica_commit.rs b/node/actors/consensus/src/leader/replica_commit.rs index 835525fd..d9d4e948 100644 --- a/node/actors/consensus/src/leader/replica_commit.rs +++ b/node/actors/consensus/src/leader/replica_commit.rs @@ -1,21 +1,11 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, leader::error::Error}; +use crate::{inner::ConsensusInner, leader::error::Error, metrics}; use anyhow::{anyhow, Context}; -use concurrency::ctx; +use concurrency::{ctx, metrics::LatencyHistogramExt as _}; use network::io::{ConsensusInputMessage, Target}; -use once_cell::sync::Lazy; use roles::validator; use tracing::instrument; -static COMMIT_PHASE_LATENCY: Lazy = Lazy::new(|| { - prometheus::register_histogram!( - "consensus_leader__commit_phase_latency", - "latency of the commit phase observed by the leader", - prometheus::exponential_buckets(0.01, 1.5, 20).unwrap(), - ) - .unwrap() -}); - impl StateMachine { #[instrument(level = "trace", ret)] pub(crate) fn process_replica_commit( @@ -92,7 +82,9 @@ impl StateMachine { // ----------- Update the state machine -------------- let now = ctx.now(); - COMMIT_PHASE_LATENCY.observe((now - self.phase_start).as_seconds_f64()); + metrics::METRICS + .leader_commit_phase_latency + .observe_latency(now - self.phase_start); self.view = message.view.next(); self.phase = validator::Phase::Prepare; self.phase_start = now; diff --git a/node/actors/consensus/src/leader/replica_prepare.rs b/node/actors/consensus/src/leader/replica_prepare.rs index a9eaacba..912807ad 100644 --- a/node/actors/consensus/src/leader/replica_prepare.rs +++ b/node/actors/consensus/src/leader/replica_prepare.rs @@ -1,22 +1,13 @@ use super::StateMachine; -use crate::{inner::ConsensusInner, leader::error::Error}; +use crate::{inner::ConsensusInner, leader::error::Error, metrics}; use anyhow::{anyhow, Context as _}; use concurrency::ctx; use network::io::{ConsensusInputMessage, Target}; -use once_cell::sync::Lazy; use rand::Rng; use roles::validator; use std::collections::HashMap; use tracing::instrument; -static PROPOSAL_PAYLOAD_SIZE: Lazy = Lazy::new(|| { - prometheus::register_histogram!( - "consensus_leader__proposal_payload_size", - "size of the proposed payload in bytes", - ) - .unwrap() -}); - impl StateMachine { #[instrument(level = "trace", ret)] pub(crate) fn process_replica_prepare( @@ -170,7 +161,9 @@ impl StateMachine { number: highest_qc.message.proposal_block_number.next(), payload, }; - PROPOSAL_PAYLOAD_SIZE.observe(block.payload.len() as f64); + metrics::METRICS + .leader_proposal_payload_size + .observe(block.payload.len()); let vote = validator::ReplicaCommit { view: message.view, diff --git a/node/actors/consensus/src/leader/state_machine.rs b/node/actors/consensus/src/leader/state_machine.rs index b065bda5..f2473d92 100644 --- a/node/actors/consensus/src/leader/state_machine.rs +++ b/node/actors/consensus/src/leader/state_machine.rs @@ -1,6 +1,5 @@ -use crate::ConsensusInner; -use 
concurrency::{ctx, time}; -use once_cell::sync::Lazy; +use crate::{metrics, ConsensusInner}; +use concurrency::{ctx, metrics::LatencyHistogramExt as _, time}; use roles::validator; use std::{ collections::{BTreeMap, HashMap}, @@ -8,22 +7,6 @@ use std::{ }; use tracing::{instrument, warn}; -static PROCESSING_LATENCY: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec!( - "consensus_leader__processing_latency", - "latency of processing messages", - &["type", "result"] - ) - .unwrap() -}); - -fn result_label(res: &Result) -> &str { - match res { - Ok(_) => "ok", - Err(_) => "err", - } -} - /// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store /// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for /// those messages. When participating in consensus we are not the leader most of the time. @@ -76,20 +59,19 @@ impl StateMachine { input: validator::Signed, ) { let now = ctx.now(); - let (type_, result) = match &input.msg { + let (label, result) = match &input.msg { validator::ConsensusMsg::ReplicaPrepare(_) => ( - "ReplicaPrepare", + metrics::ConsensusMsgLabel::ReplicaPrepare, self.process_replica_prepare(ctx, consensus, input.cast().unwrap()), ), validator::ConsensusMsg::ReplicaCommit(_) => ( - "ReplicaCommit", + metrics::ConsensusMsgLabel::ReplicaCommit, self.process_replica_commit(ctx, consensus, input.cast().unwrap()), ), _ => unreachable!(), }; - PROCESSING_LATENCY - .with_label_values(&[type_, result_label(&result)]) - .observe((ctx.now() - now).as_seconds_f64()); + metrics::METRICS.leader_processing_latency[&label.with_result(&result)] + .observe_latency(ctx.now() - now); if let Err(e) = result { warn!("{}", e); } diff --git a/node/actors/consensus/src/lib.rs b/node/actors/consensus/src/lib.rs index 3c7ab9ee..0ade85e7 100644 --- a/node/actors/consensus/src/lib.rs +++ b/node/actors/consensus/src/lib.rs @@ -27,6 +27,7 @@ use utils::pipe::ActorPipe; mod inner; pub mod io; mod leader; +mod metrics; pub mod misc; mod replica; pub mod testonly; diff --git a/node/actors/consensus/src/metrics.rs b/node/actors/consensus/src/metrics.rs new file mode 100644 index 00000000..46d61dfb --- /dev/null +++ b/node/actors/consensus/src/metrics.rs @@ -0,0 +1,73 @@ +//! Metrics for the consensus module. + +use std::time::Duration; +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; + +const PAYLOAD_SIZE_BUCKETS: Buckets = + Buckets::exponential((4 * schema::kB) as f64..=(4 * schema::MB) as f64, 4.0); + +/// Label for a consensus message. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +pub(crate) enum ConsensusMsgLabel { + /// Label for a `LeaderPrepare` message. + LeaderPrepare, + /// Label for a `LeaderCommit` message. + LeaderCommit, + /// Label for a `ReplicaPrepare` message. + ReplicaPrepare, + /// Label for a `ReplicaCommit` message. + ReplicaCommit, +} + +impl ConsensusMsgLabel { + /// Attaches a result to this label. + pub(crate) fn with_result(self, result: &Result<(), E>) -> ProcessingLatencyLabels { + ProcessingLatencyLabels { + r#type: self, + result: match result { + Ok(()) => ResultLabel::Ok, + Err(_) => ResultLabel::Err, + }, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +enum ResultLabel { + Ok, + Err, +} + +/// Labels for processing latency metrics. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)] +pub(crate) struct ProcessingLatencyLabels { + r#type: ConsensusMsgLabel, + result: ResultLabel, +} + +/// Metrics defined by the consensus module. +#[derive(Debug, Metrics)] +#[metrics(prefix = "consensus")] +pub(crate) struct ConsensusMetrics { + /// Size of the proposed payload in bytes. + #[metrics(buckets = PAYLOAD_SIZE_BUCKETS, unit = Unit::Bytes)] + pub(crate) leader_proposal_payload_size: Histogram, + /// Latency of the commit phase observed by the leader. + #[metrics(buckets = Buckets::exponential(0.01..=20.0, 1.5), unit = Unit::Seconds)] + pub(crate) leader_commit_phase_latency: Histogram, + /// Currently set timeout after which replica will proceed to the next view. + #[metrics(unit = Unit::Seconds)] + pub(crate) replica_view_timeout: Gauge, + /// Latency of processing messages by the replicas. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub(crate) replica_processing_latency: Family>, + /// Latency of processing messages by the leader. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub(crate) leader_processing_latency: Family>, +} + +/// Global instance of [`ConsensusMetrics`]. +#[vise::register] +pub(crate) static METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/consensus/src/replica/state_machine.rs b/node/actors/consensus/src/replica/state_machine.rs index abf8d793..3380a65d 100644 --- a/node/actors/consensus/src/replica/state_machine.rs +++ b/node/actors/consensus/src/replica/state_machine.rs @@ -1,6 +1,5 @@ -use crate::ConsensusInner; -use concurrency::{ctx, time}; -use once_cell::sync::Lazy; +use crate::{metrics, ConsensusInner}; +use concurrency::{ctx, metrics::LatencyHistogramExt as _, time}; use roles::validator; use std::{ collections::{BTreeMap, HashMap}, @@ -9,22 +8,6 @@ use std::{ use storage::Storage; use tracing::{instrument, warn}; -static PROCESSING_LATENCY: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec!( - "consensus_replica__processing_latency", - "latency of processing messages", - &["type", "result"] - ) - .unwrap() -}); - -fn result_label(res: &Result) -> &str { - match res { - Ok(_) => "ok", - Err(_) => "err", - } -} - /// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible /// for validating and voting on blocks. When participating in consensus we are always a replica. #[derive(Debug)] @@ -106,20 +89,19 @@ impl StateMachine { }; let now = ctx.now(); - let (type_, result) = match &signed_msg.msg { + let (label, result) = match &signed_msg.msg { validator::ConsensusMsg::LeaderPrepare(_) => ( - "LeaderPrepare", + metrics::ConsensusMsgLabel::LeaderPrepare, self.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap()), ), validator::ConsensusMsg::LeaderCommit(_) => ( - "LeaderCommit", + metrics::ConsensusMsgLabel::LeaderCommit, self.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap()), ), _ => unreachable!(), }; - PROCESSING_LATENCY - .with_label_values(&[type_, result_label(&result)]) - .observe((ctx.now() - now).as_seconds_f64()); + metrics::METRICS.replica_processing_latency[&label.with_result(&result)] + .observe_latency(ctx.now() - now); // All errors from processing inputs are recoverable, so we just log them. 
if let Err(e) = result { warn!("{}", e); diff --git a/node/actors/consensus/src/replica/timer.rs b/node/actors/consensus/src/replica/timer.rs index fac0aef6..4df7b7da 100644 --- a/node/actors/consensus/src/replica/timer.rs +++ b/node/actors/consensus/src/replica/timer.rs @@ -1,16 +1,8 @@ use super::StateMachine; -use concurrency::{ctx, time}; -use once_cell::sync::Lazy; +use crate::metrics; +use concurrency::{ctx, metrics::LatencyGaugeExt as _, time}; use tracing::instrument; -static VIEW_TIMEOUT: Lazy = Lazy::new(|| { - prometheus::register_gauge!( - "consensus_replica__view_timeout", - "currently set timeout after which replica will proceed to the next view", - ) - .unwrap() -}); - impl StateMachine { /// The base duration of the timeout. pub(crate) const BASE_DURATION: time::Duration = time::Duration::milliseconds(1000); @@ -21,7 +13,7 @@ impl StateMachine { pub(crate) fn reset_timer(&mut self, ctx: &ctx::Ctx) { let timeout = Self::BASE_DURATION * 2u32.pow((self.view.0 - self.high_qc.message.view.0) as u32); - VIEW_TIMEOUT.set(timeout.as_seconds_f64()); + metrics::METRICS.replica_view_timeout.set_latency(timeout); self.timeout_deadline = time::Deadline::Finite(ctx.now() + timeout); } } diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index 2581b8d4..bcd3376a 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -11,13 +11,14 @@ default-run = "executor" anyhow.workspace = true hex.workspace = true once_cell.workspace = true -prometheus.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +vise.workspace = true +vise-exporter.workspace = true concurrency = { path = "../../libs/concurrency" } crypto = { path = "../../libs/crypto" } diff --git a/node/actors/executor/src/lib/io.rs b/node/actors/executor/src/lib/io.rs index a52aa9fc..cfe684b8 100644 --- a/node/actors/executor/src/lib/io.rs +++ b/node/actors/executor/src/lib/io.rs @@ -1,5 +1,6 @@ //! Module to manage the communication between actors. It simply converts and forwards messages from and to each different actor. +use crate::metrics; use concurrency::{ ctx::{self, channel}, scope, @@ -8,21 +9,12 @@ use consensus::io::{ InputMessage as ConsensusInputMessage, OutputMessage as ConsensusOutputMessage, }; use network::io::{InputMessage as NetworkInputMessage, OutputMessage as NetworkOutputMessage}; -use once_cell::sync::Lazy; use sync_blocks::io::{ InputMessage as SyncBlocksInputMessage, OutputMessage as SyncBlocksOutputMessage, }; use tracing::instrument; use utils::pipe::DispatcherPipe; -static FINALIZED_BLOCK_NUMBER: Lazy = Lazy::new(|| { - prometheus::register_int_gauge!( - "executor_lib_io__finalized_block_number", - "Number of the last finalized block observed by the node", - ) - .unwrap() -}); - /// The IO dispatcher, it is the main struct to handle actor messages. It simply contains a sender and a receiver for /// a pair of channels for each actor. This of course allows us to send and receive messages to and from each actor. 
#[derive(Debug)] @@ -62,8 +54,11 @@ impl Dispatcher { self.network_input.send(message.into()) } ConsensusOutputMessage::FinalizedBlock(b) => { - let n = FINALIZED_BLOCK_NUMBER.get(); - FINALIZED_BLOCK_NUMBER.set(n.max(b.block.number.0 as i64)); + let number_metric = &metrics::METRICS.finalized_block_number; + let current_number = number_metric.get(); + number_metric.set(current_number.max(b.block.number.0)); + // This works because this is the only place where `finalized_block_number` + // is modified, and there should be a single running `Dispatcher`. } } } diff --git a/node/actors/executor/src/lib/lib.rs b/node/actors/executor/src/lib/lib.rs index 8cef2047..8177f880 100644 --- a/node/actors/executor/src/lib/lib.rs +++ b/node/actors/executor/src/lib/lib.rs @@ -2,3 +2,4 @@ pub mod configurator; pub mod io; +mod metrics; diff --git a/node/actors/executor/src/lib/metrics.rs b/node/actors/executor/src/lib/metrics.rs new file mode 100644 index 00000000..e404722f --- /dev/null +++ b/node/actors/executor/src/lib/metrics.rs @@ -0,0 +1,15 @@ +//! Metrics defined by the executor module. + +use vise::{Gauge, Metrics}; + +/// Metrics defined by the executor module. +#[derive(Debug, Metrics)] +#[metrics(prefix = "executor")] +pub(crate) struct ExecutorMetrics { + /// Number of the last finalized block observed by the node. + pub(crate) finalized_block_number: Gauge, +} + +/// Global instance of [`ExecutorMetrics`]. +#[vise::register] +pub(crate) static METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/executor/src/main.rs b/node/actors/executor/src/main.rs index 404a27c4..84a46e9e 100644 --- a/node/actors/executor/src/main.rs +++ b/node/actors/executor/src/main.rs @@ -2,14 +2,15 @@ //! manages communication between the actors. It is the main executable in this workspace. 
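An aside on the `finalized_block_number` update in `io.rs` above (the `main.rs` hunk continues below): the gauge is advanced with a non-atomic read-then-set of the maximum, which the inline comment justifies by there being a single running `Dispatcher`. A hedged sketch of the same idea factored into a helper — the helper name is illustrative and not part of the PR:

```rust
use vise::Gauge;

/// Illustrative helper: move `gauge` forward to `candidate` if it is larger.
/// The read-modify-write is only correct when a single task updates the gauge,
/// which is exactly the invariant the Dispatcher comment above states.
fn advance_to_max(gauge: &Gauge<u64>, candidate: u64) {
    let current = gauge.get();
    gauge.set(current.max(candidate));
}
```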
use anyhow::Context as _; -use concurrency::{ctx, metrics, scope, time}; +use concurrency::{ctx, scope, time}; use consensus::Consensus; use executor::{configurator::Configs, io::Dispatcher}; use std::{fs, io::IsTerminal as _, path::Path, sync::Arc}; use storage::Storage; use tracing::{debug, info, metadata::LevelFilter}; use tracing_subscriber::{prelude::*, Registry}; -use utils::pipe; +use utils::{no_copy::NoCopy, pipe}; +use vise_exporter::MetricsExporter; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -100,16 +101,22 @@ async fn main() -> anyhow::Result<()> { debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { if let Some(addr) = configs.config.metrics_server_addr { - s.spawn_bg(metrics::run_server(ctx, addr)); + let addr = NoCopy::from(addr); + s.spawn_bg(async { + let addr = addr; + MetricsExporter::default() + .with_graceful_shutdown(ctx.canceled_owned()) // FIXME: support non-'static shutdown + .start(*addr) + .await?; + Ok(()) + }); } s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); s.spawn(async { let state = network::State::new(configs.network_config(), None, None); - prometheus::default_registry() - .register(Box::new(state.collector())) - .unwrap(); + state.register_metrics(); network::run_network(ctx, state, network_actor_pipe) .await .context("Network stopped") diff --git a/node/actors/network/Cargo.toml b/node/actors/network/Cargo.toml index 4ed60b29..7d24287a 100644 --- a/node/actors/network/Cargo.toml +++ b/node/actors/network/Cargo.toml @@ -12,11 +12,11 @@ async-trait.workspace = true im.workspace = true once_cell.workspace = true pin-project.workspace = true -prometheus.workspace = true rand.workspace = true snow.workspace = true thiserror.workspace = true tracing.workspace = true +vise.workspace = true concurrency = { path = "../../libs/concurrency" } crypto = { path = "../../libs/crypto" } diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index b8b876e9..011b6e20 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -9,6 +9,7 @@ mod event; mod frame; pub mod gossip; pub mod io; +mod metrics; mod mux; mod noise; mod pool; diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs new file mode 100644 index 00000000..601c0ad2 --- /dev/null +++ b/node/actors/network/src/metrics.rs @@ -0,0 +1,162 @@ +//! General-purpose network metrics. + +use crate::state::State; +use concurrency::{ctx, io, metrics::GaugeGuard, net}; +use std::{ + net::SocketAddr, + pin::Pin, + sync::Weak, + task::{ready, Context, Poll}, +}; +use vise::{Collector, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Metrics, Unit}; + +/// Metered TCP stream. +#[pin_project::pin_project] +pub(crate) struct MeteredStream { + #[pin] + stream: net::tcp::Stream, + _active: GaugeGuard, +} + +impl MeteredStream { + /// Opens a TCP connection to a remote host and returns a metered stream. + pub(crate) async fn connect( + ctx: &ctx::Ctx, + addr: SocketAddr, + ) -> ctx::OrCanceled> { + let io_result = net::tcp::connect(ctx, addr).await?; + Ok(io_result.map(|stream| Self::new(stream, Direction::Outbound))) + } + + /// Accepts an inbound connection and returns a metered stream. 
+ pub(crate) async fn listen( + ctx: &ctx::Ctx, + listener: &mut net::tcp::Listener, + ) -> ctx::OrCanceled> { + let io_result = net::tcp::accept(ctx, listener).await?; + Ok(io_result.map(|stream| Self::new(stream, Direction::Inbound))) + } + + #[cfg(test)] + pub(crate) async fn test_pipe(ctx: &ctx::Ctx) -> (Self, Self) { + let (outbound_stream, inbound_stream) = net::tcp::testonly::pipe(ctx).await; + let outbound_stream = Self::new(outbound_stream, Direction::Outbound); + let inbound_stream = Self::new(inbound_stream, Direction::Inbound); + (outbound_stream, inbound_stream) + } + + fn new(stream: net::tcp::Stream, direction: Direction) -> Self { + TCP_METRICS.established[&direction].inc(); + Self { + stream, + _active: GaugeGuard::from(TCP_METRICS.active[&direction].clone()), + } + } +} + +impl io::AsyncRead for MeteredStream { + #[inline(always)] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let before = buf.remaining(); + let res = this.stream.poll_read(cx, buf); + let after = buf.remaining(); + TCP_METRICS.received.inc_by((before - after) as u64); + res + } +} + +impl io::AsyncWrite for MeteredStream { + #[inline(always)] + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.project(); + let res = ready!(this.stream.poll_write(cx, buf))?; + TCP_METRICS.sent.inc_by(res as u64); + Poll::Ready(Ok(res)) + } + + #[inline(always)] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().stream.poll_flush(cx) + } + + #[inline(always)] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().stream.poll_shutdown(cx) + } +} + +/// Direction of a TCP connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "direction", rename_all = "snake_case")] +enum Direction { + /// Inbound connection. + Inbound, + /// Outbound connection. + Outbound, +} + +/// Metrics reported for TCP connections. +#[derive(Debug, Metrics)] +#[metrics(prefix = "network_tcp")] +struct TcpMetrics { + /// Total bytes sent over all TCP connections. + #[metrics(unit = Unit::Bytes)] + sent: Counter, + /// Total bytes received over all TCP connections. + #[metrics(unit = Unit::Bytes)] + received: Counter, + /// TCP connections established since the process started. + established: Family, + /// Number of currently active TCP connections. + active: Family, +} + +/// TCP metrics instance. +#[vise::register] +static TCP_METRICS: vise::Global = vise::Global::new(); + +/// General-purpose network metrics exposed via a collector. +#[derive(Debug, Metrics)] +#[metrics(prefix = "network")] +pub(crate) struct NetworkGauges { + /// Number of active inbound gossip connections. + gossip_inbound_connections: Gauge, + /// Number of active outbound gossip connections. + gossip_outbound_connections: Gauge, + /// Number of active inbound consensus connections. + consensus_inbound_connections: Gauge, + /// Number of active outbound consensus connections. + consensus_outbound_connections: Gauge, +} + +impl NetworkGauges { + /// Registers a metrics collector for the specified state. 
+ pub(crate) fn register(state_ref: Weak) { + #[vise::register] + static COLLECTOR: Collector> = Collector::new(); + + let register_result = COLLECTOR.before_scrape(move || { + state_ref.upgrade().map(|state| { + let gauges = NetworkGauges::default(); + let len = state.gossip.inbound.subscribe().borrow().current().len(); + gauges.gossip_inbound_connections.set(len); + let len = state.gossip.outbound.subscribe().borrow().current().len(); + gauges.gossip_outbound_connections.set(len); + let len = state.consensus.inbound.subscribe().borrow().current().len(); + gauges.consensus_inbound_connections.set(len); + let subscriber = state.consensus.outbound.subscribe(); + let len = subscriber.borrow().current().len(); + gauges.consensus_outbound_connections.set(len); + gauges + }) + }); + if register_result.is_err() { + tracing::warn!("Failed registering network metrics collector: already registered"); + } + } +} diff --git a/node/actors/network/src/noise/stream.rs b/node/actors/network/src/noise/stream.rs index adf94b70..d3b97544 100644 --- a/node/actors/network/src/noise/stream.rs +++ b/node/actors/network/src/noise/stream.rs @@ -1,9 +1,9 @@ //! `tokio::io` stream using Noise encryption. use super::bytes; +use crate::metrics::MeteredStream; use concurrency::{ ctx, io, io::{AsyncRead as _, AsyncWrite as _}, - net, }; use crypto::{sha256::Sha256, ByteFmt}; use std::{ @@ -32,65 +32,6 @@ fn params() -> snow::params::NoiseParams { } } -impl Stream { - /// Performs a server-side noise handshake and returns the encrypted stream. - pub(crate) async fn server_handshake( - ctx: &ctx::Ctx, - s: net::tcp::Stream, - ) -> anyhow::Result { - Self::handshake(ctx, s, snow::Builder::new(params()).build_responder()?).await - } - - /// Performs a client-side noise handshake and returns the encrypted stream. - pub(crate) async fn client_handshake( - ctx: &ctx::Ctx, - s: net::tcp::Stream, - ) -> anyhow::Result { - Self::handshake(ctx, s, snow::Builder::new(params()).build_initiator()?).await - } - - /// Performs the noise handshake given the HandshakeState. - async fn handshake( - ctx: &ctx::Ctx, - mut stream: net::tcp::Stream, - mut hs: snow::HandshakeState, - ) -> anyhow::Result { - let mut buf = vec![0; 65536]; - let mut payload = vec![]; - loop { - if hs.is_handshake_finished() { - return Ok(Stream { - id: ByteFmt::decode(hs.get_handshake_hash()).unwrap(), - inner: stream, - noise: hs.into_transport_mode()?, - read_buf: Box::default(), - write_buf: Box::default(), - }); - } - if hs.is_my_turn() { - let n = hs.write_message(&payload, &mut buf)?; - // TODO(gprusak): writing/reading length field and the frame content could be - // done in a single syscall. - io::write_all(ctx, &mut stream, &u16::to_le_bytes(n as u16)).await??; - io::write_all(ctx, &mut stream, &buf[..n]).await??; - io::flush(ctx, &mut stream).await??; - } else { - let mut msg_size = [0u8, 2]; - io::read_exact(ctx, &mut stream, &mut msg_size).await??; - let n = u16::from_le_bytes(msg_size) as usize; - io::read_exact(ctx, &mut stream, &mut buf[..n]).await??; - hs.read_message(&buf[..n], &mut payload)?; - } - } - } - - /// Returns the noise session id. - /// See `Stream::id`. - pub(crate) fn id(&self) -> Sha256 { - self.id - } -} - // Constants from the Noise spec. /// Maximal size of the encrypted frame that Noise may output. @@ -130,16 +71,14 @@ impl Default for Buffer { /// Encrypted stream. /// It implements tokio::io::AsyncRead/AsyncWrite. 
-#[pin_project::pin_project(project=StreamProject)] -pub(crate) struct Stream { +#[pin_project::pin_project(project = StreamProject)] +pub(crate) struct Stream { /// Hash of the handshake messages. /// Uniquely identifies the noise session. id: Sha256, /// Underlying TCP stream. - /// TODO(gprusak): we can generalize noise::Stream to wrap an arbitrary - /// stream if needed. #[pin] - inner: net::tcp::Stream, + inner: S, /// Noise protocol state, used to encrypt/decrypt frames. noise: snow::TransportState, /// Buffers used for the read half of the stream. @@ -148,12 +87,66 @@ pub(crate) struct Stream { write_buf: Box, } -impl Stream { +impl Stream +where + S: io::AsyncRead + io::AsyncWrite + Unpin, +{ + /// Performs a server-side noise handshake and returns the encrypted stream. + pub(crate) async fn server_handshake(ctx: &ctx::Ctx, stream: S) -> anyhow::Result { + Self::handshake(ctx, stream, snow::Builder::new(params()).build_responder()?).await + } + + /// Performs a client-side noise handshake and returns the encrypted stream. + pub(crate) async fn client_handshake(ctx: &ctx::Ctx, stream: S) -> anyhow::Result { + Self::handshake(ctx, stream, snow::Builder::new(params()).build_initiator()?).await + } + + /// Performs the noise handshake given the HandshakeState. + async fn handshake( + ctx: &ctx::Ctx, + mut stream: S, + mut hs: snow::HandshakeState, + ) -> anyhow::Result { + let mut buf = vec![0; 65536]; + let mut payload = vec![]; + loop { + if hs.is_handshake_finished() { + return Ok(Self { + id: ByteFmt::decode(hs.get_handshake_hash()).unwrap(), + inner: stream, + noise: hs.into_transport_mode()?, + read_buf: Box::default(), + write_buf: Box::default(), + }); + } + if hs.is_my_turn() { + let n = hs.write_message(&payload, &mut buf)?; + // TODO(gprusak): writing/reading length field and the frame content could be + // done in a single syscall. + io::write_all(ctx, &mut stream, &u16::to_le_bytes(n as u16)).await??; + io::write_all(ctx, &mut stream, &buf[..n]).await??; + io::flush(ctx, &mut stream).await??; + } else { + let mut msg_size = [0u8, 2]; + io::read_exact(ctx, &mut stream, &mut msg_size).await??; + let n = u16::from_le_bytes(msg_size) as usize; + io::read_exact(ctx, &mut stream, &mut buf[..n]).await??; + hs.read_message(&buf[..n], &mut payload)?; + } + } + } + + /// Returns the noise session id. + /// See `Stream::id`. + pub(crate) fn id(&self) -> Sha256 { + self.id + } + /// Wait until a frame is fully loaded. /// Returns the size of the frame. /// Returns None in case EOF is reached before the frame is loaded. fn poll_read_frame( - this: &mut StreamProject<'_>, + this: &mut StreamProject<'_, S>, cx: &mut Context<'_>, ) -> Poll>> { // Fetch frame until complete. @@ -179,7 +172,7 @@ impl Stream { /// Wait until payload is nonempty. fn poll_read_payload( - this: &mut StreamProject<'_>, + this: &mut StreamProject<'_, S>, cx: &mut Context<'_>, ) -> Poll> { if this.read_buf.payload.len() > 0 { @@ -203,7 +196,10 @@ impl Stream { } } -impl io::AsyncRead for Stream { +impl io::AsyncRead for Stream +where + S: io::AsyncRead + io::AsyncWrite + Unpin, +{ /// From tokio::io::AsyncRead: /// * The amount of data read can be determined by the increase /// in the length of the slice returned by ReadBuf::filled. @@ -227,9 +223,15 @@ impl io::AsyncRead for Stream { } } -impl Stream { +impl Stream +where + S: io::AsyncRead + io::AsyncWrite + Unpin, +{ /// poll_flush_frame will either flush this.write_buf.frame, or return an error. 
- fn poll_flush_frame(this: &mut StreamProject, cx: &mut Context<'_>) -> Poll> { + fn poll_flush_frame( + this: &mut StreamProject<'_, S>, + cx: &mut Context<'_>, + ) -> Poll> { while this.write_buf.frame.len() > 0 { let n = ready!(Pin::new(&mut this.inner).poll_write(cx, this.write_buf.frame.as_slice()))?; @@ -242,7 +244,10 @@ impl Stream { } /// poll_flush_payload will either flush this.write_buf.payload, or return an error. - fn poll_flush_payload(this: &mut StreamProject, cx: &mut Context<'_>) -> Poll> { + fn poll_flush_payload( + this: &mut StreamProject<'_, S>, + cx: &mut Context<'_>, + ) -> Poll> { if this.write_buf.payload.len() == 0 { return Poll::Ready(Ok(())); } @@ -266,7 +271,10 @@ impl Stream { } } -impl io::AsyncWrite for Stream { +impl io::AsyncWrite for Stream +where + S: io::AsyncRead + io::AsyncWrite + Unpin, +{ /// from futures::io::AsyncWrite: /// * poll_write must try to make progress by flushing if needed to become writable /// from std::io::Write: diff --git a/node/actors/network/src/noise/testonly.rs b/node/actors/network/src/noise/testonly.rs index 425e2f99..9304c6d2 100644 --- a/node/actors/network/src/noise/testonly.rs +++ b/node/actors/network/src/noise/testonly.rs @@ -1,12 +1,17 @@ -use crate::noise; -use concurrency::{ctx, net, scope}; +use crate::{metrics, noise}; +use concurrency::{ctx, scope}; pub(crate) async fn pipe(ctx: &ctx::Ctx) -> (noise::Stream, noise::Stream) { scope::run!(ctx, |ctx, s| async { - let (s1, s2) = net::tcp::testonly::pipe(ctx).await; - let s1 = s.spawn(async { noise::Stream::client_handshake(ctx, s1).await }); - let s2 = s.spawn(async { noise::Stream::server_handshake(ctx, s2).await }); - Ok((s1.join(ctx).await?, s2.join(ctx).await?)) + let (outbound_stream, inbound_stream) = metrics::MeteredStream::test_pipe(ctx).await; + let outbound_task = + s.spawn(async { noise::Stream::client_handshake(ctx, outbound_stream).await }); + let inbound_task = + s.spawn(async { noise::Stream::server_handshake(ctx, inbound_stream).await }); + Ok(( + outbound_task.join(ctx).await?, + inbound_task.join(ctx).await?, + )) }) .await .unwrap() diff --git a/node/actors/network/src/preface.rs b/node/actors/network/src/preface.rs index 7641468c..37542746 100644 --- a/node/actors/network/src/preface.rs +++ b/node/actors/network/src/preface.rs @@ -7,8 +7,8 @@ //! //! Hence, the preface protocol is used to enable encryption //! and multiplex between mutliple endpoints available on the same TCP port. -use crate::{frame, noise}; -use concurrency::{ctx, net, time}; +use crate::{frame, metrics, noise}; +use concurrency::{ctx, time}; use schema::{proto::network::preface as proto, required, ProtoFmt}; /// Timeout on executing the preface protocol. @@ -79,7 +79,7 @@ pub(crate) async fn connect( endpoint: Endpoint, ) -> anyhow::Result { let ctx = &ctx.with_timeout(TIMEOUT); - let mut stream = net::tcp::connect(ctx, addr).await??; + let mut stream = metrics::MeteredStream::connect(ctx, addr).await??; frame::send_proto(ctx, &mut stream, &Encryption::NoiseNN).await?; let mut stream = noise::Stream::client_handshake(ctx, stream).await?; frame::send_proto(ctx, &mut stream, &endpoint).await?; @@ -89,7 +89,7 @@ pub(crate) async fn connect( /// Performs a server-side preface protocol. 
pub(crate) async fn accept( ctx: &ctx::Ctx, - mut stream: net::tcp::Stream, + mut stream: metrics::MeteredStream, ) -> anyhow::Result<(noise::Stream, Endpoint)> { let ctx = &ctx.with_timeout(TIMEOUT); let _: Encryption = frame::recv_proto(ctx, &mut stream).await?; diff --git a/node/actors/network/src/rpc/metrics.rs b/node/actors/network/src/rpc/metrics.rs new file mode 100644 index 00000000..a8770168 --- /dev/null +++ b/node/actors/network/src/rpc/metrics.rs @@ -0,0 +1,108 @@ +//! Metrics for RPCs. + +use super::Rpc; +use std::{any::Any, time::Duration}; +use vise::{ + Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LabeledFamily, Metrics, + Unit, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +pub(super) enum CallLatencyType { + ClientSendRecv, + ServerRecvSend, + ServerProcess, +} + +impl CallLatencyType { + pub(super) fn to_labels( + self, + req: &R::Req, + result: &anyhow::Result, + ) -> CallLatencyLabels { + CallLatencyLabels { + r#type: self, + method: R::METHOD, + submethod: R::submethod(req), + result: match result { + Ok(_) => ResultLabel::Ok, + Err(_) => ResultLabel::Err, + }, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +enum ResultLabel { + Ok, + Err, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)] +pub(super) struct CallLatencyLabels { + r#type: CallLatencyType, + method: &'static str, + submethod: &'static str, + result: ResultLabel, +} + +impl CallLatencyLabels { + pub(super) fn set_result(&mut self, result: &anyhow::Result) { + self.result = match result { + Ok(_) => ResultLabel::Ok, + Err(_) => ResultLabel::Err, + }; + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] +#[metrics(rename_all = "snake_case")] +pub(super) enum CallType { + Client, + Server, + ReqSent, + ReqRecv, + RespSent, + RespRecv, +} + +impl CallType { + pub(super) fn to_labels(self, req: &R::Req) -> CallLabels { + CallLabels { + r#type: self, + method: R::METHOD, + submethod: R::submethod(req), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)] +pub(super) struct CallLabels { + r#type: CallType, + method: &'static str, + submethod: &'static str, +} + +const MESSAGE_SIZE_BUCKETS: Buckets = + Buckets::exponential(schema::kB as f64..=schema::MB as f64, 2.0); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "network_rpc")] +pub(super) struct RpcMetrics { + /// Latency of RPCs in seconds. + #[metrics(unit = Unit::Seconds, buckets = Buckets::LATENCIES)] + pub(super) latency: Family>, + /// Current number of executing RPCs. + pub(super) inflight: Family, + /// RPC message sizes in bytes. + #[metrics(unit = Unit::Bytes, buckets = MESSAGE_SIZE_BUCKETS)] + pub(super) message_size: Family>, + /// Time that client waits for the server to prepare a stream for an RPC call. + #[metrics(unit = Unit::Seconds, buckets = Buckets::LATENCIES, labels = ["method"])] + pub(super) call_reserve_latency: LabeledFamily<&'static str, Histogram>, +} + +#[vise::register] +pub(super) static RPC_METRICS: vise::Global = vise::Global::new(); diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index 357bf5b8..b77b359f 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -14,13 +14,19 @@ //! ``` //! You can construct an Rpc service with multiple servers and clients //! at the same time (max 1 client + server per CapabilityId). 
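The new `rpc/metrics.rs` above replaces the string-labelled `prometheus` vecs with typed label sets. For reference, here is a minimal sketch of how such a `Family`/`LabeledFamily` is declared and indexed, mirroring the call sites further down in this diff (e.g. `RPC_METRICS.latency[&metric_labels]` and `RPC_METRICS.call_reserve_latency[&R::METHOD]`); all names in the sketch are illustrative.

```rust
use std::time::Duration;
use vise::{
    Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, LabeledFamily, Metrics, Unit,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)]
#[metrics(rename_all = "snake_case")]
enum CallKind {
    Client,
    Server,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)]
struct CallLabels {
    kind: CallKind,
    method: &'static str,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "example_rpc")]
struct ExampleRpcMetrics {
    /// One histogram per distinct `CallLabels` value.
    #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
    latency: Family<CallLabels, Histogram<Duration>>,
    /// Family keyed by a single label whose name is declared inline.
    #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds, labels = ["method"])]
    reserve_latency: LabeledFamily<&'static str, Histogram<Duration>>,
}

#[vise::register]
static METRICS: vise::Global<ExampleRpcMetrics> = vise::Global::new();

fn record(elapsed: Duration) {
    let labels = CallLabels { kind: CallKind::Client, method: "ping" };
    // Indexing lazily creates the histogram for this label combination.
    METRICS.latency[&labels].observe(elapsed);
    METRICS.reserve_latency[&"ping"].observe(elapsed);
}
```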
+ +use self::metrics::{CallLatencyType, CallType, RPC_METRICS}; use crate::{frame, mux}; use anyhow::Context as _; -use concurrency::{ctx, io, limiter, metrics, scope}; -use once_cell::sync::Lazy; +use concurrency::{ + ctx, io, limiter, + metrics::{GaugeGuard, LatencyHistogramExt as _}, + scope, +}; use std::{collections::BTreeMap, sync::Arc}; pub(crate) mod consensus; +mod metrics; pub(crate) mod ping; pub(crate) mod sync_blocks; pub(crate) mod sync_validator_addrs; @@ -29,52 +35,6 @@ pub(crate) mod testonly; #[cfg(test)] mod tests; -static RPC_LATENCY: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec!( - "network_rpc_latency", - "latency of Rpcs in seconds", - &["type", "method", "submethod", "result"], - prometheus::exponential_buckets(0.01, 1.5, 20).unwrap(), - ) - .unwrap() -}); -static RPC_INFLIGHT: Lazy = Lazy::new(|| { - prometheus::register_int_gauge_vec!( - "network_rpc_inflight", - "Rpcs inflight", - &["type", "method", "submethod"] - ) - .unwrap() -}); -static RPC_MESSAGE_SIZE: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec!( - "network_rpc_message_size", - "message sizes in bytes", - &["type", "method", "submethod"], - ) - .unwrap() -}); -static RPC_CALL_RESERVE_LATENCY: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec!( - "network_rpc_call_reserve_latency", - "time that client waits for the server to prepare a stream for an RPC call", - &["method"], - ) - .unwrap() -}); - -const CLIENT_SEND_RECV: &str = "client_send_recv"; -const SERVER_RECV_SEND: &str = "server_recv_send"; -const SERVER_PROCESS: &str = "server_process"; - -const REQ_SENT: &str = "req_sent"; -const REQ_RECV: &str = "req_recv"; -const RESP_SENT: &str = "resp_sent"; -const RESP_RECV: &str = "resp_recv"; - -const OK: &str = "ok"; -const ERR: &str = "err"; - const MUX_CONFIG: mux::Config = mux::Config { read_buffer_size: 160 * schema::kB as u64, read_frame_size: 16 * schema::kB as u64, @@ -131,31 +91,20 @@ impl<'a, R: Rpc> ReservedCall<'a, R> { let mut stream = self.stream.open(ctx).await??; drop(self.permit); let res = async { - let _guard = metrics::GaugeGuard::from(RPC_INFLIGHT.with_label_values(&[ - "client", - R::submethod(req), - R::METHOD, - ])); + let metric_labels = CallType::Client.to_labels::(req); + let _guard = GaugeGuard::from(RPC_METRICS.inflight[&metric_labels].clone()); let msg_size = frame::mux_send_proto(ctx, &mut stream.write, req).await?; - RPC_MESSAGE_SIZE - .with_label_values(&[REQ_SENT, R::METHOD, R::submethod(req)]) - .observe(msg_size as f64); + RPC_METRICS.message_size[&CallType::ReqSent.to_labels::(req)].observe(msg_size); drop(stream.write); frame::mux_recv_proto(ctx, &mut stream.read).await } .await; - let res_label = match res { - Ok(_) => OK, - Err(_) => ERR, - }; + let now = ctx.now(); - RPC_LATENCY - .with_label_values(&[CLIENT_SEND_RECV, R::METHOD, R::submethod(req), res_label]) - .observe((now - send_time).as_seconds_f64()); + let metric_labels = CallLatencyType::ClientSendRecv.to_labels::(req, &res); + RPC_METRICS.latency[&metric_labels].observe_latency(now - send_time); let (res, msg_size) = res?; - RPC_MESSAGE_SIZE - .with_label_values(&[RESP_RECV, R::METHOD, R::submethod(req)]) - .observe(msg_size as f64); + RPC_METRICS.message_size[&CallType::RespRecv.to_labels::(req)].observe(msg_size); Ok(res) } } @@ -189,9 +138,7 @@ impl Client { .reserve(ctx) .await .context("StreamQueue::open()")?; - RPC_CALL_RESERVE_LATENCY - .with_label_values(&[R::METHOD]) - .observe((ctx.now() - reserve_time).as_seconds_f64()); + 
RPC_METRICS.call_reserve_latency[&R::METHOD].observe_latency(ctx.now() - reserve_time); Ok(ReservedCall { stream, permit, @@ -241,43 +188,30 @@ impl> ServerTrait for Server { let recv_time = ctx.now(); let (req, msg_size) = frame::mux_recv_proto::(ctx, &mut stream.read).await?; - let submethod = R::submethod(&req); - RPC_MESSAGE_SIZE - .with_label_values(&[REQ_RECV, R::METHOD, submethod]) - .observe(msg_size as f64); + + let size_labels = CallType::ReqRecv.to_labels::(&req); + let resp_size_labels = CallType::RespSent.to_labels::(&req); + RPC_METRICS.message_size[&size_labels].observe(msg_size); + let inflight_labels = CallType::Server.to_labels::(&req); let _guard = - metrics::GaugeGuard::from(RPC_INFLIGHT.with_label_values(&[ - "server", - R::METHOD, - submethod, - ])); + GaugeGuard::from(RPC_METRICS.inflight[&inflight_labels].clone()); + let mut server_process_labels = + CallLatencyType::ServerProcess.to_labels::(&req, &Ok(())); + let mut recv_send_labels = + CallLatencyType::ServerRecvSend.to_labels::(&req, &Ok(())); + let process_time = ctx.now(); let res = self.handler.handle(ctx, req).await.context(R::METHOD); - let res_label = match res { - Ok(_) => OK, - Err(_) => ERR, - }; - RPC_LATENCY - .with_label_values(&[ - SERVER_PROCESS, - R::METHOD, - submethod, - res_label, - ]) - .observe((ctx.now() - process_time).as_seconds_f64()); + server_process_labels.set_result(&res); + RPC_METRICS.latency[&server_process_labels] + .observe_latency(ctx.now() - process_time); + let res = frame::mux_send_proto(ctx, &mut stream.write, &res?).await; - RPC_LATENCY - .with_label_values(&[ - SERVER_RECV_SEND, - R::METHOD, - submethod, - res_label, - ]) - .observe((ctx.now() - recv_time).as_seconds_f64()); + recv_send_labels.set_result(&res); + RPC_METRICS.latency[&recv_send_labels] + .observe_latency(ctx.now() - recv_time); let msg_size = res?; - RPC_MESSAGE_SIZE - .with_label_values(&[RESP_SENT, R::METHOD, submethod]) - .observe(msg_size as f64); + RPC_METRICS.message_size[&resp_size_labels].observe(msg_size); anyhow::Ok(()) } .await; diff --git a/node/actors/network/src/state.rs b/node/actors/network/src/state.rs index 75df3d32..ee2f9ef4 100644 --- a/node/actors/network/src/state.rs +++ b/node/actors/network/src/state.rs @@ -1,8 +1,8 @@ //! Network actor maintaining a pool of outbound and inbound connections to other nodes. -use super::{consensus, event::Event, gossip, preface}; +use super::{consensus, event::Event, gossip, metrics, preface}; use crate::io::{InputMessage, OutputMessage, SyncState}; use anyhow::Context as _; -use concurrency::{ctx, ctx::channel, metrics, net, scope, sync::watch}; +use concurrency::{ctx, ctx::channel, net, scope, sync::watch}; use std::sync::Arc; use utils::pipe::ActorPipe; @@ -51,6 +51,11 @@ impl State { pub fn cfg(&self) -> &Config { &self.cfg } + + /// Registers metrics for this state. + pub fn register_metrics(self: &Arc) { + metrics::NetworkGauges::register(Arc::downgrade(self)); + } } /// Runs the network actor. @@ -97,7 +102,7 @@ pub async fn run_network( }); // TODO(gprusak): add rate limit and inflight limit for inbound handshakes. - while let Ok(stream) = net::tcp::accept(ctx, &mut listener).await { + while let Ok(stream) = metrics::MeteredStream::listen(ctx, &mut listener).await { let stream = stream.context("listener.accept()")?; s.spawn(async { let res = async { @@ -126,31 +131,3 @@ pub async fn run_network( }) .await } - -impl State { - /// Collection of gauges monitoring the state, - /// which can be added to prometheus registry. 
-    pub fn collector(self: &Arc<Self>) -> metrics::Collector<Self> {
-        metrics::Collector::new(Arc::downgrade(self))
-            .gauge(
-                "network_gossip_inbound_connections",
-                "number of active gossipnet inbound connections",
-                |s| s.gossip.inbound.subscribe().borrow().current().len() as f64,
-            )
-            .gauge(
-                "network_gossip_outbound_connections",
-                "number of active gossipnet outbound connections",
-                |s| s.gossip.outbound.subscribe().borrow().current().len() as f64,
-            )
-            .gauge(
-                "network_consensus_inbound_connections",
-                "number of active consensusnet inbound connections",
-                |s| s.consensus.inbound.subscribe().borrow().current().len() as f64,
-            )
-            .gauge(
-                "network_consensus_outbound_connections",
-                "number of active consensusnet outbound connections",
-                |s| s.consensus.outbound.subscribe().borrow().current().len() as f64,
-            )
-    }
-}
diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs
index 7ccbfa0c..89eb5c4c 100644
--- a/node/actors/network/src/tests.rs
+++ b/node/actors/network/src/tests.rs
@@ -20,11 +20,12 @@ async fn test_metrics() {
         );
     }
     testonly::instant_network(ctx, nodes.iter()).await?;
-    let r = prometheus::Registry::new();
-    r.register(Box::new(nodes[0].state.collector())).unwrap();
-    let enc = prometheus::TextEncoder::new();
-    let body = enc.encode_to_string(&r.gather())?;
-    tracing::info!("stats =\n{}", body);
+
+    let registry = vise::Registry::collect();
+    nodes[0].state().register_metrics();
+    let mut encoded_metrics = String::new();
+    registry.encode(&mut encoded_metrics, vise::Format::OpenMetricsForPrometheus)?;
+    tracing::info!("stats =\n{encoded_metrics}");
     Ok(())
     })
     .await
diff --git a/node/deny.toml b/node/deny.toml
index 02e8a190..dd10be9e 100644
--- a/node/deny.toml
+++ b/node/deny.toml
@@ -61,3 +61,8 @@ skip = [
     # Old versions required by hyper.
     { name = "socket2", version = "=0.4.9" },
 ]
+
+[sources]
+unknown-registry = "deny"
+unknown-git = "deny"
+allow-org = { github = ["matter-labs"] }
diff --git a/node/libs/concurrency/Cargo.toml b/node/libs/concurrency/Cargo.toml
index 8769fb4a..bbad2377 100644
--- a/node/libs/concurrency/Cargo.toml
+++ b/node/libs/concurrency/Cargo.toml
@@ -12,7 +12,6 @@ futures.workspace = true
 hyper.workspace = true
 once_cell.workspace = true
 pin-project.workspace = true
-prometheus.workspace = true
 rand.workspace = true
 sha2.workspace = true
 thiserror.workspace = true
@@ -20,3 +19,4 @@ time.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
+vise.workspace = true
diff --git a/node/libs/concurrency/src/ctx/mod.rs b/node/libs/concurrency/src/ctx/mod.rs
index 98794e99..00b732a0 100644
--- a/node/libs/concurrency/src/ctx/mod.rs
+++ b/node/libs/concurrency/src/ctx/mod.rs
@@ -171,6 +171,13 @@ impl Ctx {
         CtxAware(self.0.canceled.cancel_safe_recv())
     }
+
+    /// Awaits until the local context gets canceled. Unlike [`Self::canceled()`], the returned
+    /// future has a static lifetime.
+    pub fn canceled_owned(&self) -> impl Future<Output = ()> {
+        let canceled = self.0.canceled.clone();
+        async move { canceled.cancel_safe_recv().await }
+    }
+
     /// Checks if the context is still active (i.e., not canceled).
     pub fn is_active(&self) -> bool {
         !self.0.canceled.try_recv()
diff --git a/node/libs/concurrency/src/metrics.rs b/node/libs/concurrency/src/metrics.rs
index 28b1f74c..23698628 100644
--- a/node/libs/concurrency/src/metrics.rs
+++ b/node/libs/concurrency/src/metrics.rs
@@ -1,110 +1,47 @@
 //! Prometheus metrics utilities.
-use crate::ctx;
-use prometheus::core::{Atomic, GenericGauge};
-use std::{collections::HashMap, sync::Weak};
+
+use std::time::Duration;
+use vise::Gauge;
 
 /// Guard which increments the gauge when constructed
 /// and decrements it when dropped.
-pub struct GaugeGuard<P: Atomic>(GenericGauge<P>);
+pub struct GaugeGuard(Gauge);
 
-impl<P: Atomic> From<GenericGauge<P>> for GaugeGuard<P> {
-    fn from(g: GenericGauge<P>) -> Self {
-        g.inc();
-        Self(g)
+impl From<Gauge> for GaugeGuard {
+    fn from(gauge: Gauge) -> Self {
+        gauge.inc_by(1);
+        Self(gauge)
     }
 }
 
-impl<P: Atomic> Drop for GaugeGuard<P> {
+impl Drop for GaugeGuard {
     fn drop(&mut self) {
-        self.0.dec();
+        self.0.dec_by(1);
     }
 }
 
-type Fetcher<T> = dyn Send + Sync + Fn(&T) -> prometheus::proto::MetricFamily;
-
-/// Collection of metrics. Implements prometheus::core::Collector,
-/// it can be used to avoid embedding gauges in objects:
-/// instead you can register a collector which will on demand gather
-/// the metrics from the specificied object.
-pub struct Collector<T> {
-    t: Weak<T>,
-    descs: Vec<prometheus::core::Desc>,
-    builders: Vec<Box<Fetcher<T>>>,
+/// Extension trait for latency histograms.
+pub trait LatencyHistogramExt {
+    /// Observes latency.
+    fn observe_latency(&self, latency: time::Duration);
 }
 
-impl<T> Collector<T> {
-    /// Constructs a new noop Collector.
-    pub fn new(t: Weak<T>) -> Self {
-        Self {
-            t,
-            descs: vec![],
-            builders: vec![],
-        }
-    }
-    /// Adds a gauge to the Collector.
-    /// `f` is expected to fetch the current gauge value.
-    pub fn gauge(
-        mut self,
-        name: &'static str,
-        help: &'static str,
-        f: impl 'static + Send + Sync + Fn(&T) -> f64,
-    ) -> Self {
-        self.descs.push(
-            prometheus::core::Desc::new(name.to_string(), help.to_string(), vec![], HashMap::new())
-                .unwrap(),
-        );
-        self.builders.push(Box::new(move |t| {
-            let mut mf = prometheus::proto::MetricFamily::default();
-            mf.set_field_type(prometheus::proto::MetricType::GAUGE);
-            mf.set_name(name.to_string());
-            mf.set_help(help.to_string());
-            mf.mut_metric().push_default().mut_gauge().set_value(f(t));
-            mf
-        }));
-        self
+impl LatencyHistogramExt for vise::Histogram<Duration> {
+    fn observe_latency(&self, latency: time::Duration) {
+        let latency = Duration::try_from(latency).unwrap_or(Duration::ZERO);
+        self.observe(latency);
     }
 }
 
-impl<T: 'static + Send + Sync> prometheus::core::Collector for Collector<T> {
-    fn desc(&self) -> Vec<&prometheus::core::Desc> {
-        self.descs.iter().collect()
-    }
-    fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
-        let Some(t) = self.t.upgrade() else {
-            return vec![];
-        };
-        self.builders.iter().map(|b| b(&t)).collect()
-    }
+/// Extension trait for latency gauges.
+pub trait LatencyGaugeExt {
+    /// Sets the gauge value.
+    fn set_latency(&self, latency: time::Duration);
 }
 
-const TEXT_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
-
-/// Runs and HTTP server on the specified address, which exposes the following endpoints:
-///
-/// - `GET` on any path: serves the metrics from the default prometheus registry
-///   in the Open Metrics text format
-pub async fn run_server(ctx: &ctx::Ctx, bind_address: std::net::SocketAddr) -> anyhow::Result<()> {
-    let serve = |_| async {
-        let reg = prometheus::default_registry();
-        let enc = prometheus::TextEncoder::new();
-        let body = enc.encode_to_string(&reg.gather())?;
-        tracing::info!("HTTP request");
-        anyhow::Ok(
-            hyper::Response::builder()
-                .status(hyper::StatusCode::OK)
-                .header(hyper::header::CONTENT_TYPE, TEXT_CONTENT_TYPE)
-                .body(body)
-                .unwrap(),
-        )
-    };
-    // For some unknown reasons `make_service` has to be static.
-    let make_service = hyper::service::make_service_fn(|_| async move {
-        tracing::info!("HTTP connection");
-        anyhow::Ok(hyper::service::service_fn(serve))
-    });
-    hyper::Server::try_bind(&bind_address)?
-        .serve(make_service)
-        .with_graceful_shutdown(ctx.canceled())
-        .await?;
-    Ok(())
+impl LatencyGaugeExt for Gauge<Duration> {
+    fn set_latency(&self, latency: time::Duration) {
+        let latency = Duration::try_from(latency).unwrap_or(Duration::ZERO);
+        self.set(latency);
+    }
 }
diff --git a/node/libs/concurrency/src/net/tcp/mod.rs b/node/libs/concurrency/src/net/tcp/mod.rs
index c2bf684e..097b4e3f 100644
--- a/node/libs/concurrency/src/net/tcp/mod.rs
+++ b/node/libs/concurrency/src/net/tcp/mod.rs
@@ -3,73 +3,26 @@
 //! algorithm (so that the transmission latency is more
 //! predictable), so the caller is expected to apply
 //! user space buffering.
-use crate::{ctx, metrics};
+use crate::ctx;
 pub use listener_addr::*;
-use once_cell::sync::Lazy;
-use std::{
-    pin::Pin,
-    task::{ready, Context, Poll},
-};
 use tokio::io;
 
 mod listener_addr;
 pub mod testonly;
 
-static BYTES_SENT: Lazy<prometheus::IntCounter> = Lazy::new(|| {
-    prometheus::register_int_counter!(
-        "concurrency_net_tcp__bytes_sent",
-        "bytes sent over TCP connections"
-    )
-    .unwrap()
-});
-static BYTES_RECV: Lazy<prometheus::IntCounter> = Lazy::new(|| {
-    prometheus::register_int_counter!(
-        "concurrency_net_tcp__bytes_recv",
-        "bytes received over TCP connections"
-    )
-    .unwrap()
-});
-static ESTABLISHED: Lazy<prometheus::IntCounterVec> = Lazy::new(|| {
-    prometheus::register_int_counter_vec!(
-        "concurrency_net_tcp__established",
-        "TCP connections established since the process started",
-        &["direction"]
-    )
-    .unwrap()
-});
-static ACTIVE: Lazy<prometheus::IntGaugeVec> = Lazy::new(|| {
-    prometheus::register_int_gauge_vec!(
-        "concurrency_net_tcp__active",
-        "currently active TCP connections",
-        &["direction"]
-    )
-    .unwrap()
-});
-
 /// TCP stream.
-#[pin_project::pin_project]
-pub struct Stream {
-    #[pin]
-    stream: tokio::net::TcpStream,
-    _active: metrics::GaugeGuard<prometheus::core::AtomicI64>,
-}
-
+pub type Stream = tokio::net::TcpStream;
 /// TCP listener.
 pub type Listener = tokio::net::TcpListener;
 
 /// Accepts an INBOUND listener connection.
 pub async fn accept(ctx: &ctx::Ctx, this: &mut Listener) -> ctx::OrCanceled<io::Result<Stream>> {
-    const INBOUND: &[&str] = &["inbound"];
-    Ok(ctx.wait(this.accept()).await?.map(|stream| {
-        ESTABLISHED.with_label_values(INBOUND).inc();
+    Ok(ctx.wait(this.accept()).await?.map(|(stream, _)| {
         // We are the only owner of the correctly opened
         // socket at this point so `set_nodelay` should
        // always succeed.
-        stream.0.set_nodelay(true).unwrap();
-        Stream {
-            stream: stream.0,
-            _active: ACTIVE.with_label_values(INBOUND).into(),
-        }
+        stream.set_nodelay(true).unwrap();
+        stream
     }))
 }
 
@@ -78,55 +31,14 @@ pub async fn connect(
     ctx: &ctx::Ctx,
     addr: std::net::SocketAddr,
 ) -> ctx::OrCanceled<io::Result<Stream>> {
-    const OUTBOUND: &[&str] = &["outbound"];
     Ok(ctx
         .wait(tokio::net::TcpStream::connect(addr))
         .await?
         .map(|stream| {
-            ESTABLISHED.with_label_values(OUTBOUND).inc();
             // We are the only owner of the correctly opened
             // socket at this point so `set_nodelay` should
             // always succeed.
             stream.set_nodelay(true).unwrap();
-            Stream {
-                stream,
-                _active: ACTIVE.with_label_values(OUTBOUND).into(),
-            }
+            stream
         }))
 }
-
-impl io::AsyncRead for Stream {
-    #[inline(always)]
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let this = self.project();
-        let before = buf.remaining();
-        let res = this.stream.poll_read(cx, buf);
-        let after = buf.remaining();
-        BYTES_RECV.inc_by((before - after) as u64);
-        res
-    }
-}
-
-impl io::AsyncWrite for Stream {
-    #[inline(always)]
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        let this = self.project();
-        let res = ready!(this.stream.poll_write(cx, buf))?;
-        BYTES_SENT.inc_by(res as u64);
-        Poll::Ready(Ok(res))
-    }
-
-    #[inline(always)]
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        self.project().stream.poll_flush(cx)
-    }
-
-    #[inline(always)]
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        self.project().stream.poll_shutdown(cx)
-    }
-}