From adc9044c4289d95952cad03e445c39554c66dd6b Mon Sep 17 00:00:00 2001 From: Anton Yemelyanov Date: Thu, 12 Sep 2024 02:39:32 +0300 Subject: [PATCH] fix node version update on reconnect + improve connection balancing across non-delegate connections --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/connection.rs | 42 ++++++++++++++++++++++++++++++------------ src/monitor.rs | 25 +++++++++++++++++++++---- src/rpc/kaspa.rs | 2 ++ src/rpc/mod.rs | 2 ++ 6 files changed, 57 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a29758..b6e90aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2268,7 +2268,7 @@ dependencies = [ [[package]] name = "kaspa-resolver" -version = "0.6.0" +version = "0.7.0" dependencies = [ "ahash", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index bda0e07..d8e95dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kaspa-resolver" description = "Kaspa RPC endpoint resolver" -version = "0.6.0" +version = "0.7.0" edition = "2021" # authors.workspace = true # include.workspace = true diff --git a/src/connection.rs b/src/connection.rs index 868e755..d80d10d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,16 +1,18 @@ use crate::imports::*; -#[allow(dead_code)] -pub const BIAS_SCALE: u64 = 1_000_000; - impl fmt::Display for Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let load = self + .load() + .map(|load| format!("{:1.2}%", load)) + .unwrap_or_else(|| "n/a ".to_string()); write!( f, - "[{:016x}:{:016x}] [{:>4}] {}", + "[{:016x}:{:016x}] [{:>4}] [{:>7}] {}", self.system_id(), self.node.uid(), - self.sockets(), + self.clients(), + load, self.node.address ) } @@ -137,6 +139,14 @@ impl Connection { self.clients() + self.peers() } + /// Connection load as a ratio of clients to capacity. + pub fn load(&self) -> Option { + self.caps + .load() + .as_ref() + .map(|caps| self.clients() as f64 / caps.capacity as f64) + } + /// Node capabilities (partial system spec, see [`Caps`]) #[inline] pub fn caps(&self) -> Option> { @@ -303,6 +313,9 @@ impl Connection { log_success!("Connected","{}",self.node.address); } self.is_connected.store(true, Ordering::Relaxed); + // trigger caps reset + self.caps.store(None); + // update state if self.update_state().await.is_ok() { self.is_online.store(true, Ordering::Relaxed); self.update(); @@ -366,15 +379,20 @@ impl Connection { } if self.caps().is_none() { + let last_system_id = self.caps().as_ref().map(|caps| caps.system_id()); let caps = self.client.get_caps().await?; - let delegate_key = Delegate::new(caps.system_id(), self.network_id()); + let system_id = caps.system_id(); self.caps.store(Some(Arc::new(caps))); - let mut delegates = self.monitor.delegates().write().unwrap(); - if let Some(delegate) = delegates.get(&delegate_key) { - self.bind_delegate(Some(delegate.clone())); - } else { - delegates.insert(delegate_key, self.clone()); - self.bind_delegate(None); + + if last_system_id != Some(system_id) { + let delegate_key = Delegate::new(system_id, self.network_id()); + let mut delegates = self.monitor.delegates().write().unwrap(); + if let Some(delegate) = delegates.get(&delegate_key) { + self.bind_delegate(Some(delegate.clone())); + } else { + delegates.insert(delegate_key, self.clone()); + self.bind_delegate(None); + } } } diff --git a/src/monitor.rs b/src/monitor.rs index 8462c01..d47a08c 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -252,11 +252,22 @@ impl Monitor { let connections = self.connections.read().unwrap(); + const DELEGATES_ONLY: bool = true; + if self.verbose() { if let Some(connections) = connections.get(params) { - for connection in connections { - println!("\t- {}", connection); - } + connections + .iter() + .filter(|connection| { + if DELEGATES_ONLY { + connection.is_delegate() + } else { + true + } + }) + .for_each(|connection| { + println!("\t- {}", connection); + }); } else { println!("\t- N/A"); } @@ -265,7 +276,13 @@ impl Monitor { let connections = connections .get(params)? .iter() - .filter(|connection| connection.is_available()) + .filter(|connection| { + if DELEGATES_ONLY { + connection.is_delegate() && connection.is_available() + } else { + connection.is_available() + } + }) .collect::>(); if !connections.is_empty() { diff --git a/src/rpc/kaspa.rs b/src/rpc/kaspa.rs index b60d50d..b591f5c 100644 --- a/src/rpc/kaspa.rs +++ b/src/rpc/kaspa.rs @@ -82,6 +82,7 @@ impl rpc::ClientT for Client { let system_id = system_id .and_then(|v| v[0..8].try_into().ok().map(u64::from_be_bytes)) .unwrap_or_default(); + let capacity = fd_limit_actual.min(clients_limit); // let system_id_hex_string = format!("{:016x}", system_id); let git_hash = git_hash.as_ref().map(ToHex::to_hex); Ok(Caps { @@ -92,6 +93,7 @@ impl rpc::ClientT for Client { cpu_physical_cores, fd_limit: fd_limit_actual, clients_limit, + capacity, }) } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 8b43fa4..979f144 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -22,6 +22,8 @@ pub struct Caps { pub fd_limit: u64, // number of available clients pub clients_limit: u64, + // client capacity: min(fd_limit, clients_limit) + pub capacity: u64, } impl Caps {