diff --git a/.gitignore b/.gitignore index f276657ea4a..0fcad7afbab 100644 --- a/.gitignore +++ b/.gitignore @@ -101,7 +101,6 @@ prestorecon.o **/bin/ic-consensus-pool-util **/bin/ic-crypto-csp **/bin/ic-regedit -**/bin/ic-onchain-observability-adapter **/bin/orchestrator **/bin/replica **/bin/state-tool diff --git a/.gitlab/CODEOWNERS b/.gitlab/CODEOWNERS index bb5d1ca8a0d..4a8c2023b9b 100644 --- a/.gitlab/CODEOWNERS +++ b/.gitlab/CODEOWNERS @@ -162,7 +162,6 @@ go_deps.bzl @dfinity-lab/teams/idx /rs/monitoring/adapter_metrics_server/ @dfinity-lab/teams/networking-team /rs/monitoring/adapter_metrics_service/ @dfinity-lab/teams/networking-team /rs/monitoring/backtrace/ @dfinity-lab/teams/message-routing-owners @dfinity-lab/teams/networking-team -/rs/monitoring/onchain_observability/ @dfinity-lab/teams/networking-team /rs/nervous_system/ @dfinity-lab/teams/nns-team /rs/nns/ @dfinity-lab/teams/nns-team /rs/observability/ @dfinity-lab/teams/DRE diff --git a/Cargo.lock b/Cargo.lock index 8df8fcb0e31..fa493a8c5f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5648,28 +5648,6 @@ dependencies = [ "strum_macros 0.23.1", ] -[[package]] -name = "ic-certified-blockchain-lib" -version = "0.1.0" -dependencies = [ - "candid 0.8.4", - "hex", - "ic-cdk 0.7.4", - "ic-cdk-macros 0.6.10", - "ic-cdk-timers", - "ic-certified-map", - "ic-stable-structures", - "num", - "num-derive", - "num-traits", - "prost", - "serde", - "serde_bytes", - "serde_cbor", - "sha2 0.10.6", - "url", -] - [[package]] name = "ic-certified-map" version = "0.3.4" @@ -8996,68 +8974,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "ic-onchain-observability-adapter" -version = "0.1.0" -dependencies = [ - "async-trait", - "candid 0.8.4", - "clap 3.2.23", - "ic-adapter-metrics-server", - "ic-async-utils", - "ic-base-types", - "ic-canister-client", - "ic-config", - "ic-crypto", - "ic-interfaces", - "ic-interfaces-registry", - "ic-logger", - "ic-metrics", - "ic-onchain-observability-service", - "ic-registry-client", - "ic-registry-client-helpers", - "ic-registry-local-store", - "ic-types", - "ic-types-test-utils", - "itertools 0.10.5", - "prometheus 0.12.0", - "prometheus-parse", - "rand 0.8.5", - "serde", - "serde_json", - "serde_with", - "sha2 0.9.9", - "slog", - "tempfile", - "thiserror", - "tokio", - "tonic", - "tower", - "url", -] - -[[package]] -name = "ic-onchain-observability-server" -version = "0.1.0" -dependencies = [ - "ic-adapter-metrics", - "ic-async-utils", - "ic-metrics", - "ic-onchain-observability-service", - "prometheus 0.12.0", - "tokio", - "tonic", -] - -[[package]] -name = "ic-onchain-observability-service" -version = "0.1.0" -dependencies = [ - "prost", - "tonic", - "tonic-build", -] - [[package]] name = "ic-p2p" version = "0.8.0" @@ -9802,7 +9718,6 @@ dependencies = [ "ic-messaging", "ic-metrics", "ic-nns-constants", - "ic-onchain-observability-server", "ic-p2p", "ic-pprof", "ic-protobuf", @@ -13372,17 +13287,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-derive" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" -dependencies = [ - "proc-macro2 1.0.66", - "quote 1.0.31", - "syn 1.0.109", -] - [[package]] name = "num-format" version = "0.4.4" @@ -14635,18 +14539,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "prometheus-parse" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ef7a8ed15bcffc55fe0328931ef20d393bb89ad704756a37bd20cffb4804f306" -dependencies = [ - "chrono", - "itertools 0.10.5", - "lazy_static", - "regex", -] - [[package]] name = "prometheus_config_updater" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 95569b45e0c..42d13679803 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,10 +144,6 @@ members = [ "rs/monitoring/context_logger", "rs/monitoring/logger", "rs/monitoring/metrics", - "rs/monitoring/onchain_observability/adapter", - "rs/monitoring/onchain_observability/canister", - "rs/monitoring/onchain_observability/server", - "rs/monitoring/onchain_observability/service", "rs/monitoring/pprof", "rs/nervous_system/clients", "rs/nervous_system/common", diff --git a/ic-os/guestos/defs.bzl b/ic-os/guestos/defs.bzl index cebe2605503..6ab735bcde3 100644 --- a/ic-os/guestos/defs.bzl +++ b/ic-os/guestos/defs.bzl @@ -34,7 +34,6 @@ def image_deps(mode, malicious = False): "//publish/binaries:ic-consensus-pool-util": "/opt/ic/bin/ic-consensus-pool-util:0755", "//publish/binaries:ic-https-outcalls-adapter": "/opt/ic/bin/ic-https-outcalls-adapter:0755", "//publish/binaries:ic-crypto-csp": "/opt/ic/bin/ic-crypto-csp:0755", - "//publish/binaries:ic-onchain-observability-adapter": "/opt/ic/bin/ic-onchain-observability-adapter:0755", "//publish/binaries:ic-regedit": "/opt/ic/bin/ic-regedit:0755", "//publish/binaries:ic-recovery": "/opt/ic/bin/ic-recovery:0755", "//publish/binaries:orchestrator": "/opt/ic/bin/orchestrator:0755", diff --git a/ic-os/guestos/docs/ConfigStore.adoc b/ic-os/guestos/docs/ConfigStore.adoc index c37d7b78702..4bc60e3a52c 100644 --- a/ic-os/guestos/docs/ConfigStore.adoc +++ b/ic-os/guestos/docs/ConfigStore.adoc @@ -131,12 +131,6 @@ Must be a file of key/value pairs separated by "=" (one per line) with the follo - socks_proxy: URL of the socks proxy to use. E.g socks5://socksproxy.com:1080 -=== onchain_observability_overrides.json - -A json object containing config-values that will override the existing values in the onchain observability config. Key names must match those in the actual config. - -- onchain_observability_overrides: Struct containing the overrides - === bitcoin_addr.conf Configuration for bitcoin adapter. diff --git a/ic-os/guestos/rootfs/Dockerfile b/ic-os/guestos/rootfs/Dockerfile index 9553ad925f2..951b99f80c6 100644 --- a/ic-os/guestos/rootfs/Dockerfile +++ b/ic-os/guestos/rootfs/Dockerfile @@ -120,14 +120,6 @@ RUN addgroup ic-http-adapter && \ adduser --system --disabled-password --shell /usr/sbin/nologin -c "IC Canister HTTP Adapter" ic-http-adapter && \ adduser ic-http-adapter ic-http-adapter -# The "onchain-observability" account. Used to run `ic-onchain-observability-adapter` binary -# to send connectivity data to the observability canister. -RUN addgroup onchain-observability && \ - adduser --system --disabled-password --shell /usr/sbin/nologin -c "IC Onchain Observability Adapter" onchain-observability && \ - adduser onchain-observability onchain-observability && \ - adduser onchain-observability ic-csp-vault-socket && \ - adduser onchain-observability ic-registry-local-store - # User which will run the replica service. 
RUN adduser --system --disabled-password --home /var/lib/ic --group --no-create-home ic-replica && \ adduser ic-replica backup && \ @@ -135,7 +127,6 @@ RUN adduser --system --disabled-password --home /var/lib/ic --group --no-create- adduser ic-replica nonconfidential && \ adduser ic-replica ic-registry-local-store && \ adduser ic-replica ic-http-adapter && \ - adduser ic-replica onchain-observability && \ adduser ic-replica vsock # Accounts to allow remote access to state bits diff --git a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter-metrics.socket b/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter-metrics.socket deleted file mode 100644 index 8d7bb56117c..00000000000 --- a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter-metrics.socket +++ /dev/null @@ -1,12 +0,0 @@ -[Unit] -Description= Socket for metrics for the IC onchain observability adapter - -[Socket] -ListenStream=/run/ic-node/onchain-observability-adapter/metrics -Service=ic-onchain-observability-adapter.service -SocketUser=onchain-observability -SocketGroup=onchain-observability -SocketMode=0660 - -[Install] -WantedBy=sockets.target diff --git a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.service b/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.service deleted file mode 100644 index 682441a4bdd..00000000000 --- a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.service +++ /dev/null @@ -1,30 +0,0 @@ -[Unit] -Description=Process for collecting metrics and sending to IC Onchain Observability Canister -# We must wait for IC bootstrap to complete: It writes various -# state files and may also be needed to obtain network config. -After=bootstrap-ic-node.service -Wants=bootstrap-ic-node.service -# For creating the crypto component, this needs ic-crypto-csp service running. -After=ic-crypto-csp.service -Wants=ic-crypto-csp.service -Requires=ic-onchain-observability-adapter-metrics.socket -StartLimitIntervalSec=0 - -[Service] -User=onchain-observability -Environment=RUST_BACKTRACE=1 -# When starting this service, ideally --replica-config-file would directly point to -# /run/ic-node/config/ic.json5, but this file may be not available yet as it is generated -# only on ExecStartPre by ic-replica.service. As a temporary workaround, we are generating -# an onchain-observability-adapter-service-specific version of ic.json5 of which only the parts related to crypto -# and registry are relevant. 
Then, we will merge this with adapter-specific-config to generate the full config -ExecStartPre=+/opt/ic/bin/generate-replica-config.sh -i /opt/ic/share/ic.json5.template -o /run/ic-node/config/onchain-observability-adapter-replica-config.json5 -ExecStartPre=+/opt/ic/bin/generate-onchain-observability-adapter-config.sh -o /run/ic-node/config/onchain-observability-adapter-specific-config.json -t /boot/config/onchain_observability_overrides.json -ExecStart=/opt/ic/bin/ic-onchain-observability-adapter --adapter-specific-config-file /run/ic-node/config/onchain-observability-adapter-specific-config.json --replica-config-file /run/ic-node/config/onchain-observability-adapter-replica-config.json5 -NotifyAccess=main -#TODO: once this service is complete, we can change to "always" -Restart=on-failure - -[Install] -WantedBy=ic-replica.service -Also=ic-onchain-observability-adapter-metrics.socket \ No newline at end of file diff --git a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.socket b/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.socket deleted file mode 100644 index ddc71aff1f6..00000000000 --- a/ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.socket +++ /dev/null @@ -1,13 +0,0 @@ -[Unit] -Description= Replica socket to serve gRPC requests to the IC onchain observability adapter - -[Socket] -ListenStream=/run/ic-node/onchain-observability-adapter/socket -Service=ic-replica.service -SocketUser=onchain-observability -SocketGroup=onchain-observability -SocketMode=0660 - - -[Install] -WantedBy=sockets.target diff --git a/ic-os/guestos/rootfs/etc/systemd/system/ic-replica.service b/ic-os/guestos/rootfs/etc/systemd/system/ic-replica.service index a575c34c8a0..789152409a0 100644 --- a/ic-os/guestos/rootfs/etc/systemd/system/ic-replica.service +++ b/ic-os/guestos/rootfs/etc/systemd/system/ic-replica.service @@ -18,8 +18,6 @@ After=network-online.target BindsTo=network-online.target After=systemd-networkd.service PartOf=systemd-networkd.service -# Set up a socket to serve gRPC requests to the onchain observability adapter -Wants=ic-onchain-observability-adapter.socket StartLimitBurst=5 StartLimitIntervalSec=60 @@ -41,4 +39,3 @@ RestartSec=10 [Install] WantedBy=multi-user.target -Also=ic-onchain-observability-adapter.socket diff --git a/ic-os/guestos/rootfs/opt/ic/bin/bootstrap-ic-node.sh b/ic-os/guestos/rootfs/opt/ic/bin/bootstrap-ic-node.sh index e2547b0a7ac..e3eee70aadf 100755 --- a/ic-os/guestos/rootfs/opt/ic/bin/bootstrap-ic-node.sh +++ b/ic-os/guestos/rootfs/opt/ic/bin/bootstrap-ic-node.sh @@ -124,7 +124,7 @@ function process_bootstrap() { # stash the following configuration files to config store # note: keep this list in sync with configurations supported in build-bootstrap-config-image.sh - for FILE in journalbeat.conf network.conf nns.conf backup.conf log.conf malicious_behavior.conf bitcoind_addr.conf socks_proxy.conf onchain_observability_overrides.json; do + for FILE in journalbeat.conf network.conf nns.conf backup.conf log.conf malicious_behavior.conf bitcoind_addr.conf socks_proxy.conf; do if [ -e "${TMPDIR}/${FILE}" ]; then echo "Setting up ${FILE}" cp "${TMPDIR}/${FILE}" "${CONFIG_ROOT}/${FILE}" diff --git a/ic-os/guestos/rootfs/opt/ic/bin/generate-onchain-observability-adapter-config.sh b/ic-os/guestos/rootfs/opt/ic/bin/generate-onchain-observability-adapter-config.sh deleted file mode 100755 index 8580ccad44c..00000000000 --- a/ic-os/guestos/rootfs/opt/ic/bin/generate-onchain-observability-adapter-config.sh 
+++ /dev/null @@ -1,70 +0,0 @@ -#!/bin/bash - -function usage() { - cat <$OUT_FILE -else - echo "${CONFIG}" >$OUT_FILE -fi - -# umask for service is set to be restricted, but this file needs to be -# world-readable -chmod 644 "${OUT_FILE}" diff --git a/ic-os/guestos/rootfs/opt/ic/share/ic.json5.template b/ic-os/guestos/rootfs/opt/ic/share/ic.json5.template index 487ef995728..af285653966 100644 --- a/ic-os/guestos/rootfs/opt/ic/share/ic.json5.template +++ b/ic-os/guestos/rootfs/opt/ic/share/ic.json5.template @@ -108,7 +108,6 @@ // The canister http adapter socket file is: /ic-os/guestos/rootfs/etc/systemd/system/ic-https-outcalls-adapter.socket https_outcalls_uds_path: "/run/ic-node/https-outcalls-adapter/socket", https_outcalls_uds_metrics_path: "/run/ic-node/https-outcalls-adapter/metrics", - onchain_observability_uds_metrics_path: "/run/ic-node/onchain-observability-adapter/metrics", }, // ================================================== diff --git a/ic-os/guestos/rootfs/prep/ic-node/ic-node.fc b/ic-os/guestos/rootfs/prep/ic-node/ic-node.fc index cf5cdd4c39a..416795fd139 100644 --- a/ic-os/guestos/rootfs/prep/ic-node/ic-node.fc +++ b/ic-os/guestos/rootfs/prep/ic-node/ic-node.fc @@ -2,7 +2,6 @@ /opt/ic/bin/replica -- gen_context(system_u:object_r:ic_replica_exec_t,s0) /opt/ic/bin/ic-https-outcalls-adapter -- gen_context(system_u:object_r:ic_http_adapter_exec_t,s0) /opt/ic/bin/canister_sandbox -- gen_context(system_u:object_r:ic_canister_sandbox_exec_t,s0) -/opt/ic/bin/ic_onchain_observability_adapter -- gen_context(system_u:object_r:ic_onchain_observability_adapter_exec_t,s0) /var/lib/ic/backup(/.*)? gen_context(system_u:object_r:ic_data_t,s0) /var/lib/ic/data(/.*)? gen_context(system_u:object_r:ic_data_t,s0) /var/lib/ic/data/ic_state/page_deltas(/.*)? gen_context(system_u:object_r:ic_canister_mem_t,s0) diff --git a/ic-os/guestos/rootfs/prep/ic-node/ic-node.te b/ic-os/guestos/rootfs/prep/ic-node/ic-node.te index ff8a6bdaa46..ba93005d723 100644 --- a/ic-os/guestos/rootfs/prep/ic-node/ic-node.te +++ b/ic-os/guestos/rootfs/prep/ic-node/ic-node.te @@ -14,9 +14,6 @@ type ic_canister_sandbox_exec_t; type ic_http_adapter_t; type ic_http_adapter_exec_t; -type ic_onchain_observability_adapter_t; -type ic_onchain_observability_adapter_exec_t; - # On-disk state types. type ic_crypto_store_t; type ic_data_t; @@ -391,7 +388,3 @@ dontaudit ic_canister_sandbox_t cert_t : file { read }; # ic-https-outcalls-adapter application_domain(ic_http_adapter_t, ic_http_adapter_exec_t) - -# onchain-observability-adapter - -application_domain(ic_onchain_observability_adapter_t, ic_onchain_observability_adapter_exec_t) diff --git a/ic-os/scripts/build-bootstrap-config-image.sh b/ic-os/scripts/build-bootstrap-config-image.sh index f636abe39f5..eba5bde18fa 100755 --- a/ic-os/scripts/build-bootstrap-config-image.sh +++ b/ic-os/scripts/build-bootstrap-config-image.sh @@ -107,11 +107,6 @@ options may be specified: The URL of the socks proxy to use. To be used in systems tests only. - --onchain_observability_overrides overrides - The overrides struct (report length, sampling interval) for the onchain observability config. To be used by system tests - - Be sure to properly quote the string. - --get_sev_certs If on an SEV-SNP enabled machine, include the ark, ask, and vcek certificates in the config image. 
Note: this requires that this @@ -135,7 +130,6 @@ function build_ic_bootstrap_tar() { local REPLICA_LOG_DEBUG_OVERRIDES local MALICIOUS_BEHAVIOR local BITCOIND_ADDR - local ONCHAIN_OBSERVABILITY_OVERRIDES local GET_SEV_CERTS=false while true; do @@ -197,9 +191,6 @@ function build_ic_bootstrap_tar() { --socks_proxy) SOCKS_PROXY="$2" ;; - --onchain_observability_overrides) - ONCHAIN_OBSERVABILITY_OVERRIDES="$2" - ;; --get_sev_certs) GET_SEV_CERTS=true shift 1 @@ -256,9 +247,6 @@ EOF if [ "${SOCKS_PROXY}" != "" ]; then echo "socks_proxy=${SOCKS_PROXY}" >"${BOOTSTRAP_TMPDIR}/socks_proxy.conf" fi - if [ "${ONCHAIN_OBSERVABILITY_OVERRIDES}" != "" ]; then - echo ${ONCHAIN_OBSERVABILITY_OVERRIDES} >"${BOOTSTRAP_TMPDIR}/onchain_observability_overrides.json" - fi if [ "${IC_CRYPTO}" != "" ]; then cp -r "${IC_CRYPTO}" "${BOOTSTRAP_TMPDIR}/ic_crypto" fi diff --git a/policy-monitoring/.dockerignore b/policy-monitoring/.dockerignore index baaa7161539..f652c9a2749 100644 --- a/policy-monitoring/.dockerignore +++ b/policy-monitoring/.dockerignore @@ -11,6 +11,5 @@ artifacts/ **/bin/ic-crypto-csp **/bin/ic-btc-adapter **/bin/ic-https-outcalls-adapter -**/bin/ic-onchain-observability-adapter **/bin/state-tool **/disk.img diff --git a/policy-monitoring/pipeline/es_doc.py b/policy-monitoring/pipeline/es_doc.py index 73db2fea375..5ec1618983a 100644 --- a/policy-monitoring/pipeline/es_doc.py +++ b/policy-monitoring/pipeline/es_doc.py @@ -131,7 +131,6 @@ def is_host_reboot_intent(self) -> bool: "ic-btc-adapter", "ic-crypto-csp", "ic-https-outcalls-adapter", - "ic-onchain-observability-adapter", # TODO: all components that use the ReplicaLogger library ] ) diff --git a/publish/binaries/BUILD.bazel b/publish/binaries/BUILD.bazel index 2c7c9d60fd6..866466e7626 100644 --- a/publish/binaries/BUILD.bazel +++ b/publish/binaries/BUILD.bazel @@ -12,7 +12,6 @@ NO_STRIP = [ "canister_sandbox", "ic-btc-adapter", "ic-https-outcalls-adapter", - "ic-onchain-observability-adapter", "replica", ] @@ -34,7 +33,6 @@ BINARIES = { "ic-https-outcalls-adapter": "//rs/https_outcalls/adapter:ic-https-outcalls-adapter", "ic-consensus-pool-util": "//rs/artifact_pool:ic-consensus-pool-util", "ic-crypto-csp": "//rs/crypto:ic-crypto-csp", - "ic-onchain-observability-adapter": "//rs/monitoring/onchain_observability/adapter:ic-onchain-observability-adapter", "ic-nns-init": "//rs/nns/init:ic-nns-init", "ic-p8s-sd": "//rs/ic_p8s_service_discovery:ic-p8s-sd", "ic-p8s-service-discovery": "//rs/ic_p8s_service_discovery:ic-p8s-service-discovery", diff --git a/rs/bitcoin/adapter/tests/adapter_test.rs b/rs/bitcoin/adapter/tests/adapter_test.rs index dba61603f35..96a77f00e65 100644 --- a/rs/bitcoin/adapter/tests/adapter_test.rs +++ b/rs/bitcoin/adapter/tests/adapter_test.rs @@ -98,8 +98,6 @@ async fn start_client( bitcoin_testnet_uds_metrics_path: None, https_outcalls_uds_path: None, https_outcalls_uds_metrics_path: None, - onchain_observability_enable_grpc_server: false, - onchain_observability_uds_metrics_path: None, }; setup_bitcoin_adapter_clients( diff --git a/rs/canister_client/BUILD.bazel b/rs/canister_client/BUILD.bazel index aa9e293c558..792e5d42f94 100644 --- a/rs/canister_client/BUILD.bazel +++ b/rs/canister_client/BUILD.bazel @@ -3,7 +3,6 @@ load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") # The package is deprecated, NET-1274. 
package(default_visibility = [ "//rs/cup_explorer:__pkg__", - "//rs/monitoring/onchain_observability/adapter:__pkg__", "//rs/nns/init:__pkg__", "//rs/orchestrator:__pkg__", "//rs/registry/admin:__pkg__", diff --git a/rs/config/src/adapters.rs b/rs/config/src/adapters.rs index ff96fea7ffe..b15cf21bc03 100644 --- a/rs/config/src/adapters.rs +++ b/rs/config/src/adapters.rs @@ -9,11 +9,4 @@ pub struct AdaptersConfig { pub bitcoin_testnet_uds_metrics_path: Option<PathBuf>, pub https_outcalls_uds_path: Option<PathBuf>, pub https_outcalls_uds_metrics_path: Option<PathBuf>, - #[serde(default = "enable_onchain_observability_grpc_server_default")] - pub onchain_observability_enable_grpc_server: bool, - pub onchain_observability_uds_metrics_path: Option<PathBuf>, -} - -fn enable_onchain_observability_grpc_server_default() -> bool { - false } diff --git a/rs/messaging/src/message_routing/tests.rs b/rs/messaging/src/message_routing/tests.rs index d532c4b1490..68f73200fab 100644 --- a/rs/messaging/src/message_routing/tests.rs +++ b/rs/messaging/src/message_routing/tests.rs @@ -531,7 +531,6 @@ fn try_read_registry_succeeds_with_fully_specified_registry_records() { canister_sandboxing: true, http_requests: true, sev_status: Some(SevFeatureStatus::Disabled), - onchain_observability: Some(true), }, ecdsa_config: EcdsaConfig { key_ids: vec![ diff --git a/rs/monitoring/onchain_observability/adapter/BUILD.bazel b/rs/monitoring/onchain_observability/adapter/BUILD.bazel deleted file mode 100644 index 5d2ac06da8a..00000000000 --- a/rs/monitoring/onchain_observability/adapter/BUILD.bazel +++ /dev/null @@ -1,67 +0,0 @@ -load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library", "rust_test") - -package(default_visibility = ["//visibility:private"]) - -DEPENDENCIES = [ - "//rs/async_utils", - "//rs/canister_client", - "//rs/config", - "//rs/crypto", - "//rs/interfaces", - "//rs/interfaces/registry", - "//rs/monitoring/adapter_metrics_server", - "//rs/monitoring/logger", - "//rs/monitoring/metrics", - "//rs/monitoring/onchain_observability/service", - "//rs/registry/client", - "//rs/registry/helpers", - "//rs/registry/local_store", - "//rs/types/base_types", - "//rs/types/types", - "@crate_index//:candid", - "@crate_index//:clap", - "@crate_index//:itertools", - "@crate_index//:prometheus", - "@crate_index//:prometheus-parse", - "@crate_index//:rand_0_8_4", - "@crate_index//:serde_json", - "@crate_index//:serde_with", - "@crate_index//:serde", - "@crate_index//:sha2", - "@crate_index//:slog", - "@crate_index//:tempfile", - "@crate_index//:thiserror", - "@crate_index//:tokio", - "@crate_index//:tonic", - "@crate_index//:tower", - "@crate_index//:url", -] - -DEV_DEPENDENCIES = [ - "//rs/types/types_test_utils", -] - -ALIASES = {} - -rust_library( - name = "adapter", - srcs = glob(["src/**"]), - aliases = ALIASES, - crate_name = "ic_onchain_observability_adapter", - version = "0.1.0", - deps = DEPENDENCIES, -) - -rust_test( - name = "adapter_test", - crate = ":adapter", - deps = DEPENDENCIES + DEV_DEPENDENCIES, -) - -rust_binary( - name = "ic-onchain-observability-adapter", - srcs = ["src/main.rs"], - aliases = ALIASES, - visibility = ["//publish:__subpackages__"], - deps = DEPENDENCIES + [":adapter"], -) diff --git a/rs/monitoring/onchain_observability/adapter/Cargo.toml b/rs/monitoring/onchain_observability/adapter/Cargo.toml deleted file mode 100644 index cd0bbfd46a5..00000000000 --- a/rs/monitoring/onchain_observability/adapter/Cargo.toml +++ /dev/null @@ -1,43 +0,0 @@ -[package] -name = "ic-onchain-observability-adapter" -version = "0.1.0"
-edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1.31" -candid = { workspace = true } -clap = { version = "3.1.6", features = ["derive"] } -ic-adapter-metrics-server = { path = "../../adapter_metrics_server" } -ic-async-utils = { path = "../../../async_utils" } -ic-base-types = { path = "../../../types/base_types" } -ic-canister-client = { path = "../../../canister_client" } -ic-config = { path = "../../../config" } -ic-crypto = { path = "../../../crypto" } -ic-interfaces = { path = "../../../interfaces" } -ic-interfaces-registry = { path = "../../../interfaces/registry" } -ic-logger = { path = "../../logger" } -ic-metrics = { path = "../../metrics" } -ic-onchain-observability-service = { path = "../service" } -ic-registry-client = { path = "../../../registry/client" } -ic-registry-client-helpers = { path = "../../../registry/helpers" } -ic-registry-local-store = { path = "../../../registry/local_store" } -ic-types = { path = "../../../types/types" } -itertools = "0.10.5" -prometheus = { version = "0.12.0", features = [ "process" ] } -prometheus-parse = "0.2.3" -rand = "0.8" -sha2 = "0.9.9" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_with = "1.6.2" -slog = "2.7.0" -tempfile = "3.1.0" -thiserror = "1.0.26" -tokio = { version = "1.15.0", features = ["full"] } -tonic = "0.8.3" -tower = { version = "0.4.11", features = ["util"] } -url = "2.1.1" - -[dev-dependencies] -ic-types-test-utils = { path = "../../../types/types_test_utils" } diff --git a/rs/monitoring/onchain_observability/adapter/src/config.rs b/rs/monitoring/onchain_observability/adapter/src/config.rs deleted file mode 100644 index 11b974fbb40..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/config.rs +++ /dev/null @@ -1,55 +0,0 @@ -use ic_config::{ - crypto::CryptoConfig, logger::Config as LoggerConfig, registry_client::Config as RegistryConfig, -}; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DurationSeconds}; -use std::time::Duration; - -/// This struct contains configuration options for the observability canister adapter -/// It is a combination of Onchain Observability Adapter Specific Config and the IC Replica Config -#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] -#[serde(default)] -pub struct Config { - pub logger: LoggerConfig, - pub crypto_config: CryptoConfig, - pub registry_config: RegistryConfig, - pub report_length_sec: Duration, - pub sampling_interval_sec: Duration, - pub canister_client_url: String, - pub canister_id: String, - pub uds_socket_path: String, -} - -#[serde_as] -#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] -#[serde(default)] -pub struct OnchainObservabilityAdapterSpecificConfig { - pub logger: LoggerConfig, - #[serde_as(as = "DurationSeconds")] - #[serde(default = "default_report_length")] - pub report_length_sec: Duration, - #[serde_as(as = "DurationSeconds")] - #[serde(default = "default_sampling_interval")] - pub sampling_interval_sec: Duration, - #[serde(default = "default_url")] - pub canister_client_url: String, - pub canister_id: String, - #[serde(default = "uds_default_socket")] - pub uds_socket_path: String, -} - -pub(crate) const fn default_report_length() -> Duration { - Duration::from_secs(3600) // 1 hour -} - -pub(crate) const fn default_sampling_interval() -> Duration { - Duration::from_secs(60) // 1 minute -} - -pub(crate) fn default_url() -> String { - 
"https://ic0.app".to_string() -} - -pub(crate) fn uds_default_socket() -> String { - "/run/ic-node/onchain-observability-adapter/socket".to_string() -} diff --git a/rs/monitoring/onchain_observability/adapter/src/error_types.rs b/rs/monitoring/onchain_observability/adapter/src/error_types.rs deleted file mode 100644 index 448fed5fcd8..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/error_types.rs +++ /dev/null @@ -1,17 +0,0 @@ -use thiserror::Error; - -#[derive(Debug, Hash, Eq, PartialEq, Error)] -pub enum MetricsCollectError { - #[error("Metric parse failure: {0}")] - MetricParseFailure(String), - #[error("gPRC Request to replica failed with error {0}")] - RpcRequestFailure(String), -} - -#[derive(Debug, Hash, Eq, PartialEq, Error)] -pub enum CanisterPublishError { - #[error("Serialization failed: {0}")] - SerializationFailure(String), - #[error("Canister client failed: {0}")] - CanisterClientFailure(String), -} diff --git a/rs/monitoring/onchain_observability/adapter/src/flags.rs b/rs/monitoring/onchain_observability/adapter/src/flags.rs deleted file mode 100644 index c0ca56a6687..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/flags.rs +++ /dev/null @@ -1,90 +0,0 @@ -//! A parser for the configuration file. -// Here we can crash as we cannot proceed with an invalid config. -#![allow(clippy::expect_used)] - -use crate::config::{Config, OnchainObservabilityAdapterSpecificConfig}; -use clap::Parser; -use ic_config::{Config as ReplicaConfig, ConfigSource}; -use slog::Level; -use std::{fs::File, io, path::PathBuf}; -use tempfile::Builder; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum FlagsError { - #[error("{0}")] - Io(io::Error), - #[error("An error occurred while deserialized the provided configuration: {0}")] - Deserialize(String), - #[error("An error occurred while validating the provided configuration: {0}")] - Validation(String), - #[error("Internal")] - Internal, -} - -/// This struct is use to provide a command line interface to the adapter. -#[derive(Parser)] -#[clap(version = "0.1.0", author = "DFINITY team ")] -pub struct Flags { - /// Config specific to the onchain observability adapter. This will be combined with replica config to generate overall config. - #[clap(long = "adapter-specific-config-file", parse(from_os_str))] - pub adapter_specific_config: PathBuf, - /// We also want to stay in sync with replica filepaths for crypto, registry - #[clap(long = "replica-config-file", parse(from_os_str))] - pub replica_config: PathBuf, - /// This field represents if the adapter should run in verbose. - #[clap(short, long)] - pub verbose: bool, -} - -impl Flags { - /// Gets the log filter level by checking the verbose field. - pub fn get_logging_level(&self) -> Level { - if self.verbose { - Level::Debug - } else { - Level::Info - } - } - - /// Loads the adapter specific config and replica config and synthesizes into a final config - pub fn get_config(&self) -> Result { - let adapter_specific_config_file = - File::open(&self.adapter_specific_config).map_err(FlagsError::Io)?; - let adapter_specific_config: OnchainObservabilityAdapterSpecificConfig = - serde_json::from_reader(adapter_specific_config_file) - .map_err(|err| FlagsError::Deserialize(err.to_string()))?; - - let replica_config = get_replica_config(self.replica_config.clone()); - - // This is a hack to disable the adapter. This code should be cleaned up. 
- if !replica_config - .adapters_config - .onchain_observability_enable_grpc_server - { - return Err(FlagsError::Internal); - } - - Ok(Config { - logger: adapter_specific_config.logger, - crypto_config: replica_config.crypto, - registry_config: replica_config.registry_client, - report_length_sec: adapter_specific_config.report_length_sec, - sampling_interval_sec: adapter_specific_config.sampling_interval_sec, - canister_client_url: adapter_specific_config.canister_client_url, - canister_id: adapter_specific_config.canister_id, - uds_socket_path: adapter_specific_config.uds_socket_path, - }) - } -} - -fn get_replica_config(replica_config_file: PathBuf) -> ReplicaConfig { - let tmpdir = Builder::new() - .prefix("ic_config") - .tempdir() - .expect("failed to create temporary directory for replica config") - .path() - .to_path_buf(); - - ReplicaConfig::load_with_tmpdir(ConfigSource::File(replica_config_file), tmpdir) -} diff --git a/rs/monitoring/onchain_observability/adapter/src/helpers.rs b/rs/monitoring/onchain_observability/adapter/src/helpers.rs deleted file mode 100644 index 5fc7fa6a9be..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/helpers.rs +++ /dev/null @@ -1,26 +0,0 @@ -use ic_base_types::SubnetId; -use ic_registry_client::client::RegistryClient; -use ic_registry_client_helpers::subnet::SubnetRegistry; -use std::sync::Arc; -use tokio::time::{sleep, Duration}; - -// Checks registry to see whether feature flag is enabled to run the adapter. -// This should be run periodically in case feature flag is turned on/off. -// If feature flag is disabled, the registry client will be queried poll_duration secs until flag is enabled. -pub async fn poll_until_reporting_enabled( - registry_client: Arc, - subnet_id: SubnetId, - poll_duration: Duration, -) { - loop { - if let Ok(Some(true)) = registry_client - .get_features(subnet_id, registry_client.get_latest_version()) - .map(|features| features.unwrap_or_default().onchain_observability) - { - return; - } - sleep(poll_duration).await; - } -} - -// TODO - Migrate other functions from main diff --git a/rs/monitoring/onchain_observability/adapter/src/lib.rs b/rs/monitoring/onchain_observability/adapter/src/lib.rs deleted file mode 100644 index fe955215f79..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! This module is for collecting replica stats and sending to a reporting canister - -/// This module contains the basic configuration struct used to start up an adapter instance. 
-mod config; -/// Error handling -mod error_types; -/// Config settings from command line -mod flags; -/// Separate location for misc helper functions -mod helpers; -/// Metrics for the adapter -mod metrics; -/// Temporary helpers for node id -mod non_sampled_metrics_collector; -/// Provides a sampling API to fetch single instance of metric and aggregate -mod sampled_metrics_collector; - -pub use config::Config; -pub use error_types::{CanisterPublishError, MetricsCollectError}; -pub use flags::Flags; -pub use helpers::poll_until_reporting_enabled; -pub use metrics::OnchainObservabilityAdapterMetrics; -pub use non_sampled_metrics_collector::{ - collect_metrics_for_peers, derive_peer_counters_for_current_report_interval, NonSampledMetrics, - PeerCounterMetrics, -}; -pub use sampled_metrics_collector::SampledMetricsCollector; diff --git a/rs/monitoring/onchain_observability/adapter/src/main.rs b/rs/monitoring/onchain_observability/adapter/src/main.rs deleted file mode 100644 index 4a6d99934b6..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/main.rs +++ /dev/null @@ -1,694 +0,0 @@ -/// This binary is managed by systemd and added to the replica image. -/// The replica communicates with the HTTP adapter over unix domain sockets. -/// Relevant configuration files: -/// systemd service ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.service -/// systemd socket ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.socket -use candid::{Decode, Encode}; -use clap::Parser; -use ic_adapter_metrics_server::start_metrics_grpc; -use ic_async_utils::{abort_on_panic, incoming_from_nth_systemd_socket}; -use ic_base_types::{CanisterId, NodeId, SubnetId}; -use ic_canister_client::{Agent, Sender}; -use ic_crypto::CryptoComponent; -use ic_interfaces::crypto::{BasicSigner, ErrorReproducibility, KeyManager}; -use ic_interfaces_registry::ZERO_REGISTRY_VERSION; -use ic_logger::{error, info, new_replica_logger_from_config, warn, ReplicaLogger}; -use ic_metrics::MetricsRegistry; -use ic_onchain_observability_adapter::{ - collect_metrics_for_peers, derive_peer_counters_for_current_report_interval, - poll_until_reporting_enabled, CanisterPublishError, Config, Flags, MetricsCollectError, - OnchainObservabilityAdapterMetrics, PeerCounterMetrics, SampledMetricsCollector, -}; -use ic_onchain_observability_service::onchain_observability_service_client::OnchainObservabilityServiceClient; -use ic_registry_client::client::{RegistryClient, RegistryClientImpl}; -use ic_registry_client_helpers::{node::NodeRegistry, subnet::SubnetRegistry}; -use ic_registry_local_store::LocalStoreImpl; -use ic_types::{ - crypto::CryptoError, - messages::MessageId, - onchain_observability::{PeerReport, Report, SignedReport}, -}; -use rand::Rng; -use serde_json::to_string_pretty; -use sha2::Digest; -use std::{ - collections::{HashMap, HashSet}, - path::PathBuf, - str::FromStr, - sync::Arc, - time::SystemTime, -}; -use tokio::{ - net::UnixStream, - runtime::Handle, - time::{interval, sleep, Duration, MissedTickBehavior}, -}; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -use url::Url; - -const MAX_CRYPTO_SIGNATURE_ATTEMPTS: u64 = 5; - -const PREPARE_SOME_METHOD: &str = "prepare_some"; -const GET_CERTIFICATE_METHOD: &str = "get_certificate"; -const COMMIT_METHOD: &str = "commit"; -const FIND_METHOD: &str = "find"; - -const FIND_REPORT_SLEEP_DURATION: Duration = Duration::from_secs(10); -const REGISTRY_RETRY_SLEEP_DURATION: Duration = Duration::from_secs(10); -const 
FEATURE_FLAG_POLL_DURATION: Duration = Duration::from_secs(60); - -#[tokio::main] -pub async fn main() { - // We abort the whole program with a core dump if a single thread panics. - // This way we can capture all the context if a critical error - // happens. - abort_on_panic(); - - let flags = Flags::parse(); - let config = match flags.get_config() { - Ok(config) => config, - Err(_) => return, - }; - - let metrics_registry = MetricsRegistry::global(); - let (logger, _async_log_guard) = new_replica_logger_from_config(&config.logger); - let onchain_observability_adapter_metrics = - OnchainObservabilityAdapterMetrics::new(&metrics_registry); - - let handle = Handle::current(); - - let (registry_client, crypto_component) = create_registry_client_and_crypto_component( - logger.clone(), - metrics_registry.clone(), - config.clone(), - handle, - ) - .await; - - while registry_client.get_latest_version() == ZERO_REGISTRY_VERSION { - warn!(logger, "Registry client not ready"); - sleep(REGISTRY_RETRY_SLEEP_DURATION).await; - } - - let node_id = crypto_component.get_node_id(); - - let subnet_id = registry_client - .get_subnet_id_from_node_id(node_id, registry_client.get_latest_version()) - .expect("Failed to query subnet topology data") - .expect("Failed to get subnet id from node"); - - info!(logger, "Waiting for feature flag to be enabled..."); - - // Adapter should only collect/send data if feature flag is enabled - poll_until_reporting_enabled( - registry_client.clone(), - subnet_id, - FEATURE_FLAG_POLL_DURATION, - ) - .await; - - let canister_client_url = - Url::parse(&config.canister_client_url).expect("Failed to create url"); - let canister_id = - CanisterId::from_str(&config.canister_id).expect("Failed to parse canister id"); - - info!( - logger, - "Starting the onchain observability adapter with config: {}, url: {:?} id{:?}", - to_string_pretty(&config).unwrap(), - canister_client_url, - canister_id, - ); - - // SAFETY: - // The systemd service is configured to set its first socket as the metrics socket, so we expect the FD to exist. - // Additionally, this is the only callsite within the adapter so this should only be consumed once. - // Systemd Socket config: ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter-metrics.socket - // Systemd Service config: ic-os/guestos/rootfs/etc/systemd/system/ic-onchain-observability-adapter.service - let stream = unsafe { incoming_from_nth_systemd_socket(1) }; - start_metrics_grpc(metrics_registry.clone(), logger.clone(), stream); - - let canister_client = - create_canister_client(crypto_component.clone(), canister_client_url, node_id).await; - - let grpc_client = - setup_onchain_observability_adapter_client(PathBuf::from(config.uds_socket_path)); - - // Continuously collect and send reports. There are 2 types of metrics - sampled and non-sampled. - // Sampled will be collected periodically and averaged at the end of the reporting interval. - // Non-sampled will be collected at the end and the delta will be computed from previous baseline. - // On failure, the report publish will be skipped and attempted again at the next interval. 
- - let mut sampling_interval = interval(config.sampling_interval_sec); - sampling_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - let mut sampler = SampledMetricsCollector::new( - grpc_client.clone(), - onchain_observability_adapter_metrics.clone(), - ); - let mut non_sampled_metrics_baseline_opt = None; - let mut start_time = SystemTime::now(); - let mut report_duration = config.report_length_sec; - - loop { - sampling_interval.tick().await; - if let Err(e) = sampler.sample().await { - error!(logger, "sampling failed {:?}", e); - } - if start_time - .elapsed() - .expect("Negative system time must not happen") - >= report_duration - || non_sampled_metrics_baseline_opt.is_none() - { - onchain_observability_adapter_metrics - .report_interval_elapsed_total - .inc(); - - // Refresh peer ids. TODO(NET-1384) if fetching peer id fails, either fall back to old peers or skip report - let peer_ids = get_peer_ids(node_id, subnet_id, registry_client.clone()); - - let non_sampled_metrics_at_report_end_result = collect_metrics_for_peers( - grpc_client.clone(), - &peer_ids, - &onchain_observability_adapter_metrics, - ) - .await; - - if let Err(e) = non_sampled_metrics_at_report_end_result { - // On failure, retry data collection and publish on next interval. Note that start time / baseline counters are preserved and we simply extend the report end time. - onchain_observability_adapter_metrics - .reports_delayed_total - .inc(); - error!( - logger, - "Failed to collect non-sampled metrics, deferring report to next interval: {:?}", - e.to_string() - ); - report_duration += config.report_length_sec; - continue; - } - let non_sampled_metrics_at_report_end = - non_sampled_metrics_at_report_end_result.unwrap(); - - // If the report start is unset, this implies we haven't established a baseline yet, so we must wait - // until the next iteration before we can compute the delta and publish the report. - if let Some(non_sampled_metrics_baseline) = non_sampled_metrics_baseline_opt { - let end_time = SystemTime::now(); - - // The gRPC response provides cumulative metric counts since replica last restart, so we may need to adjust counts for the current reporting window.
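(The window-adjustment step continues below.) The pacing of the loop above comes from tokio's interval with MissedTickBehavior::Delay: if one sampling iteration overruns its tick, later ticks are pushed back rather than fired in a burst. A minimal, runnable sketch of just that pattern (the 200ms period and three iterations are invented so the demo terminates quickly):

    use tokio::time::{interval, Duration, MissedTickBehavior};

    #[tokio::main]
    async fn main() {
        let mut sampling_interval = interval(Duration::from_millis(200));
        // Delay: a missed tick shifts the schedule forward instead of
        // firing back-to-back to catch up (Burst is tokio's default).
        sampling_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        for i in 0..3 {
            // The first tick completes immediately; later ticks pace the loop.
            sampling_interval.tick().await;
            println!("sample #{i}");
        }
    }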
- let peer_counters_for_current_interval = - derive_peer_counters_for_current_report_interval( - &non_sampled_metrics_baseline, - &non_sampled_metrics_at_report_end, - ); - let replica_last_start_time = - non_sampled_metrics_at_report_end.replica_last_start_time; - - let up_time_peer_labels = sampler.aggregate(); - - let report = prepare_report( - node_id, - start_time, - end_time, - replica_last_start_time, - peer_counters_for_current_interval, - up_time_peer_labels, - &peer_ids, - ); - - info!(logger, "Completed Report: report {:?}", report); - - for signature_attempts in 0..MAX_CRYPTO_SIGNATURE_ATTEMPTS { - match sign_report(crypto_component.clone(), report.clone(), node_id).await { - Ok(signed_report) => { - if let Err(e) = send_report_to_canister( - &canister_client, - canister_id, - &signed_report, - &logger, - ) - .await - { - warn!(logger, "Send report may have failed: {:?}", e); - } - // Add a delay to allocate sufficient time in case data is committed by another node - sleep(FIND_REPORT_SLEEP_DURATION).await; - - let publish_result = match is_report_published( - &canister_client, - canister_id, - &signed_report, - ) - .await - { - Ok(is_published) => is_published.to_string(), - Err(e) => { - error!( - logger, - "Could not check whether report was published {:?}", e - ); - "unknown".to_string() - } - }; - onchain_observability_adapter_metrics - .find_published_report_in_canister_requests_total - .with_label_values(&[&publish_result]) - .inc(); - - break; - } - Err(e) => { - if e.is_reproducible() - || signature_attempts == MAX_CRYPTO_SIGNATURE_ATTEMPTS - 1 - { - // "Reproducible" represents a fundamental issue with the crypto setup - onchain_observability_adapter_metrics - .failed_crypto_signatures_total - .inc(); - error!(logger, "Failed to sign report, skipping {:?}", e); - break; - } - // Otherwise, if we receive a sporadic error, retry a limited number of times. - warn!(logger, "Received sporadic crypto signature failure {:?}", e); - } - } - } - } - // Reset the baseline counts - non_sampled_metrics_baseline_opt = Some(non_sampled_metrics_at_report_end); - sampler.clear(); - report_duration = config.report_length_sec; - // Only proceed if feature flag is still enabled - poll_until_reporting_enabled( - registry_client.clone(), - subnet_id, - FEATURE_FLAG_POLL_DURATION, - ) - .await; - start_time = SystemTime::now(); - } - } -} - -async fn is_report_published( - canister_client: &Agent, - canister_id: CanisterId, - report: &SignedReport, -) -> Result<bool, CanisterPublishError> { - let encoded_report = Encode!(&report).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "Report encoding".to_string(), - )) - })?; - - let hash: [u8; 32] = sha2::Sha256::digest(&encoded_report).into(); - - let encoded_arg = Encode!(&hash).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "Find query arg encoding".to_string(), - )) - })?; - - // Running as replicated query to avoid hitting a lagging node - let encoded_option = canister_client - .execute_update( - &canister_id, - &canister_id, - FIND_METHOD, - encoded_arg, - generate_nonce(), - ) - .await - .map_err(|e| { - CanisterPublishError::CanisterClientFailure(format!("find() query failed: {:?}", e)) - })?
- .ok_or_else(|| { - CanisterPublishError::CanisterClientFailure("Empty response from find()".to_string()) - })?; - - let index_option = Decode!(&encoded_option, Option<u64>).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "Canister client response decoding".to_string(), - )) - })?; - - match index_option { - Some(_) => Ok(true), - None => Ok(false), - } -} - -// Currently the adapter only considers the peer ids from the latest registry version. -// This may cause some occasional discrepancies between registry and peer metrics. -// NET-1352 to track -fn get_peer_ids( - current_node_id: NodeId, - subnet_id: SubnetId, - registry_client: Arc<dyn RegistryClient>, -) -> HashSet<NodeId> { - let latest_version = registry_client.get_latest_version(); - - registry_client - .get_node_ids_on_subnet(subnet_id, latest_version) - .expect("Failed to get subnet data for subnet id") - .expect("Failed to retrieve node ids from subnet") - .iter() - .copied() - .filter(|&x| x != current_node_id) - .collect() -} - -// Generate crypto component which is needed for signing messages -async fn create_registry_client_and_crypto_component( - logger: ReplicaLogger, - metrics_registry: MetricsRegistry, - config: Config, - rt_handle: Handle, -) -> (Arc<RegistryClientImpl>, Arc<CryptoComponent>) { - let data_provider = Arc::new(LocalStoreImpl::new(config.registry_config.local_store)); - let registry_client = Arc::new(RegistryClientImpl::new( - data_provider, - Some(&metrics_registry), - )); - // Cloning registry client since we need to move it into the task below - let registry_client_clone = registry_client.clone(); - - // TODO (NET-1336) proper error handling in case registry is not populated - registry_client - .fetch_and_start_polling() - .expect("fetch_and_start_polling failed"); - - let crypto_component = tokio::task::spawn_blocking(move || { - Arc::new(CryptoComponent::new( - &config.crypto_config, - Some(rt_handle), - registry_client_clone, - logger, - Some(&metrics_registry), - )) - }) - .await - .expect("Failed to create crypto component"); - - (registry_client, crypto_component) -} - -async fn create_canister_client(crypto: Arc<CryptoComponent>, url: Url, node_id: NodeId) -> Agent { - let latest_version = crypto.registry_client().get_latest_version(); - - let crypto_clone = crypto.clone(); - - let node_pub_key = tokio::task::spawn_blocking(move || { - crypto - .current_node_public_keys() - .map(|cnpks| cnpks.node_signing_public_key) - .expect("Failed to retrieve current node public keys") - .expect("Missing node signing public key") - }) - .await - .unwrap(); - - let sign_cmd = move |msg: &MessageId| { - // Implementation of 'sign_basic' uses Tokio's 'block_on' when issuing a RPC - // to the crypto service. 'block_on' panics when called from async context - // that's why we need to wrap 'sign_basic' in 'block_in_place'. - #[allow(clippy::disallowed_methods)] - tokio::task::block_in_place(|| { - crypto_clone - .sign_basic(msg, node_id, latest_version) - .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>) - .map(|value| value.get().0) - }) - }; - - let sender = Sender::Node { - pub_key: node_pub_key.key_value, - sign: Arc::new(sign_cmd), - }; - Agent::new(url, sender) -} - -// Prepare a report in the format expected by the observability canister. -// The report will be sent along with a signature allowing the canister to -// verify its authenticity.
-fn prepare_report( - reporting_node_id: NodeId, - start_time: SystemTime, - end_time: SystemTime, - replica_last_start_time: SystemTime, - counter_metrics: HashMap<NodeId, PeerCounterMetrics>, - uptime_peer_labels: HashMap<String, f32>, - peer_ids: &HashSet<NodeId>, -) -> Report { - // Convert uptime from peer label into peer ids - let uptime_peer_ids: HashMap<NodeId, f32> = uptime_peer_labels - .iter() - .filter_map(|(node_label, percent)| { - match convert_peer_label_to_node_id(node_label, peer_ids) { - Ok(node_id) => Some((node_id, *percent)), - Err(_) => None, - } - }) - .collect(); - - let mut peer_reports: Vec<PeerReport> = vec![]; - for peer in peer_ids { - if counter_metrics.get(peer).is_some() && uptime_peer_ids.get(peer).is_some() { - peer_reports.push(PeerReport { - peer_id_binary: peer.get().to_vec(), - peer_uptime_percent: uptime_peer_ids[peer], - num_retries: counter_metrics[peer].num_retries, - connection_bytes_received: counter_metrics[peer].bytes_received, - connection_bytes_sent: counter_metrics[peer].bytes_sent, - }); - } - } - - // Next, append the reporting-node-specific fields - Report { - start_time, - end_time, - reporting_node_id_binary: reporting_node_id.get().to_vec(), - replica_last_start_time, - peer_report: peer_reports, - } -} - -async fn sign_report( - crypto: Arc<CryptoComponent>, - report: Report, - reporting_node_id: NodeId, -) -> Result<SignedReport, CryptoError> { - let report_clone = report.clone(); - match tokio::task::spawn_blocking(move || { - crypto - .sign_basic( - &report_clone, - reporting_node_id, - crypto.registry_client().get_latest_version(), - ) - .map(|value| value.get().0) - }) - .await - .unwrap() - { - Ok(signature) => Ok(SignedReport { report, signature }), - Err(e) => Err(e), - } -} - -/// Publish the report to the canister using the Canister Client API. This involves a 3-step process -/// 1. prepare_some(vec![encoded report]) to prepare the data and create a certificate -/// 2. get_certificate() to get the certificate for the data -/// 3. commit(certificate) to publish the results to the canister -/// Note that these calls must account for race conditions from simultaneous publishes across replicas -async fn send_report_to_canister( - canister_client: &Agent, - canister_id: CanisterId, - report: &SignedReport, - logger: &ReplicaLogger, -) -> Result<(), CanisterPublishError> { - // Introducing some jitter to break synchronization across replicas and reduce probability that multiple replicas - // initiate a request at the same time, which can cause canister client calls to fail. TODO(NET-1343) - integrate into retry logic - let mut rng = rand::thread_rng(); - let random_sleep_duration_sec = 2 * rng.gen_range(0..5); - sleep(Duration::from_secs(random_sleep_duration_sec)).await; - - // Step 1 - the canister API requires us to call prepare_some(Vec<Vec<u8>>) to prepare the data. Note that this requires 2 levels of encoding. - // First we encode the report itself to vec<u8>. However, the API expects vec<vec<u8>> so we must then wrap it in another vector. - // Finally, the canister client expects a candid-encoded representation of the method arguments, so we must re-encode this back into another Vec<u8>.
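The double wrapping described in that comment can be reproduced in isolation. A self-contained sketch using the candid crate's Encode!/Decode! macros, as the deleted code does (DemoReport is an invented stand-in, not the codebase's SignedReport):

    use candid::{CandidType, Decode, Encode};
    use serde::Deserialize;

    #[derive(CandidType, Deserialize, Clone, Debug, PartialEq)]
    struct DemoReport {
        reporting_node_id_binary: Vec<u8>,
        peer_uptime_percent: f32,
    }

    fn main() {
        let report = DemoReport {
            reporting_node_id_binary: vec![1, 2, 3],
            peer_uptime_percent: 99.5,
        };
        // Layer 1: the report itself becomes an opaque byte blob.
        let encoded_report: Vec<u8> = Encode!(&report).unwrap();
        // Layer 2: prepare_some() takes a Vec<Vec<u8>>, and the canister
        // client wants that argument list candid-encoded as well.
        let prepare_some_arg: Vec<u8> = Encode!(&vec![encoded_report]).unwrap();
        // A receiver peels the layers off in reverse order.
        let blobs = Decode!(&prepare_some_arg, Vec<Vec<u8>>).unwrap();
        let roundtripped = Decode!(&blobs[0], DemoReport).unwrap();
        assert_eq!(roundtripped, report);
    }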
- let encoded_report = Encode!(&report).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "Report encoding".to_string(), - )) - })?; - let candid_prepare_some_arg = Encode!(&vec![encoded_report]).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "prepare_some() arg encoding".to_string(), - )) - })?; - - canister_client - .execute_update( - &canister_id, - &canister_id, - PREPARE_SOME_METHOD, - candid_prepare_some_arg, - generate_nonce(), - ) - .await - .map_err(|e| { - CanisterPublishError::CanisterClientFailure(format!( - "prepare_some() query failed: {:?}", - e - )) - })? - .ok_or_else(|| { - CanisterPublishError::CanisterClientFailure( - "Empty response from prepare_some()".to_string(), - ) - })?; - - // Step 2 - We must call get_certificate() to obtain the certificate corresponding to the prepared data. - // This is used later to confirm we are not attempting to publish stale data. - let encoded_empty_arg = Encode!(&Vec::::new()).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "get_certificate() args encoding".to_string(), - )) - })?; - - let encoded_certificate = canister_client - .execute_query(&canister_id, GET_CERTIFICATE_METHOD, encoded_empty_arg) - .await - .map_err(|e| { - CanisterPublishError::CanisterClientFailure(format!( - "get_certificate() query failed: {:?}", - e - )) - })? - .ok_or_else(|| { - CanisterPublishError::CanisterClientFailure( - "Empty response from get_certificate()".to_string(), - ) - })?; - - let decoded_certificate_opt = Decode!(&encoded_certificate, Option>).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "Certificate decoding".to_string(), - )) - })?; - - // Step 3 - We must commit the data using the certificate - // If certificate is not found, that means there was no pending data and we can assume - // that this data was already published by another replica. - let certificate = match decoded_certificate_opt { - Some(cert) => cert, - None => { - warn!( - logger, - "Certificate not found. 
Data may have already been published" - ); - return Ok(()); - } - }; - - let candid_commit_arg = Encode!(&certificate).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "commit() args encoding".to_string(), - )) - })?; - - match canister_client - .execute_update( - &canister_id, - &canister_id, - COMMIT_METHOD, - candid_commit_arg, - generate_nonce(), - ) - .await - { - // If error, we can assume this is due to stale certificate and that - // another replica staged new data and will eventually commit the data - - // TODO: Convert canister client to strongly typed error so that we can also - // check for timeouts - Ok(Some(encoded_block_number)) => { - let decoded_result = Decode!(&encoded_block_number, Option).or_else(|_| { - Err(CanisterPublishError::SerializationFailure( - "block decoding".to_string(), - )) - })?; - if let Some(block) = decoded_result { - info!( - logger, - "Successfully published data at block height: {:?}", block - ); - } else { - warn!(logger, "Commit was skipped due to no pending data"); - } - } - Ok(None) => { - return Err(CanisterPublishError::CanisterClientFailure( - "Commit() did not return a valid response".to_string(), - )) - } - Err(e) => { - warn!( - logger, - "Did not commit data (may have already been committed): {:?}", e - ); - } - } - Ok(()) -} - -fn generate_nonce() -> Vec { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_nanos() - .to_le_bytes() - .to_vec() -} - -// Peer label is in form of "{NODE IP}_{NODE ID PREFIX}" so we can take the prefix -// and compare to peer ids to infer the full id -fn convert_peer_label_to_node_id( - peer_label: &str, - node_id_list: &HashSet, -) -> Result { - let peer_label_split = peer_label.split('_').collect::>(); - if peer_label_split.len() != 2 { - return Err(MetricsCollectError::MetricParseFailure( - "Peer label was not succesfully split into 2 pieces".to_string(), - )); - } - let peer_id_prefix = peer_label_split[1]; - - let valid_node_ids: Vec<_> = node_id_list - .iter() - .filter(|&id| id.to_string().contains(peer_id_prefix)) - .collect(); - - // Assumption: There is a 1:1 mapping between id prefix and full id - if valid_node_ids.len() != 1 { - return Err(MetricsCollectError::MetricParseFailure( - "Did not find 1:1 mapping between node id prefix and node id list".to_string(), - )); - } - Ok(*valid_node_ids[0]) -} - -pub fn setup_onchain_observability_adapter_client( - uds_path: PathBuf, -) -> OnchainObservabilityServiceClient { - let endpoint = Endpoint::try_from("http://[::]:50051").expect("Failed to connect to endpoint"); - let channel = endpoint.connect_with_connector_lazy(service_fn(move |_: Uri| { - // Connect to a Uds socket - UnixStream::connect(uds_path.clone()) - })); - - OnchainObservabilityServiceClient::new(channel) -} diff --git a/rs/monitoring/onchain_observability/adapter/src/metrics.rs b/rs/monitoring/onchain_observability/adapter/src/metrics.rs deleted file mode 100644 index 9e9501a1315..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/metrics.rs +++ /dev/null @@ -1,45 +0,0 @@ -use ic_metrics::MetricsRegistry; -use prometheus::{IntCounter, IntCounterVec}; - -const REPORT_WAS_FOUND: &str = "report_was_found"; -const REQUEST_TYPE: &str = "request_type"; -const ERROR_STATUS: &str = "error_status"; - -#[derive(Debug, Clone)] -pub struct OnchainObservabilityAdapterMetrics { - pub report_interval_elapsed_total: IntCounter, - pub failed_crypto_signatures_total: IntCounter, - pub failed_grpc_requests_total: IntCounterVec, - pub 
reports_delayed_total: IntCounter, - pub find_published_report_in_canister_requests_total: IntCounterVec, -} - -// Note that all adapter metrics are automatically prefixed with "onchain_observability" -impl OnchainObservabilityAdapterMetrics { - pub fn new(metrics_registry: &MetricsRegistry) -> Self { - Self { - report_interval_elapsed_total: metrics_registry.int_counter( - "report_interval_elapsed_total", - "Counts the number of 'report-loop iterations' to measure how many reports are expected.", - ), - failed_crypto_signatures_total: metrics_registry.int_counter( - "failed_crypto_signatures_total", - "Counts failures from attempting to sign the report.", - ), - failed_grpc_requests_total: metrics_registry.int_counter_vec( - "failed_grpc_requests_total", - "Tracks when gRPC request to replica fails, indexed on error type and whether request was sampled or non-sampled. Note that this does not necessarily equate to a skipped report if retries succeed.", - &[REQUEST_TYPE, ERROR_STATUS], - ), - reports_delayed_total: metrics_registry.int_counter( - "reports_delayed_total", - "Tracks when gRPC consistently fails to the point of re-attempting metric collection at the next interval.", - ), - find_published_report_in_canister_requests_total: metrics_registry.int_counter_vec( - "find_published_report_in_canister_requests_total", - "Tracks what fraction of reports are successfully published to the canister by querying the canister for report after a commit sequence. 'True' indicates a report was published successfully, and 'False' indicates a report is missing.", - &[REPORT_WAS_FOUND], - ), - } - } -} diff --git a/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs b/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs deleted file mode 100644 index 8f0f35fa870..00000000000 --- a/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs +++ /dev/null @@ -1,258 +0,0 @@ -use crate::{error_types::MetricsCollectError, OnchainObservabilityAdapterMetrics}; -use ic_base_types::{NodeId, PrincipalId}; -use ic_onchain_observability_service::{ - onchain_observability_service_client::OnchainObservabilityServiceClient, - OnchainObservabilityServiceGetMetricsDataRequest, -}; -use prometheus_parse::{Sample, Scrape, Value, Value::Counter}; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, - time::{Duration, SystemTime}, -}; -use tonic::transport::Channel; - -const PROCESS_START_TIME_METRIC: &str = "process_start_time_seconds"; -const RETRY_CONNECTION_METRIC: &str = "transport_retry_connection"; -const TOTAL_BYTES_SENT_METRIC: &str = "transport_write_bytes_total"; -const TOTAL_BYTES_RECEIVED_METRIC: &str = "transport_read_bytes_total"; -const PEER_ID_LABEL: &str = "peer_id"; - -// TODO make these config-based -const RETRY_INTERVAL_SEC: Duration = Duration::from_secs(30); -const TIMEOUT_LENGTH_SEC: Duration = Duration::from_secs(30); -const MAX_ATTEMPTS: u64 = 5; - -// Represents the non-sampled metrics collected from replica from an individual gRPC request -#[derive(Clone, Debug, PartialEq)] - -pub struct NonSampledMetrics { - // Represents last restart time for the replica providing peer metrics. Can be used to - // determine if metric counters were reset since last gRPC query. - pub replica_last_start_time: SystemTime, - pub peer_metrics: HashMap, -} - -// The non-sampled peer metrics obtained from gRPC request. 
diff --git a/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs b/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs
deleted file mode 100644
index 8f0f35fa870..00000000000
--- a/rs/monitoring/onchain_observability/adapter/src/non_sampled_metrics_collector.rs
+++ /dev/null
@@ -1,258 +0,0 @@
-use crate::{error_types::MetricsCollectError, OnchainObservabilityAdapterMetrics};
-use ic_base_types::{NodeId, PrincipalId};
-use ic_onchain_observability_service::{
-    onchain_observability_service_client::OnchainObservabilityServiceClient,
-    OnchainObservabilityServiceGetMetricsDataRequest,
-};
-use prometheus_parse::{Sample, Scrape, Value, Value::Counter};
-use std::{
-    collections::{HashMap, HashSet},
-    str::FromStr,
-    time::{Duration, SystemTime},
-};
-use tonic::transport::Channel;
-
-const PROCESS_START_TIME_METRIC: &str = "process_start_time_seconds";
-const RETRY_CONNECTION_METRIC: &str = "transport_retry_connection";
-const TOTAL_BYTES_SENT_METRIC: &str = "transport_write_bytes_total";
-const TOTAL_BYTES_RECEIVED_METRIC: &str = "transport_read_bytes_total";
-const PEER_ID_LABEL: &str = "peer_id";
-
-// TODO: make these config-based
-const RETRY_INTERVAL_SEC: Duration = Duration::from_secs(30);
-const TIMEOUT_LENGTH_SEC: Duration = Duration::from_secs(30);
-const MAX_ATTEMPTS: u64 = 5;
-
-// Represents the non-sampled metrics collected from the replica in an individual gRPC request.
-#[derive(Clone, Debug, PartialEq)]
-pub struct NonSampledMetrics {
-    // Represents the last restart time for the replica providing peer metrics. Can be used to
-    // determine if metric counters were reset since the last gRPC query.
-    pub replica_last_start_time: SystemTime,
-    pub peer_metrics: HashMap<NodeId, PeerCounterMetrics>,
-}
-
-// The non-sampled peer metrics obtained from a gRPC request.
-#[derive(Clone, Debug, Default, PartialEq)]
-pub struct PeerCounterMetrics {
-    // The number of times a peer connection was disconnected and a reconnect was attempted
    pub num_retries: u64,
-    // Total bytes the reporting replica received from the peer
-    pub bytes_received: u64,
-    // Total bytes the reporting replica wrote to the peer
-    pub bytes_sent: u64,
-}
-
-impl PeerCounterMetrics {
-    pub fn subtract(&self, other: &Self) -> Self {
-        PeerCounterMetrics {
-            num_retries: self.num_retries - other.num_retries,
-            bytes_received: self.bytes_received - other.bytes_received,
-            bytes_sent: self.bytes_sent - other.bytes_sent,
-        }
-    }
-}
-
-// Sends a gRPC request to the replica to collect the relevant metrics.
-// The response will be filtered on the peer ids passed as a parameter. This is to resolve any
-// possible discrepancies between the peers from the latest registry version (source of truth) and the prometheus metrics.
-pub async fn collect_metrics_for_peers(
-    mut client: OnchainObservabilityServiceClient<Channel>,
-    peer_ids: &HashSet<NodeId>,
-    adapter_metrics: &OnchainObservabilityAdapterMetrics,
-) -> Result<NonSampledMetrics, MetricsCollectError> {
-    let request = OnchainObservabilityServiceGetMetricsDataRequest {
-        requested_metrics: vec![
-            PROCESS_START_TIME_METRIC.to_string(),
-            RETRY_CONNECTION_METRIC.to_string(),
-            TOTAL_BYTES_SENT_METRIC.to_string(),
-            TOTAL_BYTES_RECEIVED_METRIC.to_string(),
-        ],
-    };
-
-    for _ in 0..MAX_ATTEMPTS {
-        let mut tonic_request = tonic::Request::new(request.clone());
-        tonic_request.set_timeout(TIMEOUT_LENGTH_SEC);
-
-        match client.get_metrics_data(tonic_request).await {
-            Ok(response) => {
-                return parse_metrics_response(response.into_inner().metrics_data, peer_ids)
-            }
-            Err(status) => {
-                adapter_metrics
-                    .failed_grpc_requests_total
-                    .with_label_values(&["non_sampled", &status.code().to_string()])
-                    .inc();
-            }
-        }
-        tokio::time::sleep(RETRY_INTERVAL_SEC).await;
-    }
-    Err(MetricsCollectError::RpcRequestFailure(format!(
-        "Max attempts ({:?}) exceeded",
-        MAX_ATTEMPTS
-    )))
-}
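Taken together with the UDS client constructor earlier in the adapter, one collection round could be driven as below. This is a sketch only: it assumes the removed crate's items are in scope and that `ic_metrics::MetricsRegistry::new()` is available; the socket path and the empty peer set are placeholder assumptions (the real peer set comes from the registry).

```
use std::{collections::HashSet, path::PathBuf};

// Sketch: one non-sampled collection round using the items above.
async fn collect_once() -> Result<NonSampledMetrics, MetricsCollectError> {
    // Hypothetical socket path, for illustration only.
    let client =
        setup_onchain_observability_adapter_client(PathBuf::from("/run/obs-adapter.sock"));
    let peer_ids: HashSet<NodeId> = HashSet::new(); // normally derived from the registry
    let adapter_metrics = OnchainObservabilityAdapterMetrics::new(&MetricsRegistry::new());
    collect_metrics_for_peers(client, &peer_ids, &adapter_metrics).await
}
```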
-
-// Takes the raw string gRPC response and converts it into a NonSampledMetrics struct.
-pub fn parse_metrics_response(
-    response: String,
-    peer_ids: &HashSet<NodeId>,
-) -> Result<NonSampledMetrics, MetricsCollectError> {
-    let lines = response.lines().map(|s| Ok(s.to_string()));
-    let scraped_metrics = Scrape::parse(lines).map_err(|_| {
-        MetricsCollectError::MetricParseFailure("prometheus scrape error".to_string())
-    })?;
-
-    let replica_last_start_time = extract_replica_last_start(&scraped_metrics).map_err(|e| {
-        MetricsCollectError::MetricParseFailure(format!(
-            "Failed to parse replica last start: {:?}",
-            e
-        ))
-    })?;
-
-    // If parsing the prometheus metrics data fails, then a log entry will not be created.
-    // TODO (NET-1338) Add counter to track metric parse failure
-    let bytes_sent_for_peers =
-        extract_peer_counts_for_metric(&scraped_metrics, TOTAL_BYTES_SENT_METRIC)?;
-
-    let bytes_received_for_peers =
-        extract_peer_counts_for_metric(&scraped_metrics, TOTAL_BYTES_RECEIVED_METRIC)?;
-
-    let retry_count_for_peers =
-        extract_peer_counts_for_metric(&scraped_metrics, RETRY_CONNECTION_METRIC)?;
-
-    let mut peer_metrics_map = HashMap::new();
-
-    for peer_id in peer_ids.iter() {
-        let num_retries = *retry_count_for_peers.get(peer_id).unwrap_or(&0);
-        let bytes_received = *bytes_received_for_peers.get(peer_id).unwrap_or(&0);
-        let bytes_sent = *bytes_sent_for_peers.get(peer_id).unwrap_or(&0);
-
-        let current_peer_metrics = PeerCounterMetrics {
-            num_retries,
-            bytes_received,
-            bytes_sent,
-        };
-        peer_metrics_map.insert(*peer_id, current_peer_metrics);
-    }
-
-    Ok(NonSampledMetrics {
-        replica_last_start_time,
-        peer_metrics: peer_metrics_map,
-    })
-}
-
-// Returns the replica last start time from the prometheus scrape.
-fn extract_replica_last_start(parse: &Scrape) -> Result<SystemTime, MetricsCollectError> {
-    let replica_last_start_time_vec: Vec<&Value> = parse
-        .samples
-        .iter()
-        .filter(|&sample| sample.metric == PROCESS_START_TIME_METRIC)
-        .map(|sample| &sample.value)
-        .collect();
-
-    if replica_last_start_time_vec.len() != 1 {
-        return Err(MetricsCollectError::MetricParseFailure(format!(
-            "Expected exactly 1 field for replica start time, found {:?}",
-            replica_last_start_time_vec.len()
-        )));
-    }
-
-    if let Value::Gauge(last_start) = replica_last_start_time_vec[0] {
-        let time = *last_start as u64;
-        return SystemTime::UNIX_EPOCH
-            .checked_add(Duration::new(time, 0))
-            .ok_or_else(|| {
-                MetricsCollectError::MetricParseFailure(
-                    "Replica last start value not convertible into system time".to_string(),
-                )
-            });
-    }
-
-    Err(MetricsCollectError::MetricParseFailure(
-        "Replica last start metric key found, but missing value".to_string(),
-    ))
-}
-
-fn extract_peer_counts_for_metric(
-    parse: &Scrape,
-    metric_name: &str,
-) -> Result<HashMap<NodeId, u64>, MetricsCollectError> {
-    let filtered_prometheus_scrape: Vec<&Sample> = parse
-        .samples
-        .iter()
-        .filter(|&sample| sample.metric == metric_name)
-        .collect();
-
-    let mut metric_for_peers = HashMap::new();
-
-    if filtered_prometheus_scrape.is_empty() {
-        // A missing metric may or may not be interpreted as an error. Pass an empty response and let the caller decide.
-        return Ok(metric_for_peers);
-    }
-
-    for sample in filtered_prometheus_scrape {
-        let node_id_str = sample.labels.get(PEER_ID_LABEL).ok_or_else(|| {
-            MetricsCollectError::MetricParseFailure(format!(
-                "Missing peer id label for metric {:?}",
-                metric_name
-            ))
-        })?;
-        // Convert the node id string into a NodeId
-        let principal_id = PrincipalId::from_str(node_id_str).map_err(|e| {
-            MetricsCollectError::MetricParseFailure(format!(
-                "Could not convert string to PrincipalId {:?}",
-                e
-            ))
-        })?;
-        let node_id = NodeId::from(principal_id);
-
-        let count = match sample.value {
-            Counter(count) | Value::Gauge(count) => {
-                // TODO(NET-1366): Handle any unexpected negative numbers
-                count as u64
-            }
-            _ => {
-                return Err(MetricsCollectError::MetricParseFailure(format!(
-                    "Unsupported value for metric {:?} could not be converted into a count",
-                    metric_name
-                )))
-            }
-        };
-        metric_for_peers.insert(node_id, count);
-    }
-
-    Ok(metric_for_peers)
-}
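Both extractors above walk `Scrape::samples`. A small, self-contained illustration of the `prometheus_parse` API they rely on; the metric names mirror the constants above, and the values are made up:

```
use prometheus_parse::{Scrape, Value};

fn main() {
    let text = "\
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1690000000
# TYPE transport_write_bytes_total counter
transport_write_bytes_total{peer_id=\"abc\"} 42
";
    // Scrape::parse expects an iterator of io::Result<String>, exactly as used above.
    let lines = text.lines().map(|l| Ok(l.to_string()));
    let scrape = Scrape::parse(lines).expect("well-formed exposition text");
    for s in &scrape.samples {
        match s.value {
            Value::Gauge(v) | Value::Counter(v) => println!("{} = {}", s.metric, v),
            _ => {} // histograms/summaries are not used by this collector
        }
    }
}
```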
-
-// A helper function to isolate the counts for the current reporting interval.
-// If the replica restarted since the last report, then the latest cumulative counts are already within
-// the report window.
-pub fn derive_peer_counters_for_current_report_interval(
-    metrics_report_start: &NonSampledMetrics,
-    metrics_report_end: &NonSampledMetrics,
-) -> HashMap<NodeId, PeerCounterMetrics> {
-    if metrics_report_end.replica_last_start_time > metrics_report_start.replica_last_start_time {
-        return metrics_report_end.peer_metrics.clone();
-    }
-
-    // Used as a no-op baseline if a new peer is added between reports
-    let zero_counters = PeerCounterMetrics::default();
-
-    let delta_peer_metrics_map = metrics_report_end
-        .peer_metrics
-        .iter()
-        .map(|(peer_id, latest_metric_counts)| {
-            let baseline_counts = metrics_report_start
-                .peer_metrics
-                .get(peer_id)
-                .unwrap_or(&zero_counters);
-            (*peer_id, latest_metric_counts.subtract(baseline_counts))
-        })
-        .collect();
-
-    // We cannot modify the existing struct directly since we need to preserve the cumulative counts
-    // as a baseline for the following report, so store the delta counts as a new struct.
-    delta_peer_metrics_map
-}
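A self-contained illustration of that delta rule, with plain `u64` counters standing in for `PeerCounterMetrics`: if the counters were reset by a restart, the latest cumulative values already equal the per-interval counts; otherwise the baseline is subtracted, with absent peers defaulting to zero.

```
use std::collections::HashMap;

fn interval_delta(
    start: &HashMap<&'static str, u64>,
    end: &HashMap<&'static str, u64>,
    restarted: bool,
) -> HashMap<&'static str, u64> {
    if restarted {
        // Counters were reset, so the cumulative values cover only this interval.
        return end.clone();
    }
    end.iter()
        .map(|(peer, latest)| (*peer, latest - start.get(peer).unwrap_or(&0)))
        .collect()
}

fn main() {
    let start = HashMap::from([("peer-a", 100u64)]);
    let end = HashMap::from([("peer-a", 160u64), ("peer-b", 40u64)]); // peer-b joined mid-interval
    assert_eq!(interval_delta(&start, &end, false)[&"peer-a"], 60);
    assert_eq!(interval_delta(&start, &end, false)[&"peer-b"], 40);
    assert_eq!(interval_delta(&start, &end, true)[&"peer-a"], 160);
    println!("delta rule behaves as described");
}
```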
diff --git a/rs/monitoring/onchain_observability/adapter/src/sampled_metrics_collector.rs b/rs/monitoring/onchain_observability/adapter/src/sampled_metrics_collector.rs
deleted file mode 100644
index a4d418e6363..00000000000
--- a/rs/monitoring/onchain_observability/adapter/src/sampled_metrics_collector.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-use crate::{error_types::MetricsCollectError, OnchainObservabilityAdapterMetrics};
-use ic_onchain_observability_service::{
-    onchain_observability_service_client::OnchainObservabilityServiceClient,
-    OnchainObservabilityServiceGetMetricsDataRequest,
-};
-use prometheus_parse::{Sample, Scrape, Value};
-use std::{collections::HashMap, time::Duration};
-use tonic::transport::Channel;
-
-const CONNECTED_STATE: u64 = 3;
-const CONNECTION_STATE_METRIC: &str = "transport_connection_state";
-const PEER_ID: &str = "peer_id";
-const TIMEOUT_LENGTH_SEC: Duration = Duration::from_secs(30);
-
-// TODO(NET-1365): Come up with a more generic interface for fetching multiple metrics and consolidate shared code with the non-sampled collector.
-
-// Provides an API for fetching metrics periodically and aggregating the results.
-// Intended usage: call sample() until the desired sample size is reached, then call aggregate()
-// to return the result. Finally, call clear() to reset the counters.
-pub struct SampledMetricsCollector {
-    num_samples: u64,
-    connected_state_count: HashMap<String, u64>,
-    client: OnchainObservabilityServiceClient<Channel>,
-    adapter_metrics: OnchainObservabilityAdapterMetrics,
-}
-
-impl SampledMetricsCollector {
-    pub fn new(
-        client: OnchainObservabilityServiceClient<Channel>,
-        adapter_metrics: OnchainObservabilityAdapterMetrics,
-    ) -> Self {
-        SampledMetricsCollector {
-            num_samples: 0,
-            connected_state_count: HashMap::new(),
-            client,
-            adapter_metrics,
-        }
-    }
-
-    pub async fn sample(&mut self) -> Result<(), MetricsCollectError> {
-        let request = OnchainObservabilityServiceGetMetricsDataRequest {
-            requested_metrics: vec![CONNECTION_STATE_METRIC.to_string()],
-        };
-
-        let mut tonic_request = tonic::Request::new(request.clone());
-        tonic_request.set_timeout(TIMEOUT_LENGTH_SEC);
-
-        match self.client.clone().get_metrics_data(tonic_request).await {
-            Ok(response) => {
-                let parsed_data = peer_label_and_connected_state_from_raw_response(
-                    response.into_inner().metrics_data,
-                )?;
-                // Track the counts of the "connected" state. Later, we will transform this into a %.
-                for (peer_label, connection_state) in parsed_data {
-                    if connection_state == CONNECTED_STATE {
-                        *self.connected_state_count.entry(peer_label).or_insert(0) += 1;
-                    }
-                }
-                self.num_samples += 1;
-                Ok(())
-            }
-            Err(status) => {
-                self.adapter_metrics
-                    .failed_grpc_requests_total
-                    .with_label_values(&["sampled", &status.code().to_string()])
-                    .inc();
-
-                Err(MetricsCollectError::RpcRequestFailure(format!(
-                    "Request failed {:?}",
-                    status.code()
-                )))
-            }
-        }
-    }
-
-    pub fn aggregate(&mut self) -> HashMap<String, f32> {
-        // Peer up time = (# responses in connected state) / (total responses)
-        let mut up_time = HashMap::new();
-        if self.num_samples == 0 {
-            return up_time;
-        }
-
-        for (peer_id, connected_count) in self.connected_state_count.iter() {
-            let percent = ((*connected_count as f32) * 100.0) / (self.num_samples as f32);
-            up_time.insert(peer_id.clone(), percent);
-        }
-        up_time
-    }
-
-    pub fn clear(&mut self) {
-        self.num_samples = 0;
-        self.connected_state_count = HashMap::new();
-    }
-}
-
-// Since the metrics endpoint is one giant string, we must manually scrape the relevant data.
-// Derive the connection state using the transport_connection_state metric.
-// Assumes the endpoint response is well formatted.
-fn peer_label_and_connected_state_from_raw_response(
-    response: String,
-) -> Result<Vec<(String, u64)>, MetricsCollectError> {
-    let mut res = vec![];
-
-    let lines = response.lines().map(|s| Ok(s.to_string()));
-    let metrics = Scrape::parse(lines).map_err(|_| {
-        MetricsCollectError::RpcRequestFailure("prometheus scrape error".to_string())
-    })?;
-
-    // Get the peer *label* and connected state from the transport connection state metric
-    let connection_state_metric: Vec<_> = metrics
-        .samples
-        .iter()
-        .filter(|&sample| sample.metric == CONNECTION_STATE_METRIC)
-        .collect();
-
-    for sample in connection_state_metric {
-        let peer_label = sample.labels.get(PEER_ID).ok_or_else(|| {
-            MetricsCollectError::MetricParseFailure("Failed to get peer id label".to_string())
-        })?;
-        let connection_state = get_connection_state(sample)?;
-        res.push((peer_label.to_string(), connection_state));
-    }
-    Ok(res)
-}
-
-fn get_connection_state(sample: &Sample) -> Result<u64, MetricsCollectError> {
-    if let Value::Gauge(connection_state) = sample.value {
-        return Ok(connection_state as u64);
-    }
-    Err(MetricsCollectError::MetricParseFailure(
-        "Failed to find connection state".to_string(),
-    ))
-}
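As a worked example of the arithmetic in `aggregate()` (hypothetical peers and counts): with 8 samples, a peer observed connected in 6 of them has 75% up time.

```
use std::collections::HashMap;

fn main() {
    // 8 samples total; peer-b was in the connected state in 6 of them.
    let num_samples: u64 = 8;
    let connected_state_count = HashMap::from([("peer-a", 8u64), ("peer-b", 6u64)]);

    let up_time: HashMap<&str, f32> = connected_state_count
        .iter()
        .map(|(peer, connected)| (*peer, (*connected as f32) * 100.0 / (num_samples as f32)))
        .collect();

    assert_eq!(up_time["peer-a"], 100.0);
    assert_eq!(up_time["peer-b"], 75.0);
}
```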
diff --git a/rs/monitoring/onchain_observability/canister/BUILD.bazel b/rs/monitoring/onchain_observability/canister/BUILD.bazel
deleted file mode 100644
index d603a365441..00000000000
--- a/rs/monitoring/onchain_observability/canister/BUILD.bazel
+++ /dev/null
@@ -1,51 +0,0 @@
-load("@rules_rust//rust:defs.bzl", "rust_library")
-load("//bazel:canisters.bzl", "rust_canister")
-
-package(default_visibility = ["//visibility:public"])
-
-exports_files(["ic_certified_blockchain.did"])
-
-DEPENDENCIES = [
-    "@crate_index//:candid",
-    "@crate_index//:hex",
-    "@crate_index//:ic-certified-map",
-    "@crate_index//:ic-cdk",
-    "@crate_index//:ic-cdk-timers",
-    "@crate_index//:ic-stable-structures",
-    "@crate_index//:num",
-    "@crate_index//:num-traits",
-    "@crate_index//:prost",
-    "@crate_index//:serde",
-    "@crate_index//:serde_bytes",
-    "@crate_index//:serde_cbor",
-    "@crate_index//:sha2",
-    "@crate_index//:url",
-]
-
-MACRO_DEPENDENCIES = [
-    "@crate_index//:ic-cdk-macros",
-    "@crate_index//:num-derive",
-]
-
-ALIASES = {}
-
-rust_canister(
-    name = "ic-certified-blockchain",
-    srcs = ["src/main.rs"],
-    aliases = ALIASES,
-    proc_macro_deps = MACRO_DEPENDENCIES,
-    service_file = "ic_certified_blockchain.did",
-    deps = DEPENDENCIES + [":ic-certified-blockchain-lib"],
-)
-
-rust_library(
-    name = "ic-certified-blockchain-lib",
-    srcs = glob([
-        "src/*.rs",
-    ]),
-    aliases = ALIASES,
-    crate_name = "ic_certified_blockchain_lib",
-    proc_macro_deps = MACRO_DEPENDENCIES,
-    version = "0.1.0",
-    deps = DEPENDENCIES,
-)
diff --git a/rs/monitoring/onchain_observability/canister/Cargo.toml b/rs/monitoring/onchain_observability/canister/Cargo.toml
deleted file mode 100644
index 54bbd8c88d5..00000000000
--- a/rs/monitoring/onchain_observability/canister/Cargo.toml
+++ /dev/null
@@ -1,27 +0,0 @@
-[package]
-name = "ic-certified-blockchain-lib"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-candid = { workspace = true }
-hex = "0.4.3"
-ic-certified-map = "0.3.1"
-ic-cdk = { workspace = true }
-ic-cdk-macros = { workspace = true }
-ic-cdk-timers = { workspace = true }
-ic-stable-structures = { workspace = true }
-num = "0.4"
-num-traits = "0.2.12"
-num-derive = "0.3"
-prost = "0.11"
-serde = { version = "1.0.99", features = [ "derive" ] }
-serde_bytes = "0.11"
-serde_cbor = "0.11.2"
-sha2 = "^0.10.2"
-url = "2.1.1"
-
-[profile.release]
-debug = false
-lto = true
-opt-level = 'z'
diff --git a/rs/monitoring/onchain_observability/canister/README.md b/rs/monitoring/onchain_observability/canister/README.md
deleted file mode 100644
index e7baa536f07..00000000000
--- a/rs/monitoring/onchain_observability/canister/README.md
+++ /dev/null
@@ -1,99 +0,0 @@
-# A Certified Blockchain
-
-This canister holds a chain of blocks, each of which is certified by the IC root key and contains both a certification time and a hash of the previous blocks.
-
-The result is an irrefutable record independent of the controllers. The certified blockchain is public and available for remote backup. The canister can also be owned by a detached canister, e.g. https://github.com/ninegua/ic-blackhole, or a DAO to ensure availability.
-
-## Blockchain
-
-The blockchain is a sequence of blocks of the format:
-
-```
-type Block = record {
-  // Certificate is signed by the NNS root key and contains the root of tree.
-  certificate: blob;
-  // Under b"certified_blocks" is a map from i as u32 BE bytes to sha256(sha256(caller[i])sha256(data[i]))
-  // with an entry from "previous_hash" to previous_hash.
-  tree: blob;
-  // The raw data entries.
-  data: vec blob;
-  // Callers of prepare()/prepare_some() for corresponding "data".
-  callers: vec principal;
-  previous_hash: blob;
-};
-```
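For concreteness, a sketch of how a leaf hash under the scheme that comment describes could be computed, assuming the two inner digests are simply concatenated before the outer hash (an assumption for illustration, not code from the canister source; `sha2` and `hex` are both in the dependency list above):

```
use sha2::{Digest, Sha256};

// Leaf hash per the doc comment: sha256(sha256(caller[i]) ++ sha256(data[i])).
fn leaf_hash(caller: &[u8], data: &[u8]) -> [u8; 32] {
    let mut outer = Sha256::new();
    outer.update(Sha256::digest(caller));
    outer.update(Sha256::digest(data));
    outer.finalize().into()
}

fn main() {
    let h = leaf_hash(b"example-principal-bytes", b"example-entry");
    println!("{}", hex::encode(h));
}
```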
-
-The canister smart contract provides an API to store entries, find entries, and retrieve blocks:
-
-```
-type Auth = variant { User; Admin };
-type Authorization = record {
-  id: principal;
-  auth: Auth;
-};
-
-service blockchain: (opt text) -> {
-  // Stage a block, returning the certified data for informational purposes.
-  // Traps if some data is already staged.
-  prepare: (data: vec blob) -> (blob);
-  // Stage some (more) data into a block, returning the hash of the root of tree for informational purposes.
-  prepare_some: (data: vec blob) -> (blob);
-  // Get certificate for the certified data. Returns None if nothing is staged.
-  get_certificate: () -> (opt blob) query;
-  // Append the staged data with certificate and tree. Traps if the certificate is stale.
-  // Returns None if there is nothing staged.
-  commit: (certificate: blob) -> (opt nat64);
-  // Get a certified block.
-  get_block: (index: nat64) -> (Block) query;
-  // Find block index with matching block hash or latest matching data entry hash.
-  find: (hash: blob) -> (opt nat64) query;
-  // Return the index of the first block stored.
-  first: () -> (nat64) query;
-  // Return the index of the start of the primary part (length of log - first() - secondary.len()).
-  mid: () -> (nat64) query;
-  // Return the index of the next block to be stored (the length of the log - first()).
-  next: () -> (nat64) query;
-  // Return a hex string representing the hash of the last block or 0.
-  last_hash: () -> (text) query;
-  // Rotate the log by making the primary part secondary and deleting the old secondary and making it primary.
-  // Returns the new first index stored if the secondary part had anything to delete.
-  rotate: () -> (opt nat64);
-  // Manage the set of Principals allowed to prepare and commit (User) or authorize (Admin).
-  authorize: (principal, Auth) -> ();
-  deauthorize: (principal) -> ();
-  get_authorized: () -> (vec Authorization) query;
-}
-```
-
-## Certification
-
-The certificate contains an NNS-signed delegation for the canister to the subnet, which certifies the canister root hash along with the date. The canister root hash is the root of the Merkle tree containing the hashes of all the block entries. This enables each entry to be independently certified by extracting the corresponding path from the tree. Code to verify blocks is found in the `./verify` directory.
-
-Additional verifications, e.g. of the signature of the appender, should be performed at the application level.
-
-## Storing Blocks
-
-A block is an array of byte arrays (entries). First the block is staged by calling `prepare()`, which returns the tree root hash (for reference). Then the certificate is retrieved via `get_certificate()`, and finally the block is appended by calling `commit()` with the certificate. Code to upload blocks is found in the `./store` directory.
-
-## Blockchain Persistence
-
-The canister smart contract stores all persistent data in stable memory. There is no provision for deleting or rewriting blocks short of reinstalling or deleting the canister. However, because the blocks are certified, they can be backed up remotely and validated offline. The blocks can even be transferred to a different canister smart contract by re-storing the blocks and substituting the original certificate during the `commit()` phase.
-
-## Usage
-
-### Single Writer
-
-A single writer should call `prepare()`, then `get_certificate()`, then `commit()`. An error in `prepare()` means that there is already a prepared block, which needs `get_certificate()` then `commit()`. An error in `get_certificate()` or `commit()` means that there is no prepared block or that the certificate is stale. The client should use `get_block()` to determine whether the data has already been written and retry if not. A minimal sketch of this flow appears after this section.
-
-### Multiple Writers
-
-Multiple writers can either use the single-writer workflow, or they can all call `prepare_some()` and then `get_certificate()` followed by `commit()`, recognizing that the `get_certificate()`/`commit()` sequence might fail if there is a race. Use of `prepare_some()` may result in higher throughput. Clients may defer or retry the commit sequence until `get_certificate()` returns None. Note that there is no provision in this code for DOS prevention, although the callers of `prepare_some()` are recorded, which may be of some use.
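A hedged sketch of the single-writer flow against a hypothetical client abstraction; the method names mirror the Candid interface above, while transport, Candid encoding, and error details are elided:

```
trait BlockchainCanister {
    fn prepare(&self, data: Vec<Vec<u8>>) -> Result<Vec<u8>, String>; // returns certified data
    fn get_certificate(&self) -> Option<Vec<u8>>;
    fn commit(&self, certificate: Vec<u8>) -> Option<u64>; // returns the block index
}

fn write_block<C: BlockchainCanister>(canister: &C, data: Vec<Vec<u8>>) -> Result<u64, String> {
    // An error here means a block is already staged; fall through and try to commit it.
    let _ = canister.prepare(data);
    let certificate = canister
        .get_certificate()
        .ok_or("nothing staged or certificate unavailable")?;
    canister
        .commit(certificate)
        .ok_or_else(|| "stale certificate or nothing staged; check get_block() and retry".to_string())
}
```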
-
-### Log Rotation
-
-In some use cases it may be desirable to back up and remove old blocks from the canister smart contract. Since the committed log entries are individually certified, they can be verified independently of the smart contract, so the backup can be used as a primary source. Safe backup and clearing of old log entries is done via a process of log rotation. Internally, the blockchain log is broken up into a primary part and a secondary part. Periodically, a backup agent should `get_block()` all blocks between `first()` and `mid()` (the first index beyond the secondary part) and then call `rotate()`, which makes the primary part secondary, deletes the data in the old secondary, and makes it primary. Note that log indexes are preserved (do not change) over time and that `find()` continues to work for entries in both the primary and secondary parts of the log. A sketch of this loop appears below, after the configuration files.
-
-### Dependencies
-
-* rustup, cargo, rustc with wasm
-* hash\_tree.rs is copied from github.com/dfinity/agent-rs/src/hash\_tree/mod.rs
diff --git a/rs/monitoring/onchain_observability/canister/canister_ids.json b/rs/monitoring/onchain_observability/canister/canister_ids.json
deleted file mode 100644
index 25856ce3918..00000000000
--- a/rs/monitoring/onchain_observability/canister/canister_ids.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "ic-certified-blockchain": {
-    "ic": "3kvk3-xyaaa-aaaae-qaesq-cai"
-  }
-}
diff --git a/rs/monitoring/onchain_observability/canister/dfx.json b/rs/monitoring/onchain_observability/canister/dfx.json
deleted file mode 100644
index 9b6f7038391..00000000000
--- a/rs/monitoring/onchain_observability/canister/dfx.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "canisters": {
-    "ic-certified-blockchain": {
-      "candid": "ic_certified_blockchain.did",
-      "package": "ic-certified-blockchain",
-      "type": "rust"
-    }
-  },
-  "networks": {
-    "local": {
-      "bind": "127.0.0.1:8080",
-      "replica": {
-        "subnet_type": "system"
-      }
-    },
-    "staging": {
-      "providers": [ "https://ic0.app" ],
-      "type": "persistent"
-    }
-  },
-  "version": 1
-}
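The backup-and-rotate loop from the Log Rotation section, sketched against the same hypothetical client abstraction (method names mirror the canister interface; the storage sink is a stand-in):

```
trait BlockchainReader {
    fn first(&self) -> u64;
    fn mid(&self) -> u64;
    fn get_block(&self, index: u64) -> Vec<u8>; // serialized Block, for backup
    fn rotate(&self) -> Option<u64>;
}

fn backup_and_rotate<C: BlockchainReader>(canister: &C, store: &mut Vec<(u64, Vec<u8>)>) {
    // Back up every block that the next rotation would delete.
    for index in canister.first()..canister.mid() {
        store.push((index, canister.get_block(index)));
    }
    // Rotation preserves indexes; it only frees the old secondary part.
    if let Some(new_first) = canister.rotate() {
        println!("rotation freed blocks below index {new_first}");
    }
}
```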
diff --git a/rs/monitoring/onchain_observability/canister/ic_certified_blockchain.did b/rs/monitoring/onchain_observability/canister/ic_certified_blockchain.did
deleted file mode 100644
index 9d01576a3fc..00000000000
--- a/rs/monitoring/onchain_observability/canister/ic_certified_blockchain.did
+++ /dev/null
@@ -1,50 +0,0 @@
-type Block = record {
-  // Certificate is signed by the NNS root key and contains the root of tree.
-  certificate: blob;
-  // Under b"certified_blocks" is a map from i as u32 BE bytes to sha256(sha256(caller[i])sha256(data[i]))
-  // with an entry from "previous_hash" to previous_hash.
-  tree: blob;
-  // The raw data entries.
-  data: vec blob;
-  // Callers of prepare()/prepare_some() for corresponding "data".
-  callers: vec principal;
-  previous_hash: blob;
-};
-
-type Auth = variant { User; Admin };
-type Authorization = record {
-  id: principal;
-  auth: Auth;
-};
-
-service blockchain: (opt text) -> {
-  // Stage a block, returning the certified data for informational purposes.
-  // Traps if some data is already staged.
-  prepare: (data: vec blob) -> (blob);
-  // Stage some (more) data into a block, returning the hash of the root of tree for informational purposes.
-  prepare_some: (data: vec blob) -> (blob);
-  // Get certificate for the certified data. Returns None if nothing is staged.
-  get_certificate: () -> (opt blob) query;
-  // Append the staged data with certificate and tree. Traps if the certificate is stale.
-  // Returns None if there is nothing staged.
-  commit: (certificate: blob) -> (opt nat64);
-  // Get a certified block.
-  get_block: (index: nat64) -> (Block) query;
-  // Find block index with matching block hash or latest matching data entry hash.
-  find: (hash: blob) -> (opt nat64) query;
-  // Return the index of the first block stored.
-  first: () -> (nat64) query;
-  // Return the index of the start of the primary part (length of log - first() - secondary.len()).
-  mid: () -> (nat64) query;
-  // Return the index of the next block to be stored (the length of the log - first()).
-  next: () -> (nat64) query;
-  // Return a hex string representing the hash of the last block or 0.
-  last_hash: () -> (text) query;
-  // Rotate the log by making the primary part secondary and deleting the old secondary and making it primary.
-  // Returns the new first index stored if the secondary part had anything to delete.
-  rotate: () -> (opt nat64);
-  // Manage the set of Principals allowed to prepare and commit (User) or authorize (Admin).
-  authorize: (principal, Auth) -> ();
-  deauthorize: (principal) -> ();
-  get_authorized: () -> (vec Authorization) query;
-}
diff --git a/rs/monitoring/onchain_observability/canister/src/block.rs b/rs/monitoring/onchain_observability/canister/src/block.rs
deleted file mode 100644
index 96198448fde..00000000000
--- a/rs/monitoring/onchain_observability/canister/src/block.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use candid::{CandidType, Deserialize, Principal};
-use ic_certified_map::Hash;
-
-pub type Blob = Vec<u8>;
-pub type Data = Vec<Blob>;
-pub type Callers = Vec<Principal>;
-
-#[derive(Clone, Debug, CandidType, Deserialize)]
-pub struct Block {
-    pub certificate: Blob,
-    pub tree: Blob,
-    pub data: Data,
-    pub callers: Callers,
-    pub previous_hash: Hash,
-}
diff --git a/rs/monitoring/onchain_observability/canister/src/hash_tree.rs b/rs/monitoring/onchain_observability/canister/src/hash_tree.rs
deleted file mode 100644
index 2790e18c47a..00000000000
--- a/rs/monitoring/onchain_observability/canister/src/hash_tree.rs
+++ /dev/null
@@ -1,531 +0,0 @@
-use sha2::Digest;
-use std::borrow::Cow;
-
-/// Sha256 Digest: 32 bytes
-pub type Sha256Digest = [u8; 32];
-
-#[derive(Clone, Hash, Ord, PartialOrd, Eq, PartialEq, serde::Deserialize)]
-#[serde(from = "&serde_bytes::Bytes")]
-#[serde(into = "&serde_bytes::ByteBuf")]
-/// For labeled [HashTreeNode]
-pub struct Label(Vec<u8>);
-
-impl Label {
-    /// Returns this label as bytes.
-    pub fn as_bytes(&self) -> &[u8] {
-        self.0.as_ref()
-    }
-}
-
-impl From