Skip to content

Commit

Permalink
fix node version update on reconnect + improve connection balancing a…
Browse files Browse the repository at this point in the history
…cross non-delegate connections
  • Loading branch information
aspect committed Sep 11, 2024
1 parent a3864fc commit adc9044
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kaspa-resolver"
description = "Kaspa RPC endpoint resolver"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
# authors.workspace = true
# include.workspace = true
Expand Down
42 changes: 30 additions & 12 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::imports::*;

#[allow(dead_code)]
pub const BIAS_SCALE: u64 = 1_000_000;

impl fmt::Display for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let load = self
.load()
.map(|load| format!("{:1.2}%", load))
.unwrap_or_else(|| "n/a ".to_string());
write!(
f,
"[{:016x}:{:016x}] [{:>4}] {}",
"[{:016x}:{:016x}] [{:>4}] [{:>7}] {}",
self.system_id(),
self.node.uid(),
self.sockets(),
self.clients(),
load,
self.node.address
)
}
Expand Down Expand Up @@ -137,6 +139,14 @@ impl Connection {
self.clients() + self.peers()
}

/// Connection load as a ratio of clients to capacity.
pub fn load(&self) -> Option<f64> {
self.caps
.load()
.as_ref()
.map(|caps| self.clients() as f64 / caps.capacity as f64)
}

/// Node capabilities (partial system spec, see [`Caps`])
#[inline]
pub fn caps(&self) -> Option<Arc<Caps>> {
Expand Down Expand Up @@ -303,6 +313,9 @@ impl Connection {
log_success!("Connected","{}",self.node.address);
}
self.is_connected.store(true, Ordering::Relaxed);
// trigger caps reset
self.caps.store(None);
// update state
if self.update_state().await.is_ok() {
self.is_online.store(true, Ordering::Relaxed);
self.update();
Expand Down Expand Up @@ -366,15 +379,20 @@ impl Connection {
}

if self.caps().is_none() {
let last_system_id = self.caps().as_ref().map(|caps| caps.system_id());
let caps = self.client.get_caps().await?;
let delegate_key = Delegate::new(caps.system_id(), self.network_id());
let system_id = caps.system_id();
self.caps.store(Some(Arc::new(caps)));
let mut delegates = self.monitor.delegates().write().unwrap();
if let Some(delegate) = delegates.get(&delegate_key) {
self.bind_delegate(Some(delegate.clone()));
} else {
delegates.insert(delegate_key, self.clone());
self.bind_delegate(None);

if last_system_id != Some(system_id) {
let delegate_key = Delegate::new(system_id, self.network_id());
let mut delegates = self.monitor.delegates().write().unwrap();
if let Some(delegate) = delegates.get(&delegate_key) {
self.bind_delegate(Some(delegate.clone()));
} else {
delegates.insert(delegate_key, self.clone());
self.bind_delegate(None);
}
}
}

Expand Down
25 changes: 21 additions & 4 deletions src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,22 @@ impl Monitor {

let connections = self.connections.read().unwrap();

const DELEGATES_ONLY: bool = true;

if self.verbose() {
if let Some(connections) = connections.get(params) {
for connection in connections {
println!("\t- {}", connection);
}
connections
.iter()
.filter(|connection| {
if DELEGATES_ONLY {
connection.is_delegate()
} else {
true
}
})
.for_each(|connection| {
println!("\t- {}", connection);
});
} else {
println!("\t- N/A");
}
Expand All @@ -265,7 +276,13 @@ impl Monitor {
let connections = connections
.get(params)?
.iter()
.filter(|connection| connection.is_available())
.filter(|connection| {
if DELEGATES_ONLY {
connection.is_delegate() && connection.is_available()
} else {
connection.is_available()
}
})
.collect::<Vec<_>>();

if !connections.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/kaspa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl rpc::ClientT for Client {
let system_id = system_id
.and_then(|v| v[0..8].try_into().ok().map(u64::from_be_bytes))
.unwrap_or_default();
let capacity = fd_limit_actual.min(clients_limit);
// let system_id_hex_string = format!("{:016x}", system_id);
let git_hash = git_hash.as_ref().map(ToHex::to_hex);
Ok(Caps {
Expand All @@ -92,6 +93,7 @@ impl rpc::ClientT for Client {
cpu_physical_cores,
fd_limit: fd_limit_actual,
clients_limit,
capacity,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct Caps {
pub fd_limit: u64,
// number of available clients
pub clients_limit: u64,
// client capacity: min(fd_limit, clients_limit)
pub capacity: u64,
}

impl Caps {
Expand Down

0 comments on commit adc9044

Please sign in to comment.