From 0ad47cff113a1b6bc5941aad995d51d4052cf1db Mon Sep 17 00:00:00 2001 From: Anton Yemelyanov Date: Thu, 1 Aug 2024 02:24:49 +0300 Subject: [PATCH] fix tls:any lookup issues --- Cargo.lock | 2 +- Cargo.toml | 4 +- src/args.rs | 5 +++ src/connection.rs | 61 +++++++++++++++++++++--------- src/imports.rs | 3 +- src/monitor.rs | 95 ++++++++++++++++++++++++++++++++++++++--------- src/params.rs | 62 +++++++++++++++++++++++++++---- src/status.rs | 1 - src/transport.rs | 32 +++++----------- 9 files changed, 195 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c90893..befc211 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2314,7 +2314,7 @@ dependencies = [ [[package]] name = "kaspa-resolver" -version = "0.3.0" +version = "0.4.0" dependencies = [ "ahash", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index 78abb7b..3705ce6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kaspa-resolver" -description = "Kaspa wRPC endpoint resolver and monitor" -version = "0.3.0" +description = "Kaspa RPC endpoint resolver" +version = "0.4.0" edition = "2021" # authors.workspace = true # include.workspace = true diff --git a/src/args.rs b/src/args.rs index 81541c1..05b7687 100644 --- a/src/args.rs +++ b/src/args.rs @@ -24,6 +24,8 @@ pub struct Args { pub verbose: bool, /// Tracing mode pub trace: bool, + /// Debug mode + pub debug: bool, /// Auto-update pub auto_update: bool, /// Custom config file @@ -48,6 +50,7 @@ impl Args { .arg(arg!(--version "Display software version")) .arg(arg!(--verbose "Enable verbose logging")) .arg(arg!(--trace "Enable trace log level")) + .arg(arg!(--debug "Enable additional debug output")) // .arg(arg!(--auto-update "Poll configuration updates")) // .arg(arg!(--election "Show node data on each election")) // .arg(arg!(--status "Enable `/status` endpoint")) @@ -93,6 +96,7 @@ impl Args { let trace = matches.get_one::("trace").cloned().unwrap_or(false); let verbose = matches.get_one::("verbose").cloned().unwrap_or(false); + let debug = matches.get_one::("debug").cloned().unwrap_or(false); let auto_update = matches .get_one::("auto-update") .cloned() @@ -166,6 +170,7 @@ impl Args { Args { trace, verbose, + debug, auto_update, user_config, // election, diff --git a/src/connection.rs b/src/connection.rs index 2d25785..868e755 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -19,7 +19,6 @@ impl fmt::Display for Connection { #[derive(Debug)] pub struct Connection { args: Arc, - // caps: Arc>>, caps: ArcSwapOption, is_synced: AtomicBool, clients: AtomicU64, @@ -79,57 +78,72 @@ impl Connection { self.args.verbose } + /// Represents the connection score, which is currently + /// the number of sockets (clients + peers) the node has. #[inline] - pub fn score(&self) -> u64 { - self.clients.load(Ordering::Relaxed) + pub fn score(self: &Arc) -> u64 { + self.delegate().sockets() } + /// Connection availability state. #[inline] - pub fn is_available(&self) -> bool { - self.is_delegate() - && self.online() - && self.caps.load().as_ref().as_ref().is_some_and(|caps| { - let clients = self.clients(); - let peers = self.peers(); + pub fn is_available(self: &Arc) -> bool { + let delegate = self.delegate(); + + self.is_connected() + && delegate.is_online() + && delegate.caps.load().as_ref().as_ref().is_some_and(|caps| { + let clients = delegate.clients(); + let peers = delegate.peers(); clients < caps.clients_limit && clients + peers < caps.fd_limit }) } + /// Indicates if the connection RPC is connected. #[inline] - pub fn connected(&self) -> bool { + pub fn is_connected(&self) -> bool { self.is_connected.load(Ordering::Relaxed) } + /// Indicates if the connection is available as a general + /// concept: no errors have occurred during RPC calls + /// and the node is in synced synced. #[inline] - pub fn online(&self) -> bool { + pub fn is_online(&self) -> bool { self.is_online.load(Ordering::Relaxed) } + /// Indicates if the node is in synced state. #[inline] pub fn is_synced(&self) -> bool { self.is_synced.load(Ordering::Relaxed) } + /// Number of RPC clients connected to the node. #[inline] pub fn clients(&self) -> u64 { self.clients.load(Ordering::Relaxed) } + /// Number of p2p peers connected to the node. #[inline] pub fn peers(&self) -> u64 { self.peers.load(Ordering::Relaxed) } + /// Total number of TCP sockets connected to the node. #[inline] pub fn sockets(&self) -> u64 { self.clients() + self.peers() } + /// Node capabilities (partial system spec, see [`Caps`]) #[inline] pub fn caps(&self) -> Option> { self.caps.load().clone() } + /// Unique system (machine) identifier of the node. #[inline] pub fn system_id(&self) -> u64 { self.caps @@ -139,26 +153,37 @@ impl Connection { .unwrap_or_default() } + /// Connection address (URL). #[inline] pub fn address(&self) -> &str { self.node.address.as_str() } + /// Node configuration parameters used to create this connection. #[inline] pub fn node(&self) -> &Arc { &self.node } + /// Connection parameters used to create this connection. + #[inline] + pub fn params(&self) -> PathParams { + self.params + } + + /// Network id of the node. #[inline] pub fn network_id(&self) -> NetworkId { self.node.network } + /// Indicates if the connection is a delegate. #[inline] pub fn is_delegate(&self) -> bool { self.delegate.load().is_none() } + /// Get the delegate of this connection. #[inline] pub fn delegate(self: &Arc) -> Arc { match (**self.delegate.load()).clone() { @@ -167,12 +192,17 @@ impl Connection { } } + /// Associate a delegate to this connection. A delegate is a primary + /// connection to the node that does actual performance monitoring + /// while non-delegate connections remain idle in a keep-alive state. #[inline] pub fn bind_delegate(&self, delegate: Option>) { self.delegate.store(Arc::new(delegate)); } - pub fn resolve_delegates(self: &Arc) -> Vec> { + /// Creates a list of delegators for this connection, where the last + /// entry is the delegate. + pub fn resolve_delegators(self: &Arc) -> Vec> { let mut delegates = Vec::new(); let mut delegate = (*self).clone(); while let Some(next) = (**delegate.delegate.load()).clone() { @@ -183,7 +213,7 @@ impl Connection { } pub fn status(&self) -> &'static str { - if self.connected() { + if self.is_connected() { if !self.is_delegate() { "delegator" } else if self.is_synced() { @@ -207,14 +237,11 @@ impl Connection { let shutdown_ctl_receiver = self.shutdown_ctl.request.receiver.clone(); let shutdown_ctl_sender = self.shutdown_ctl.response.sender.clone(); - // let mut ttl = sleep(TtlSettings::period()); let mut ttl = TtlSettings::ttl(); // TODO - delegate state changes inside `update_state()`! let mut poll = if self.is_delegate() { - // workflow_core::task:: interval(SyncSettings::poll()) } else { - // workflow_core::task:: interval(SyncSettings::ping()) }; @@ -474,7 +501,7 @@ impl<'a> From<&'a Arc> for Status<'a> { .unwrap_or_else(|| ("n/a".to_string(), 0, 0, 0, 0)); let delegates = connection - .resolve_delegates() + .resolve_delegators() .iter() .map(|connection| format!("[{:016x}] {}", connection.system_id(), connection.address())) .collect::>(); diff --git a/src/imports.rs b/src/imports.rs index 505b8b1..70e2cec 100644 --- a/src/imports.rs +++ b/src/imports.rs @@ -34,8 +34,7 @@ pub use arc_swap::{ArcSwap, ArcSwapOption}; pub use cliclack::{log, outro}; pub use enum_dispatch::enum_dispatch; pub use futures::{select, FutureExt, StreamExt}; -pub use kaspa_consensus_core::network::NetworkId; -// pub use kaspa_rpc_core::api::rpc::RpcApi; +pub use kaspa_consensus_core::network::{NetworkId, NetworkType}; pub use kaspa_utils::hex::*; pub use rand::Rng; pub use serde::{de::DeserializeOwned, Deserialize, Serialize}; diff --git a/src/monitor.rs b/src/monitor.rs index 6d69fe0..8462c01 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -24,7 +24,7 @@ impl fmt::Debug for Monitor { impl Monitor { pub fn new(args: &Arc, service: Service) -> Self { - let sorts = PathParams::iter() + let sorts = PathParams::iter_tls_any() .map(|params| (params, AtomicBool::new(false))) .collect(); @@ -52,12 +52,9 @@ impl Monitor { } pub fn to_vec(&self) -> Vec> { - self.connections - .read() - .unwrap() - .values() + PathParams::iter_tls_strict() + .filter_map(|params| self.connections.read().unwrap().get(¶ms).cloned()) .flatten() - .cloned() .collect() } @@ -78,7 +75,10 @@ impl Monitor { let mut connections = self.connections(); - for params in PathParams::iter() { + let mut tls_any_created = Vec::new(); + let mut tls_any_removed = Vec::new(); + + for params in PathParams::iter_tls_strict() { let nodes = nodes .iter() .filter(|node| node.params() == ¶ms) @@ -105,22 +105,45 @@ impl Monitor { &self.args, )?); created.start()?; - list.push(created); + list.push(created.clone()); + tls_any_created.push(created); } for removed in remove { removed.stop().await?; list.retain(|c| c.node() != removed.node()); + tls_any_removed.push(removed); } } - let targets = AHashMap::group_from(connections.values().flatten().map(|c| { - ( - c.node().network_node_uid(), - c.node().transport_kind(), - c.clone(), - ) - })); + // remove connections from TlsAny list + tls_any_removed.into_iter().for_each(|connection| { + let params = connection.params().to_tls(TlsKind::Any); + let list = connections.entry(params).or_default(); + list.retain(|connection| connection.node() != connection.node()); + }); + + // create connections in TlsAny list + tls_any_created.into_iter().for_each(|connection| { + let params = connection.params().to_tls(TlsKind::Any); + let list = connections.entry(params).or_default(); + list.push(connection); + }); + + // collect all strict Tls connections and group them by network_uid (fqdn+network+tls) + let targets = AHashMap::group_from( + connections + .iter() + .filter_map(|(params, list)| params.is_tls_strict().then_some(list)) + .flatten() + .map(|connection| { + ( + connection.node().network_node_uid(), + connection.node().transport_kind(), + connection.clone(), + ) + }), + ); for (_network_uid, transport_map) in targets.iter() { if let Some(wrpc_borsh) = transport_map.get(&TransportKind::WrpcBorsh) { @@ -132,6 +155,23 @@ impl Monitor { } } + if self.args.debug { + for params in PathParams::iter_tls_any() { + println!("{}:{}", self.service, params); + if let Some(connections) = connections.get(¶ms) { + if connections.is_empty() { + println!("\t- None (0)"); + } else { + for connection in connections { + println!("\t- {}", connection); + } + } + } else { + println!("\t- N/A"); + } + } + } + *self.connections.write().unwrap() = connections; Ok(()) @@ -191,16 +231,37 @@ impl Monitor { Ok(()) } - pub fn schedule_sort(&self, params: &PathParams) { + pub fn schedule_sort(&self, params_tls_kind: &PathParams) { self.sorts - .get(params) + .get(params_tls_kind) + .unwrap() + .store(true, Ordering::Relaxed); + + let params_tls_any = params_tls_kind.to_tls(TlsKind::Any); + self.sorts + .get(¶ms_tls_any) .unwrap() .store(true, Ordering::Relaxed); } // /// Get JSON string representing node information (id, url, provider, link) pub fn election(&self, params: &PathParams) -> Option { + if self.verbose() { + println!("election for: {}", params); + } + let connections = self.connections.read().unwrap(); + + if self.verbose() { + if let Some(connections) = connections.get(params) { + for connection in connections { + println!("\t- {}", connection); + } + } else { + println!("\t- N/A"); + } + } + let connections = connections .get(params)? .iter() diff --git a/src/params.rs b/src/params.rs index 9f99bbd..3743ccf 100644 --- a/src/params.rs +++ b/src/params.rs @@ -1,10 +1,23 @@ use crate::imports::*; +pub static NETWORKS: &[NetworkId] = &[ + NetworkId::new(NetworkType::Mainnet), + NetworkId::with_suffix(NetworkType::Testnet, 10), + NetworkId::with_suffix(NetworkType::Testnet, 11), + // NetworkId::new(NetworkType::Devnet), + // NetworkId::new(NetworkType::Simnet), +]; + +pub static TRANSPORTS: &[TransportKind] = &[ + TransportKind::WrpcBorsh, + TransportKind::WrpcJson, + // TransportKind::Grpc, +]; + #[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, Hash)] pub struct PathParams { pub protocol: ProtocolKind, pub encoding: EncodingKind, - // pub transport_kind: TransportKind, pub network: NetworkId, pub tls: TlsKind, } @@ -22,14 +35,33 @@ impl PathParams { } } - pub fn iter() -> impl Iterator { - NetworkId::iter().flat_map(move |network_id| { - TransportKind::iter() + // iterates only TlsKind::Tls and TlsKind::None variants + pub fn iter_tls_strict() -> impl Iterator { + NETWORKS.iter().flat_map(move |network_id| { + TRANSPORTS + .iter() + .map(move |transport_kind| { + PathParams::new(*transport_kind, TlsKind::Tls, *network_id) + }) + .chain(TRANSPORTS.iter().map(move |transport_kind| { + PathParams::new(*transport_kind, TlsKind::None, *network_id) + })) + }) + } + + // iterates TlsKind::Tls, TlsKind::None, and TlsKind::Any variants + pub fn iter_tls_any() -> impl Iterator { + NETWORKS.iter().flat_map(move |network_id| { + TRANSPORTS + .iter() .map(move |transport_kind| { - PathParams::new(*transport_kind, TlsKind::Tls, network_id) + PathParams::new(*transport_kind, TlsKind::Tls, *network_id) }) - .chain(TransportKind::iter().map(move |transport_kind| { - PathParams::new(*transport_kind, TlsKind::None, network_id) + .chain(TRANSPORTS.iter().map(move |transport_kind| { + PathParams::new(*transport_kind, TlsKind::None, *network_id) + })) + .chain(TRANSPORTS.iter().map(move |transport_kind| { + PathParams::new(*transport_kind, TlsKind::Any, *network_id) })) }) } @@ -48,11 +80,25 @@ impl PathParams { pub fn tls(&self) -> TlsKind { self.tls } + + #[inline] + pub fn to_tls(self, tls: TlsKind) -> Self { + Self { tls, ..self } + } + + #[inline] + pub fn is_tls_strict(&self) -> bool { + matches!(self.tls, TlsKind::Tls | TlsKind::None) + } } impl fmt::Display for PathParams { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{}:{}", self.protocol, self.encoding, self.network) + write!( + f, + "{}:{}:{}:{}", + self.tls, self.protocol, self.encoding, self.network + ) } } diff --git a/src/status.rs b/src/status.rs index ac61635..ef3639e 100644 --- a/src/status.rs +++ b/src/status.rs @@ -149,7 +149,6 @@ pub async fn json_handler(resolver: &Arc, req: Request) -> impl pub async fn status_handler(resolver: &Arc, req: RequestKind) -> impl IntoResponse { let ctx = resolve_session(resolver, &req); - println!("ctx: {:?}", ctx); match ctx { Ok((Some(session), cookie)) => { session.touch(); diff --git a/src/transport.rs b/src/transport.rs index a99ff53..d036ca2 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,6 +1,5 @@ use crate::imports::*; -// #[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, Hash)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "lowercase")] pub enum TlsKind { @@ -9,19 +8,16 @@ pub enum TlsKind { Any, } -// impl PartialEq for TlsKind { -// fn eq(&self, other: &Self) -> bool { -// matches!( -// (self, other), -// (TlsKind::Tls, TlsKind::Tls) -// | (TlsKind::None, TlsKind::None) -// | (TlsKind::Any, TlsKind::Tls) -// | (TlsKind::Any, TlsKind::None) -// | (TlsKind::Tls, TlsKind::Any) -// | (TlsKind::None, TlsKind::Any) -// ) -// } -// } +impl Display for TlsKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let s = match self { + TlsKind::Tls => "tls", + TlsKind::None => "none", + TlsKind::Any => "any", + }; + f.write_str(s) + } +} impl From for TlsKind { fn from(b: bool) -> Self { @@ -95,14 +91,6 @@ impl Display for TransportKind { } impl TransportKind { - // pub fn state_aggregator(&self) -> bool { - // match self { - // TransportKind::WrpcBorsh => true, - // TransportKind::WrpcJson => false, - // TransportKind::Grpc => false, - // } - // } - pub fn protocol(&self) -> ProtocolKind { match self { TransportKind::WrpcBorsh => ProtocolKind::Wrpc,