diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 11ba2e261..8543541f2 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -159,12 +159,12 @@ impl KaspaCli { self.wallet.is_connected() } - pub fn rpc(&self) -> Arc { - self.wallet.rpc().clone() + pub fn rpc_api(&self) -> Arc { + self.wallet.rpc_api().clone() } - pub fn rpc_client(&self) -> Arc { - self.wallet.rpc_client().clone() + pub fn rpc_client(&self) -> Option> { + self.wallet.wrpc_client().clone() } pub fn store(&self) -> Arc { @@ -292,7 +292,7 @@ impl KaspaCli { }, #[allow(unused_variables)] Events::Disconnect{ url, network_id } => { - tprintln!(this, "Disconnected from {url}"); + tprintln!(this, "Disconnected from {}",url.unwrap_or("N/A".to_string())); this.term().refresh_prompt(); }, Events::UtxoIndexNotEnabled { .. } => { @@ -309,7 +309,7 @@ impl KaspaCli { .. } => { - tprintln!(this, "Connected to Kaspa node version {server_version} at {url}"); + tprintln!(this, "Connected to Kaspa node version {server_version} at {}", url.unwrap_or("N/A".to_string())); let is_open = this.wallet.is_open(); diff --git a/cli/src/metrics/metrics.rs b/cli/src/metrics/metrics.rs index d5dd7d7cb..4fe83c47b 100644 --- a/cli/src/metrics/metrics.rs +++ b/cli/src/metrics/metrics.rs @@ -64,7 +64,7 @@ impl Handler for Metrics { self.mute.store(mute, Ordering::Relaxed); } - self.rpc.lock().unwrap().replace(ctx.wallet().rpc().clone()); + self.rpc.lock().unwrap().replace(ctx.wallet().rpc_api().clone()); self.start_task(&ctx).await?; Ok(()) diff --git a/cli/src/modules/connect.rs b/cli/src/modules/connect.rs index b8633b8b7..3caa9eb75 100644 --- a/cli/src/modules/connect.rs +++ b/cli/src/modules/connect.rs @@ -7,11 +7,15 @@ pub struct Connect; impl Connect { async fn main(self: Arc, ctx: &Arc, argv: Vec, _cmd: &str) -> Result<()> { let ctx = ctx.clone().downcast_arc::()?; - let url = argv.first().cloned().or_else(|| ctx.wallet().settings().get(WalletSettings::Server)); - let network_type = ctx.wallet().network_id()?; - let url = ctx.wallet().rpc_client().parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())?; - let options = ConnectOptions { block_async_connect: true, strategy: ConnectStrategy::Fallback, url, timeout: None }; - ctx.wallet().rpc_client().connect(options).await.map_err(|e| e.to_string())?; + if let Some(wrpc_client) = ctx.wallet().wrpc_client().as_ref() { + let url = argv.first().cloned().or_else(|| ctx.wallet().settings().get(WalletSettings::Server)); + let network_type = ctx.wallet().network_id()?; + let url = wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())?; + let options = ConnectOptions { block_async_connect: true, strategy: ConnectStrategy::Fallback, url, timeout: None }; + wrpc_client.connect(options).await.map_err(|e| e.to_string())?; + } else { + terrorln!(ctx, "Unable to connect with non-wRPC client"); + } Ok(()) } } diff --git a/cli/src/modules/disconnect.rs b/cli/src/modules/disconnect.rs index e6f934183..4687fb25c 100644 --- a/cli/src/modules/disconnect.rs +++ b/cli/src/modules/disconnect.rs @@ -7,7 +7,11 @@ pub struct Disconnect; impl Disconnect { async fn main(self: Arc, ctx: &Arc, _argv: Vec, _cmd: &str) -> Result<()> { let ctx = ctx.clone().downcast_arc::()?; - ctx.wallet().rpc_client().shutdown().await?; + if let Some(wrpc_client) = ctx.wallet().wrpc_client().as_ref() { + wrpc_client.shutdown().await?; + } else { + terrorln!(ctx, "Unable to disconnect from non-wRPC client"); + } Ok(()) } } diff --git a/cli/src/modules/node.rs b/cli/src/modules/node.rs index fbf0feca7..3fc1387e8 100644 --- a/cli/src/modules/node.rs +++ b/cli/src/modules/node.rs @@ -106,15 +106,15 @@ impl Node { tprintln!(ctx, "starting kaspa node... {}", style("(use 'node mute' to mute logging)").dim()); } + let wrpc_client = ctx.wallet().wrpc_client().ok_or(Error::custom("Unable to start node with non-wRPC client"))?; + kaspad.configure(self.create_config(&ctx).await?).await?; kaspad.start().await?; // temporary setup for autoconnect let url = ctx.wallet().settings().get(WalletSettings::Server); let network_type = ctx.wallet().network_id()?; - if let Some(url) = - ctx.wallet().rpc_client().parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())? - { + if let Some(url) = wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())? { // log_info!("connecting to url: {}", url); if url.contains("127.0.0.1") || url.contains("localhost") { spawn(async move { @@ -126,7 +126,7 @@ impl Node { }; for _ in 0..5 { sleep(Duration::from_millis(1000)).await; - if ctx.wallet().rpc_client().connect(options.clone()).await.is_ok() { + if wrpc_client.connect(options.clone()).await.is_ok() { break; } } diff --git a/cli/src/modules/rpc.rs b/cli/src/modules/rpc.rs index 40de65018..820a09b46 100644 --- a/cli/src/modules/rpc.rs +++ b/cli/src/modules/rpc.rs @@ -16,7 +16,7 @@ impl Rpc { async fn main(self: Arc, ctx: &Arc, mut argv: Vec, cmd: &str) -> Result<()> { let ctx = ctx.clone().downcast_arc::()?; - let rpc = ctx.wallet().rpc().clone(); + let rpc = ctx.wallet().rpc_api().clone(); // tprintln!(ctx, "{response}"); if argv.is_empty() { diff --git a/rpc/core/src/api/ctl.rs b/rpc/core/src/api/ctl.rs new file mode 100644 index 000000000..5483dd4b0 --- /dev/null +++ b/rpc/core/src/api/ctl.rs @@ -0,0 +1,74 @@ +use crate::error::RpcResult; +use std::sync::{Arc, Mutex}; +use workflow_core::channel::Multiplexer; + +/// RPC channel control operations +#[derive(Debug, Clone)] +pub enum RpcCtlOp { + /// RpcApi channel open (connected) + Open, + /// RpcApi channel close (disconnected) + Close, +} + +#[derive(Default)] +struct Inner { + // MPMC channel for [`RpcCtlOp`] operations. + multiplexer: Multiplexer, + // Optional Connection descriptor such as a connection URL. + descriptor: Mutex>, +} + +/// RPC channel control helper. This is a companion +/// struct to [`RpcApi`](crate::api::RpcApi) that +/// provides signaling for RPC open/close events as +/// well as an optional connection descriptor (URL). +#[derive(Default, Clone)] +pub struct RpcCtl { + inner: Arc, +} + +impl RpcCtl { + pub fn new() -> Self { + Self { inner: Arc::new(Inner::default()) } + } + + pub fn with_descriptor(descriptor: Str) -> Self { + Self { inner: Arc::new(Inner { descriptor: Mutex::new(Some(descriptor.to_string())), ..Inner::default() }) } + } + + /// Obtain internal multiplexer (MPMC channel for [`RpcCtlOp`] operations) + pub fn multiplexer(&self) -> &Multiplexer { + &self.inner.multiplexer + } + + /// Signal open to all listeners (async) + pub async fn signal_open(&self) -> RpcResult<()> { + Ok(self.inner.multiplexer.broadcast(RpcCtlOp::Open).await?) + } + + /// Signal close to all listeners (async) + pub async fn signal_close(&self) -> RpcResult<()> { + Ok(self.inner.multiplexer.broadcast(RpcCtlOp::Close).await?) + } + + /// Try signal open to all listeners (sync) + pub fn try_signal_open(&self) -> RpcResult<()> { + Ok(self.inner.multiplexer.try_broadcast(RpcCtlOp::Open)?) + } + + /// Try signal close to all listeners (sync) + pub fn try_signal_close(&self) -> RpcResult<()> { + Ok(self.inner.multiplexer.try_broadcast(RpcCtlOp::Close)?) + } + + /// Set the connection descriptor (URL, peer address, etc.) + pub fn set_descriptor(&self, descriptor: Option) { + *self.inner.descriptor.lock().unwrap() = descriptor; + } + + /// Get the connection descriptor (URL, peer address, etc.) + pub fn descriptor(&self) -> Option { + self.inner.descriptor.lock().unwrap().clone() + } +} diff --git a/rpc/core/src/api/mod.rs b/rpc/core/src/api/mod.rs index 2a74e294d..6bc968b46 100644 --- a/rpc/core/src/api/mod.rs +++ b/rpc/core/src/api/mod.rs @@ -1,3 +1,4 @@ +pub mod ctl; pub mod notifications; pub mod ops; pub mod rpc; diff --git a/rpc/core/src/error.rs b/rpc/core/src/error.rs index 7f39735a5..723beeb2f 100644 --- a/rpc/core/src/error.rs +++ b/rpc/core/src/error.rs @@ -2,8 +2,9 @@ use kaspa_consensus_core::tx::TransactionId; use kaspa_utils::networking::IpAddress; use std::{net::AddrParseError, num::TryFromIntError}; use thiserror::Error; +use workflow_core::channel::ChannelError; -use crate::{RpcHash, RpcTransactionId}; +use crate::{api::ctl::RpcCtlOp, RpcHash, RpcTransactionId}; #[derive(Clone, Debug, Error)] pub enum RpcError { @@ -102,6 +103,9 @@ pub enum RpcError { #[error("{0}")] General(String), + + #[error("RpcCtl dispatch error")] + RpcCtlDispatchError, } impl From for RpcError { @@ -116,4 +120,10 @@ impl From<&str> for RpcError { } } +impl From> for RpcError { + fn from(_: ChannelError) -> Self { + RpcError::RpcCtlDispatchError + } +} + pub type RpcResult = std::result::Result; diff --git a/rpc/macros/src/wrpc/client.rs b/rpc/macros/src/wrpc/client.rs index 8c2f2cc20..f33fe57f3 100644 --- a/rpc/macros/src/wrpc/client.rs +++ b/rpc/macros/src/wrpc/client.rs @@ -67,7 +67,7 @@ impl ToTokens for RpcTable { let __self = self; //let request = request; let __ret: RpcResult<#response_type> = { - let resp: ClientResult<#response_type> = __self.inner.rpc.call(#rpc_api_ops::#handler, request).await; + let resp: ClientResult<#response_type> = __self.inner.rpc_client.call(#rpc_api_ops::#handler, request).await; Ok(resp.map_err(|e| kaspa_rpc_core::error::RpcError::RpcSubsystem(e.to_string()))?) }; #[allow(unreachable_code)] diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index 3375311fc..1c46e7928 100644 --- a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -2,12 +2,15 @@ use crate::error::Error; use crate::imports::*; use crate::parse::parse_host; use kaspa_consensus_core::network::NetworkType; -use kaspa_rpc_core::notify::collector::{RpcCoreCollector, RpcCoreConverter}; +use kaspa_rpc_core::{ + api::ctl::RpcCtl, + notify::collector::{RpcCoreCollector, RpcCoreConverter}, +}; pub use kaspa_rpc_macros::build_wrpc_client_interface; use std::fmt::Debug; use workflow_core::{channel::Multiplexer, runtime as application_runtime}; use workflow_dom::utils::window; -use workflow_rpc::client::Ctl; +use workflow_rpc::client::Ctl as WrpcCtl; pub use workflow_rpc::client::{ConnectOptions, ConnectResult, ConnectStrategy, WebSocketConfig}; // /// [`NotificationMode`] controls notification delivery process @@ -25,19 +28,22 @@ pub use workflow_rpc::client::{ConnectOptions, ConnectResult, ConnectStrategy, W #[derive(Clone)] struct Inner { - rpc: Arc>, + rpc_client: Arc>, notification_channel: Channel, encoding: Encoding, - ctl_multiplexer: Multiplexer, + wrpc_ctl_multiplexer: Multiplexer, + rpc_ctl: RpcCtl, background_services_running: Arc, + service_ctl: DuplexChannel<()>, } impl Inner { pub fn new(encoding: Encoding, url: &str) -> Result { // log_trace!("Kaspa wRPC::{encoding} connecting to: {url}"); - let ctl_multiplexer = Multiplexer::::new(); + let rpc_ctl = RpcCtl::with_descriptor(url); + let wrpc_ctl_multiplexer = Multiplexer::::new(); - let options = RpcClientOptions { url, ctl_multiplexer: Some(ctl_multiplexer.clone()), ..RpcClientOptions::default() }; + let options = RpcClientOptions { url, ctl_multiplexer: Some(wrpc_ctl_multiplexer.clone()), ..RpcClientOptions::default() }; let notification_channel = Channel::unbounded(); @@ -88,10 +94,12 @@ impl Inner { let rpc = Arc::new(RpcClient::new_with_encoding(encoding, interface.into(), options, Some(ws_config))?); let client = Self { - rpc, + rpc_client: rpc, notification_channel, encoding, - ctl_multiplexer, + wrpc_ctl_multiplexer, + rpc_ctl, + service_ctl: DuplexChannel::unbounded(), background_services_running: Arc::new(AtomicBool::new(false)), }; Ok(client) @@ -107,13 +115,14 @@ impl Inner { /// Start sending notifications of some type to the client. async fn start_notify_to_client(&self, scope: Scope) -> RpcResult<()> { - let _response: SubscribeResponse = self.rpc.call(RpcApiOps::Subscribe, scope).await.map_err(|err| err.to_string())?; + let _response: SubscribeResponse = self.rpc_client.call(RpcApiOps::Subscribe, scope).await.map_err(|err| err.to_string())?; Ok(()) } /// Stop sending notifications of some type to the client. async fn stop_notify_to_client(&self, scope: Scope) -> RpcResult<()> { - let _response: UnsubscribeResponse = self.rpc.call(RpcApiOps::Unsubscribe, scope).await.map_err(|err| err.to_string())?; + let _response: UnsubscribeResponse = + self.rpc_client.call(RpcApiOps::Unsubscribe, scope).await.map_err(|err| err.to_string())?; Ok(()) } } @@ -186,20 +195,29 @@ impl KaspaRpcClient { } pub fn url(&self) -> String { - self.inner.rpc.url() + self.inner.rpc_client.url() } pub fn set_url(&self, url: &str) -> Result<()> { - self.inner.rpc.set_url(url)?; + self.inner.rpc_ctl.set_descriptor(Some(url.to_string())); + self.inner.rpc_client.set_url(url)?; Ok(()) } pub fn is_open(&self) -> bool { - self.inner.rpc.is_open() + self.inner.rpc_client.is_open() } - pub fn rpc(&self) -> &Arc> { - &self.inner.rpc + pub fn rpc_client(&self) -> &Arc> { + &self.inner.rpc_client + } + + pub fn rpc_api(self: &Arc) -> Arc { + self.clone() + } + + pub fn rpc_ctl(&self) -> &RpcCtl { + &self.inner.rpc_ctl } /// Starts RPC services. @@ -211,6 +229,8 @@ impl KaspaRpcClient { } NotificationMode::Direct => {} } + + self.start_rpc_ctl_service().await?; } Ok(()) } @@ -227,6 +247,8 @@ impl KaspaRpcClient { // self.notification_ctl.signal(()).await?; } } + + self.stop_rpc_ctl_service().await?; } Ok(()) } @@ -236,12 +258,15 @@ impl KaspaRpcClient { /// this function will block until the first successful /// connection. pub async fn connect(&self, options: ConnectOptions) -> ConnectResult { + if let Some(url) = options.url.as_ref() { + self.inner.rpc_ctl.set_descriptor(Some(url.clone())); + } self.start().await?; - Ok(self.inner.rpc.connect(options).await?) + Ok(self.inner.rpc_client.connect(options).await?) } pub async fn disconnect(&self) -> Result<()> { - self.inner.rpc.shutdown().await?; + self.inner.rpc_client.shutdown().await?; self.stop().await?; Ok(()) } @@ -249,7 +274,7 @@ impl KaspaRpcClient { /// Stop and shutdown RPC disconnecting existing connections /// and stopping reconnection process. pub async fn shutdown(&self) -> Result<()> { - Ok(self.inner.rpc.shutdown().await?) + Ok(self.inner.rpc_client.shutdown().await?) } /// A helper function that is not `async`, allowing connection @@ -257,7 +282,7 @@ impl KaspaRpcClient { pub fn connect_as_task(&self) -> Result<()> { let self_ = self.clone(); workflow_core::task::spawn(async move { - self_.inner.rpc.connect(ConnectOptions::default()).await.ok(); + self_.inner.rpc_client.connect(ConnectOptions::default()).await.ok(); }); Ok(()) } @@ -274,8 +299,12 @@ impl KaspaRpcClient { self.notification_mode } - pub fn ctl_multiplexer(&self) -> &Multiplexer { - &self.inner.ctl_multiplexer + // pub fn ctl_multiplexer(&self) -> &Multiplexer { + // &self.inner.ctl_multiplexer + // } + + pub fn ctl(&self) -> &RpcCtl { + &self.inner.rpc_ctl } pub fn parse_url_with_network_type(&self, url: Option, network_type: NetworkType) -> Result> { @@ -310,6 +339,44 @@ impl KaspaRpcClient { Ok(Some(format!("{}://{}:{}", scheme, parse_output.host.to_string(), port))) } + + async fn start_rpc_ctl_service(&self) -> Result<()> { + let inner = self.inner.clone(); + let wrpc_ctl_channel = inner.wrpc_ctl_multiplexer.channel(); + spawn(async move { + loop { + select! { + _ = inner.service_ctl.request.receiver.recv().fuse() => { + break; + }, + msg = wrpc_ctl_channel.receiver.recv().fuse() => { + if let Ok(msg) = msg { + match msg { + WrpcCtl::Open => { + // inner.rpc_ctl.set_descriptor(Some(inner.rpc.url())); + inner.rpc_ctl.signal_open().await.expect("(KaspaRpcClient) rpc_ctl.signal_open() error"); + } + WrpcCtl::Close => { + inner.rpc_ctl.signal_close().await.expect("(KaspaRpcClient) rpc_ctl.signal_close() error"); + // inner.rpc_ctl.set_descriptor(None); + } + } + } else { + log_error!("wrpc_ctl_channel.receiver.recv() error"); + } + } + } + } + inner.service_ctl.response.send(()).await.unwrap(); + }); + + Ok(()) + } + + async fn stop_rpc_ctl_service(&self) -> Result<()> { + self.inner.service_ctl.signal(()).await?; + Ok(()) + } } #[async_trait] diff --git a/wallet/core/src/events.rs b/wallet/core/src/events.rs index 25dec9609..eda59585e 100644 --- a/wallet/core/src/events.rs +++ b/wallet/core/src/events.rs @@ -54,20 +54,20 @@ pub enum Events { network_id: NetworkId, /// Kaspa node RPC url on which connection /// has been established - url: String, + url: Option, }, /// RPC disconnection Disconnect { #[serde(rename = "networkId")] network_id: NetworkId, - url: String, + url: Option, }, /// A special event emitted if the connected node /// does not have UTXO index enabled UtxoIndexNotEnabled { /// Kaspa node RPC url on which connection /// has been established - url: String, + url: Option, }, /// [`SyncState`] notification posted /// when the node sync state changes @@ -96,7 +96,7 @@ pub enum Events { is_synced: bool, /// Kaspa node RPC url on which connection /// has been established - url: String, + url: Option, }, /// Successful start of [`UtxoProcessor`](crate::utxo::processor::UtxoProcessor). diff --git a/wallet/core/src/runtime/account/mod.rs b/wallet/core/src/runtime/account/mod.rs index 0f06d2d96..2b494a6c9 100644 --- a/wallet/core/src/runtime/account/mod.rs +++ b/wallet/core/src/runtime/account/mod.rs @@ -313,7 +313,7 @@ pub trait Account: AnySync + Send + Sync + 'static { transaction.try_sign()?; transaction.log().await?; - let id = transaction.try_submit(self.wallet().rpc()).await?; + let id = transaction.try_submit(self.wallet().rpc_api()).await?; ids.push(id); yield_executor().await; } @@ -347,7 +347,7 @@ pub trait Account: AnySync + Send + Sync + 'static { transaction.try_sign()?; transaction.log().await?; - let id = transaction.try_submit(self.wallet().rpc()).await?; + let id = transaction.try_submit(self.wallet().rpc_api()).await?; ids.push(id); yield_executor().await; } @@ -426,7 +426,7 @@ pub trait DerivationCapableAccount: Account { // ---- let addresses = keypairs.iter().map(|(address, _)| address.clone()).collect::>(); - let utxos = self.wallet().rpc().get_utxos_by_addresses(addresses).await?; + let utxos = self.wallet().rpc_api().get_utxos_by_addresses(addresses).await?; let balance = utxos.iter().map(|utxo| utxo.utxo_entry.amount).sum::(); if balance > 0 { aggregate_balance += balance; @@ -453,7 +453,7 @@ pub trait DerivationCapableAccount: Account { let mut stream = generator.stream(); while let Some(transaction) = stream.try_next().await? { transaction.try_sign()?; - let id = transaction.try_submit(self.wallet().rpc()).await?; + let id = transaction.try_submit(self.wallet().rpc_api()).await?; if let Some(notifier) = notifier.as_ref() { notifier(index, balance, Some(id)); } diff --git a/wallet/core/src/runtime/wallet.rs b/wallet/core/src/runtime/wallet.rs index b0197c77e..98926af4b 100644 --- a/wallet/core/src/runtime/wallet.rs +++ b/wallet/core/src/runtime/wallet.rs @@ -18,6 +18,7 @@ use kaspa_notify::{ listener::ListenerId, scope::{Scope, VirtualDaaScoreChangedScope}, }; +use kaspa_rpc_core::api::ctl::RpcCtl; use kaspa_rpc_core::notify::mode::NotificationMode; use kaspa_wrpc_client::{KaspaRpcClient, WrpcEncoding}; use std::sync::Arc; @@ -101,7 +102,8 @@ pub struct Inner { store: Arc, settings: SettingsStore, utxo_processor: Arc, - rpc: Arc, + rpc_api: Arc, + rpc_ctl: RpcCtl, multiplexer: Multiplexer>, } @@ -124,19 +126,31 @@ impl Wallet { Wallet::try_with_rpc(None, storage, network_id) } - pub fn try_with_rpc(rpc: Option>, store: Arc, network_id: Option) -> Result { - let rpc: Arc = if let Some(rpc) = rpc { - rpc + pub fn try_with_rpc( + rpc: Option<(Arc, RpcCtl)>, + store: Arc, + network_id: Option, + ) -> Result { + let (rpc_api, rpc_ctl) = if let Some((rpc_api, rpc_ctl)) = rpc { + (rpc_api, rpc_ctl) } else { - Arc::new(KaspaRpcClient::new_with_args(WrpcEncoding::Borsh, NotificationMode::MultiListeners, "wrpc://127.0.0.1:17110")?) + let rpc_client = Arc::new(KaspaRpcClient::new_with_args( + WrpcEncoding::Borsh, + NotificationMode::MultiListeners, + "wrpc://127.0.0.1:17110", + )?); + let rpc_ctl = rpc_client.ctl().clone(); + let rpc_api: Arc = rpc_client; + (rpc_api, rpc_ctl) }; let multiplexer = Multiplexer::>::new(); - let utxo_processor = Arc::new(UtxoProcessor::new(&rpc, network_id, Some(multiplexer.clone()))); + let utxo_processor = Arc::new(UtxoProcessor::new(&rpc_api, &rpc_ctl, network_id, Some(multiplexer.clone()))); let wallet = Wallet { inner: Arc::new(Inner { - rpc, + rpc_api, + rpc_ctl, multiplexer, store, active_accounts: ActiveAccountMap::default(), @@ -285,12 +299,16 @@ impl Wallet { Ok(self.get_prv_key_info(account).await?.map(|info| info.is_encrypted())) } - pub fn rpc_client(&self) -> Arc { - self.rpc().clone().downcast_arc::().expect("unable to downcast DynRpcApi to KaspaRpcClient") + pub fn wrpc_client(&self) -> Option> { + self.rpc_api().clone().downcast_arc::().ok() } - pub fn rpc(&self) -> &Arc { - &self.inner.rpc + pub fn rpc_api(&self) -> &Arc { + &self.inner.rpc_api + } + + pub fn rpc_ctl(&self) -> &RpcCtl { + &self.inner.rpc_ctl } pub fn multiplexer(&self) -> &Multiplexer> { @@ -315,7 +333,9 @@ impl Wallet { } if let Some(url) = settings.get::(WalletSettings::Server) { - self.rpc_client().set_url(url.as_str()).unwrap_or_else(|_| log_error!("Unable to set rpc url: `{}`", url)); + if let Some(wrpc_client) = self.wrpc_client() { + wrpc_client.set_url(url.as_str()).unwrap_or_else(|_| log_error!("Unable to set rpc url: `{}`", url)); + } } Ok(()) @@ -328,7 +348,9 @@ impl Wallet { self.start_task().await?; self.utxo_processor().start().await?; // rpc services (notifier) - self.rpc_client().start().await?; + if let Some(rpc_client) = self.wrpc_client() { + rpc_client.start().await?; + } Ok(()) } @@ -337,8 +359,10 @@ impl Wallet { pub async fn stop(&self) -> Result<()> { self.utxo_processor().stop().await?; self.stop_task().await?; - self.rpc_client().stop().await?; - self.rpc_client().disconnect().await?; + if let Some(rpc_client) = self.wrpc_client() { + rpc_client.stop().await?; + rpc_client.disconnect().await?; + } Ok(()) } @@ -347,22 +371,22 @@ impl Wallet { } pub async fn get_info(&self) -> Result { - let v = self.rpc().get_info().await?; + let v = self.rpc_api().get_info().await?; Ok(format!("{v:#?}").replace('\n', "\r\n")) } pub async fn subscribe_daa_score(&self) -> Result<()> { - self.rpc().start_notify(self.listener_id(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; + self.rpc_api().start_notify(self.listener_id(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; Ok(()) } pub async fn unsubscribe_daa_score(&self) -> Result<()> { - self.rpc().stop_notify(self.listener_id(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; + self.rpc_api().stop_notify(self.listener_id(), Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; Ok(()) } pub async fn ping(&self) -> bool { - self.rpc().ping().await.is_ok() + self.rpc_api().ping().await.is_ok() } pub async fn broadcast(&self) -> Result<()> { @@ -385,13 +409,17 @@ impl Wallet { Ok(self.network_id()?.into()) } - pub fn default_port(&self) -> Result { + pub fn default_port(&self) -> Result> { let network_type = self.network_id()?; - let port = match self.rpc_client().encoding() { - WrpcEncoding::Borsh => network_type.default_borsh_rpc_port(), - WrpcEncoding::SerdeJson => network_type.default_json_rpc_port(), - }; - Ok(port) + if let Some(wrpc_client) = self.wrpc_client() { + let port = match wrpc_client.encoding() { + WrpcEncoding::Borsh => network_type.default_borsh_rpc_port(), + WrpcEncoding::SerdeJson => network_type.default_json_rpc_port(), + }; + Ok(Some(port)) + } else { + Ok(None) + } } // pub async fn create_private_key_impl(self: &Arc, wallet_secret: Secret, payment_secret: Option, save : ) -> Result { @@ -770,7 +798,6 @@ mod test { use kaspa_bip32::{ChildNumber, ExtendedPrivateKey, SecretKey}; use kaspa_consensus_core::subnets::SUBNETWORK_ID_NATIVE; use kaspa_consensus_wasm::{sign_transaction, SignableTransaction, Transaction, TransactionInput, TransactionOutput}; - use kaspa_rpc_core::api::rpc::RpcApi; use kaspa_txscript::pay_to_address_script; use workflow_rpc::client::ConnectOptions; @@ -804,16 +831,16 @@ mod test { // let utxo_db_ctx = wallet.utxo_db_core().ctx(self) // wallet.load_accounts(stored_accounts); - let rpc = wallet.rpc(); + let rpc_api = wallet.rpc_api(); // let utxo_processor = UtxoProcessor::new(rpc, None); let utxo_processor = wallet.utxo_processor(); - let rpc_client = wallet.rpc_client(); + let wrpc_client = wallet.wrpc_client().expect("Unable to obtain wRPC client"); - let info = rpc_client.get_block_dag_info().await?; + let info = rpc_api.get_block_dag_info().await?; let current_daa_score = info.virtual_daa_score; - let _connect_result = rpc_client.connect(ConnectOptions::fallback()).await; + let _connect_result = wrpc_client.connect(ConnectOptions::fallback()).await; //println!("connect_result: {_connect_result:?}"); let _result = wallet.start().await; @@ -824,7 +851,8 @@ mod test { let address = Address::try_from("kaspatest:qz7ulu4c25dh7fzec9zjyrmlhnkzrg4wmf89q7gzr3gfrsj3uz6xjceef60sd")?; let utxo_context = - self::create_utxos_context_with_addresses(rpc.clone(), vec![address.clone()], current_daa_score, utxo_processor).await?; + self::create_utxos_context_with_addresses(rpc_api.clone(), vec![address.clone()], current_daa_score, utxo_processor) + .await?; let utxo_set_balance = utxo_context.calculate_balance().await; println!("get_utxos_by_addresses: {utxo_set_balance:?}"); @@ -903,18 +931,18 @@ mod test { //println!("mtx: {mtx:?}"); let utxo_context = - self::create_utxos_context_with_addresses(rpc.clone(), vec![to_address.clone()], current_daa_score, utxo_processor) + self::create_utxos_context_with_addresses(rpc_api.clone(), vec![to_address.clone()], current_daa_score, utxo_processor) .await?; let to_balance = utxo_context.calculate_balance().await; println!("to address balance before tx submit: {to_balance:?}"); - let result = rpc.submit_transaction(mtx.into(), false).await?; + let result = rpc_api.submit_transaction(mtx.into(), false).await?; println!("tx submit result, {:?}", result); println!("sleep for 5s..."); sleep(time::Duration::from_millis(5000)); let utxo_context = - self::create_utxos_context_with_addresses(rpc.clone(), vec![to_address.clone()], current_daa_score, utxo_processor) + self::create_utxos_context_with_addresses(rpc_api.clone(), vec![to_address.clone()], current_daa_score, utxo_processor) .await?; let to_balance = utxo_context.calculate_balance().await; println!("to address balance after tx submit: {to_balance:?}"); diff --git a/wallet/core/src/utxo/context.rs b/wallet/core/src/utxo/context.rs index f4429ac55..fd834b257 100644 --- a/wallet/core/src/utxo/context.rs +++ b/wallet/core/src/utxo/context.rs @@ -515,7 +515,7 @@ impl UtxoContext { pub async fn scan_and_register_addresses(&self, addresses: Vec
, current_daa_score: Option) -> Result<()> { self.register_addresses(&addresses).await?; - let resp = self.processor().rpc().get_utxos_by_addresses(addresses).await?; + let resp = self.processor().rpc_api().get_utxos_by_addresses(addresses).await?; let refs: Vec = resp.into_iter().map(UtxoEntryReference::from).collect(); let current_daa_score = current_daa_score.unwrap_or_else(|| { self.processor() diff --git a/wallet/core/src/utxo/processor.rs b/wallet/core/src/utxo/processor.rs index af4edd4f0..252a2c78d 100644 --- a/wallet/core/src/utxo/processor.rs +++ b/wallet/core/src/utxo/processor.rs @@ -3,11 +3,13 @@ use kaspa_notify::{ listener::ListenerId, scope::{Scope, UtxosChangedScope, VirtualDaaScoreChangedScope}, }; -use kaspa_rpc_core::message::UtxosChangedNotification; +use kaspa_rpc_core::{ + api::ctl::{RpcCtl, RpcCtlOp}, + message::UtxosChangedNotification, +}; use kaspa_wrpc_client::KaspaRpcClient; use workflow_core::channel::{Channel, DuplexChannel}; use workflow_core::task::spawn; -use workflow_rpc::client::Ctl; use crate::imports::*; use crate::result::Result; @@ -31,7 +33,8 @@ pub struct Inner { // --- current_daa_score: Arc, network_id: Arc>>, - rpc: Arc, + rpc_api: Arc, + rpc_ctl: RpcCtl, is_connected: AtomicBool, listener_id: Mutex>, task_ctl: DuplexChannel, @@ -41,14 +44,15 @@ pub struct Inner { } impl Inner { - pub fn new(rpc: &Arc, network_id: Option, multiplexer: Multiplexer>) -> Self { + pub fn new(rpc: &Arc, rpc_ctl: &RpcCtl, network_id: Option, multiplexer: Multiplexer>) -> Self { Self { pending: DashMap::new(), address_to_utxo_context_map: DashMap::new(), recoverable_contexts: DashSet::new(), current_daa_score: Arc::new(AtomicU64::new(0)), network_id: Arc::new(Mutex::new(network_id)), - rpc: rpc.clone(), + rpc_api: rpc.clone(), + rpc_ctl: rpc_ctl.clone(), is_connected: AtomicBool::new(false), listener_id: Mutex::new(None), task_ctl: DuplexChannel::oneshot(), @@ -65,17 +69,30 @@ pub struct UtxoProcessor { } impl UtxoProcessor { - pub fn new(rpc: &Arc, network_id: Option, multiplexer: Option>>) -> Self { + pub fn new( + rpc: &Arc, + rpc_ctl: &RpcCtl, + network_id: Option, + multiplexer: Option>>, + ) -> Self { let multiplexer = multiplexer.unwrap_or_else(Multiplexer::new); - UtxoProcessor { inner: Arc::new(Inner::new(rpc, network_id, multiplexer)) } + UtxoProcessor { inner: Arc::new(Inner::new(rpc, rpc_ctl, network_id, multiplexer)) } + } + + pub fn rpc_api(&self) -> &Arc { + &self.inner.rpc_api } - pub fn rpc(&self) -> &Arc { - &self.inner.rpc + pub fn rpc_ctl(&self) -> &RpcCtl { + &self.inner.rpc_ctl } - pub fn rpc_client(&self) -> Arc { - self.rpc().clone().downcast_arc::().expect("unable to downcast DynRpcApi to KaspaRpcClient") + pub fn rpc_url(&self) -> Option { + self.rpc_ctl().descriptor() + } + + pub fn rpc_client(&self) -> Option> { + self.rpc_api().clone().downcast_arc::().ok() } pub fn multiplexer(&self) -> &Multiplexer> { @@ -128,7 +145,7 @@ impl UtxoProcessor { if !addresses.is_empty() { let addresses = addresses.into_iter().map(|address| (*address).clone()).collect::>(); let utxos_changed_scope = UtxosChangedScope { addresses }; - self.rpc().start_notify(self.listener_id(), Scope::UtxosChanged(utxos_changed_scope)).await?; + self.rpc_api().start_notify(self.listener_id(), Scope::UtxosChanged(utxos_changed_scope)).await?; } else { log_info!("registering empty address list!"); } @@ -145,7 +162,7 @@ impl UtxoProcessor { if !addresses.is_empty() { let addresses = addresses.into_iter().map(|address| (*address).clone()).collect::>(); let utxos_changed_scope = UtxosChangedScope { addresses }; - self.rpc().stop_notify(self.listener_id(), Scope::UtxosChanged(utxos_changed_scope)).await?; + self.rpc_api().stop_notify(self.listener_id(), Scope::UtxosChanged(utxos_changed_scope)).await?; } else { log_info!("unregistering empty address list!"); } @@ -256,14 +273,14 @@ impl UtxoProcessor { pub async fn init_state_from_server(&self) -> Result<()> { - let kaspa_rpc_core::GetInfoResponse { is_synced, is_utxo_indexed: has_utxo_index, server_version, .. } = self.rpc().get_info().await?; + let kaspa_rpc_core::GetInfoResponse { is_synced, is_utxo_indexed: has_utxo_index, server_version, .. } = self.rpc_api().get_info().await?; if !has_utxo_index { - self.notify(Events::UtxoIndexNotEnabled { url: self.rpc_client().url().to_string() }).await?; + self.notify(Events::UtxoIndexNotEnabled { url: self.rpc_url() }).await?; return Err(Error::MissingUtxoIndex); } - let kaspa_rpc_core::GetBlockDagInfoResponse { virtual_daa_score, network: server_network_id, .. } = self.rpc().get_block_dag_info().await?; + let kaspa_rpc_core::GetBlockDagInfoResponse { virtual_daa_score, network: server_network_id, .. } = self.rpc_api().get_block_dag_info().await?; let network_id = self.network_id()?; if network_id != server_network_id { @@ -275,7 +292,7 @@ impl UtxoProcessor { log_trace!("Connected to kaspad: '{server_version}' on '{server_network_id}'; SYNC: {is_synced} DAA: {virtual_daa_score}"); self.sync_proc().track(is_synced).await?; - self.notify(Events::ServerStatus { server_version, is_synced, network_id, url: self.rpc_client().url().to_string() }).await?; + self.notify(Events::ServerStatus { server_version, is_synced, network_id, url: self.rpc_url() }).await?; Ok(()) } @@ -288,7 +305,7 @@ impl UtxoProcessor { self.rpc().get_server_info().await?; if !has_utxo_index { - self.notify(Events::UtxoIndexNotEnabled { url: self.rpc_client().url().to_string() }).await?; + self.notify(Events::UtxoIndexNotEnabled { url: self.rpc_url() }).await?; return Err(Error::MissingUtxoIndex); } @@ -302,7 +319,7 @@ impl UtxoProcessor { log_trace!("Connected to kaspad: '{server_version}' on '{server_network_id}'; SYNC: {is_synced} DAA: {virtual_daa_score}"); self.sync_proc().track(is_synced).await?; - self.notify(Events::ServerStatus { server_version, is_synced, network_id, url: self.rpc_client().url().to_string() }).await?; + self.notify(Events::ServerStatus { server_version, is_synced, network_id, url: self.rpc_url() }).await?; Ok(()) } @@ -320,8 +337,11 @@ impl UtxoProcessor { pub async fn handle_connect(&self) -> Result<()> { if let Err(err) = self.handle_connect_impl().await { + log_error!("UtxoProcessor: error while connecting to node: {err}"); self.notify(Events::UtxoProcError { message: err.to_string() }).await?; - self.rpc_client().disconnect().await?; + if let Some(client) = self.rpc_client() { + client.disconnect().await?; + } } Ok(()) } @@ -336,11 +356,11 @@ impl UtxoProcessor { async fn register_notification_listener(&self) -> Result<()> { let listener_id = self - .rpc() + .rpc_api() .register_new_listener(ChannelConnection::new(self.inner.notification_channel.sender.clone(), ChannelType::Persistent)); *self.inner.listener_id.lock().unwrap() = Some(listener_id); - self.rpc().start_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; + self.rpc_api().start_notify(listener_id, Scope::VirtualDaaScoreChanged(VirtualDaaScoreChangedScope {})).await?; Ok(()) } @@ -349,7 +369,7 @@ impl UtxoProcessor { let listener_id = self.inner.listener_id.lock().unwrap().take(); if let Some(id) = listener_id { // we do not need this as we are unregister the entire listener here... - self.rpc().unregister_listener(id).await?; + self.rpc_api().unregister_listener(id).await?; } Ok(()) } @@ -380,14 +400,7 @@ impl UtxoProcessor { pub async fn start(&self) -> Result<()> { let this = self.clone(); - let rpc_ctl_channel = this - .rpc() - .clone() - .downcast_arc::() - .expect("unable to downcast DynRpcApi to KaspaRpcClient") - .ctl_multiplexer() - .channel(); - + let rpc_ctl_channel = this.inner.rpc_ctl.multiplexer().channel(); let task_ctl_receiver = self.inner.task_ctl.request.receiver.clone(); let task_ctl_sender = self.inner.task_ctl.response.sender.clone(); let notification_receiver = self.inner.notification_channel.receiver.clone(); @@ -402,17 +415,17 @@ impl UtxoProcessor { match msg { Ok(msg) => { match msg { - Ctl::Open => { + RpcCtlOp::Open => { this.inner.multiplexer.try_broadcast(Box::new(Events::Connect { network_id : this.network_id().expect("network id expected during connection"), - url : this.rpc_client().url().to_string() + url : this.rpc_url() })).unwrap_or_else(|err| log_error!("{err}")); this.handle_connect().await.unwrap_or_else(|err| log_error!("{err}")); }, - Ctl::Close => { + RpcCtlOp::Close => { this.inner.multiplexer.try_broadcast(Box::new(Events::Disconnect { network_id : this.network_id().expect("network id expected during connection"), - url : this.rpc_client().url().to_string() + url : this.rpc_url() })).unwrap_or_else(|err| log_error!("{err}")); this.handle_disconnect().await.unwrap_or_else(|err| log_error!("{err}")); } diff --git a/wallet/core/src/utxo/scan.rs b/wallet/core/src/utxo/scan.rs index 66536f356..030c59849 100644 --- a/wallet/core/src/utxo/scan.rs +++ b/wallet/core/src/utxo/scan.rs @@ -85,7 +85,7 @@ impl Scan { utxo_context.register_addresses(&addresses).await?; let ts = Instant::now(); - let resp = utxo_context.processor().rpc().get_utxos_by_addresses(addresses).await?; + let resp = utxo_context.processor().rpc_api().get_utxos_by_addresses(addresses).await?; let elapsed_msec = ts.elapsed().as_secs_f32(); if elapsed_msec > 1.0 { log_warning!("get_utxos_by_address() fetched {} entries in: {} msec", resp.len(), elapsed_msec); @@ -146,7 +146,7 @@ impl Scan { let address_vec = address_set.iter().cloned().collect::>(); utxo_context.register_addresses(&address_vec).await?; - let resp = utxo_context.processor().rpc().get_utxos_by_addresses(address_vec).await?; + let resp = utxo_context.processor().rpc_api().get_utxos_by_addresses(address_vec).await?; let refs: Vec = resp.into_iter().map(UtxoEntryReference::from).collect(); let balance: Balance = refs.iter().fold(Balance::default(), |mut balance, r| { diff --git a/wallet/core/src/wasm/utxo/processor.rs b/wallet/core/src/wasm/utxo/processor.rs index 42f31ba27..daf486092 100644 --- a/wallet/core/src/wasm/utxo/processor.rs +++ b/wallet/core/src/wasm/utxo/processor.rs @@ -26,8 +26,9 @@ impl UtxoProcessor { #[wasm_bindgen(constructor)] pub async fn ctor(js_value: JsValue) -> Result { let UtxoProcessorCreateArgs { rpc, network_id } = js_value.try_into()?; - let rpc_client: Arc = rpc.client().clone(); - let inner = native::UtxoProcessor::new(&rpc_client, Some(network_id), None); + let rpc_api: Arc = rpc.client().clone(); + let rpc_ctl = rpc.client().rpc_ctl(); + let inner = native::UtxoProcessor::new(&rpc_api, rpc_ctl, Some(network_id), None); let events = EventDispatcher::new(); inner.start().await?; diff --git a/wallet/core/src/wasm/wallet/wallet.rs b/wallet/core/src/wasm/wallet/wallet.rs index 66dbb63b9..1e306381c 100644 --- a/wallet/core/src/wasm/wallet/wallet.rs +++ b/wallet/core/src/wasm/wallet/wallet.rs @@ -35,7 +35,9 @@ impl Wallet { encoding.unwrap_or(WrpcEncoding::Borsh), None, )?; - let wallet = Arc::new(runtime::Wallet::try_with_rpc(Some(rpc.client().clone()), store, network_id)?); + let rpc_api: Arc = rpc.client().rpc_api().clone(); + let rpc_ctl = rpc.client().rpc_ctl().clone(); + let wallet = Arc::new(runtime::Wallet::try_with_rpc(Some((rpc_api, rpc_ctl)), store, network_id)?); let events = EventDispatcher::default(); Ok(Self { wallet, events, rpc })