Skip to content

Commit

Permalink
introduce RpcCtl + isolate wRPC from DynRpcApi
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Sep 9, 2023
1 parent b3366a7 commit 572081f
Show file tree
Hide file tree
Showing 19 changed files with 328 additions and 124 deletions.
12 changes: 6 additions & 6 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl KaspaCli {
self.wallet.is_connected()
}

pub fn rpc(&self) -> Arc<DynRpcApi> {
self.wallet.rpc().clone()
pub fn rpc_api(&self) -> Arc<DynRpcApi> {
self.wallet.rpc_api().clone()
}

pub fn rpc_client(&self) -> Arc<KaspaRpcClient> {
self.wallet.rpc_client().clone()
pub fn rpc_client(&self) -> Option<Arc<KaspaRpcClient>> {
self.wallet.wrpc_client().clone()
}

pub fn store(&self) -> Arc<dyn Interface> {
Expand Down Expand Up @@ -292,7 +292,7 @@ impl KaspaCli {
},
#[allow(unused_variables)]
Events::Disconnect{ url, network_id } => {
tprintln!(this, "Disconnected from {url}");
tprintln!(this, "Disconnected from {}",url.unwrap_or("N/A".to_string()));
this.term().refresh_prompt();
},
Events::UtxoIndexNotEnabled { .. } => {
Expand All @@ -309,7 +309,7 @@ impl KaspaCli {
..
} => {

tprintln!(this, "Connected to Kaspa node version {server_version} at {url}");
tprintln!(this, "Connected to Kaspa node version {server_version} at {}", url.unwrap_or("N/A".to_string()));

let is_open = this.wallet.is_open();

Expand Down
2 changes: 1 addition & 1 deletion cli/src/metrics/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Handler for Metrics {
self.mute.store(mute, Ordering::Relaxed);
}

self.rpc.lock().unwrap().replace(ctx.wallet().rpc().clone());
self.rpc.lock().unwrap().replace(ctx.wallet().rpc_api().clone());

self.start_task(&ctx).await?;
Ok(())
Expand Down
14 changes: 9 additions & 5 deletions cli/src/modules/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ pub struct Connect;
impl Connect {
async fn main(self: Arc<Self>, ctx: &Arc<dyn Context>, argv: Vec<String>, _cmd: &str) -> Result<()> {
let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
let url = argv.first().cloned().or_else(|| ctx.wallet().settings().get(WalletSettings::Server));
let network_type = ctx.wallet().network_id()?;
let url = ctx.wallet().rpc_client().parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())?;
let options = ConnectOptions { block_async_connect: true, strategy: ConnectStrategy::Fallback, url, timeout: None };
ctx.wallet().rpc_client().connect(options).await.map_err(|e| e.to_string())?;
if let Some(wrpc_client) = ctx.wallet().wrpc_client().as_ref() {
let url = argv.first().cloned().or_else(|| ctx.wallet().settings().get(WalletSettings::Server));
let network_type = ctx.wallet().network_id()?;
let url = wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())?;
let options = ConnectOptions { block_async_connect: true, strategy: ConnectStrategy::Fallback, url, timeout: None };
wrpc_client.connect(options).await.map_err(|e| e.to_string())?;
} else {
terrorln!(ctx, "Unable to connect with non-wRPC client");
}
Ok(())
}
}
6 changes: 5 additions & 1 deletion cli/src/modules/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ pub struct Disconnect;
impl Disconnect {
async fn main(self: Arc<Self>, ctx: &Arc<dyn Context>, _argv: Vec<String>, _cmd: &str) -> Result<()> {
let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
ctx.wallet().rpc_client().shutdown().await?;
if let Some(wrpc_client) = ctx.wallet().wrpc_client().as_ref() {
wrpc_client.shutdown().await?;
} else {
terrorln!(ctx, "Unable to disconnect from non-wRPC client");
}
Ok(())
}
}
8 changes: 4 additions & 4 deletions cli/src/modules/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ impl Node {
tprintln!(ctx, "starting kaspa node... {}", style("(use 'node mute' to mute logging)").dim());
}

let wrpc_client = ctx.wallet().wrpc_client().ok_or(Error::custom("Unable to start node with non-wRPC client"))?;

kaspad.configure(self.create_config(&ctx).await?).await?;
kaspad.start().await?;

// temporary setup for autoconnect
let url = ctx.wallet().settings().get(WalletSettings::Server);
let network_type = ctx.wallet().network_id()?;
if let Some(url) =
ctx.wallet().rpc_client().parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())?
{
if let Some(url) = wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string())? {
// log_info!("connecting to url: {}", url);
if url.contains("127.0.0.1") || url.contains("localhost") {
spawn(async move {
Expand All @@ -126,7 +126,7 @@ impl Node {
};
for _ in 0..5 {
sleep(Duration::from_millis(1000)).await;
if ctx.wallet().rpc_client().connect(options.clone()).await.is_ok() {
if wrpc_client.connect(options.clone()).await.is_ok() {
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Rpc {

async fn main(self: Arc<Self>, ctx: &Arc<dyn Context>, mut argv: Vec<String>, cmd: &str) -> Result<()> {
let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
let rpc = ctx.wallet().rpc().clone();
let rpc = ctx.wallet().rpc_api().clone();
// tprintln!(ctx, "{response}");

if argv.is_empty() {
Expand Down
74 changes: 74 additions & 0 deletions rpc/core/src/api/ctl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use crate::error::RpcResult;
use std::sync::{Arc, Mutex};
use workflow_core::channel::Multiplexer;

/// RPC channel control operations
#[derive(Debug, Clone)]
pub enum RpcCtlOp {
/// RpcApi channel open (connected)
Open,
/// RpcApi channel close (disconnected)
Close,
}

#[derive(Default)]
struct Inner {
// MPMC channel for [`RpcCtlOp`] operations.
multiplexer: Multiplexer<RpcCtlOp>,
// Optional Connection descriptor such as a connection URL.
descriptor: Mutex<Option<String>>,
}

/// RPC channel control helper. This is a companion
/// struct to [`RpcApi`](crate::api::RpcApi) that
/// provides signaling for RPC open/close events as
/// well as an optional connection descriptor (URL).
#[derive(Default, Clone)]
pub struct RpcCtl {
inner: Arc<Inner>,
}

impl RpcCtl {
pub fn new() -> Self {
Self { inner: Arc::new(Inner::default()) }
}

pub fn with_descriptor<Str: ToString>(descriptor: Str) -> Self {
Self { inner: Arc::new(Inner { descriptor: Mutex::new(Some(descriptor.to_string())), ..Inner::default() }) }
}

/// Obtain internal multiplexer (MPMC channel for [`RpcCtlOp`] operations)
pub fn multiplexer(&self) -> &Multiplexer<RpcCtlOp> {
&self.inner.multiplexer
}

/// Signal open to all listeners (async)
pub async fn signal_open(&self) -> RpcResult<()> {
Ok(self.inner.multiplexer.broadcast(RpcCtlOp::Open).await?)
}

/// Signal close to all listeners (async)
pub async fn signal_close(&self) -> RpcResult<()> {
Ok(self.inner.multiplexer.broadcast(RpcCtlOp::Close).await?)
}

/// Try signal open to all listeners (sync)
pub fn try_signal_open(&self) -> RpcResult<()> {
Ok(self.inner.multiplexer.try_broadcast(RpcCtlOp::Open)?)
}

/// Try signal close to all listeners (sync)
pub fn try_signal_close(&self) -> RpcResult<()> {
Ok(self.inner.multiplexer.try_broadcast(RpcCtlOp::Close)?)
}

/// Set the connection descriptor (URL, peer address, etc.)
pub fn set_descriptor(&self, descriptor: Option<String>) {
*self.inner.descriptor.lock().unwrap() = descriptor;
}

/// Get the connection descriptor (URL, peer address, etc.)
pub fn descriptor(&self) -> Option<String> {
self.inner.descriptor.lock().unwrap().clone()
}
}
1 change: 1 addition & 0 deletions rpc/core/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod ctl;
pub mod notifications;
pub mod ops;
pub mod rpc;
12 changes: 11 additions & 1 deletion rpc/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use kaspa_consensus_core::tx::TransactionId;
use kaspa_utils::networking::IpAddress;
use std::{net::AddrParseError, num::TryFromIntError};
use thiserror::Error;
use workflow_core::channel::ChannelError;

use crate::{RpcHash, RpcTransactionId};
use crate::{api::ctl::RpcCtlOp, RpcHash, RpcTransactionId};

#[derive(Clone, Debug, Error)]
pub enum RpcError {
Expand Down Expand Up @@ -102,6 +103,9 @@ pub enum RpcError {

#[error("{0}")]
General(String),

#[error("RpcCtl dispatch error")]
RpcCtlDispatchError,
}

impl From<String> for RpcError {
Expand All @@ -116,4 +120,10 @@ impl From<&str> for RpcError {
}
}

impl From<ChannelError<RpcCtlOp>> for RpcError {
fn from(_: ChannelError<RpcCtlOp>) -> Self {
RpcError::RpcCtlDispatchError
}
}

pub type RpcResult<T> = std::result::Result<T, crate::RpcError>;
2 changes: 1 addition & 1 deletion rpc/macros/src/wrpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ToTokens for RpcTable {
let __self = self;
//let request = request;
let __ret: RpcResult<#response_type> = {
let resp: ClientResult<#response_type> = __self.inner.rpc.call(#rpc_api_ops::#handler, request).await;
let resp: ClientResult<#response_type> = __self.inner.rpc_client.call(#rpc_api_ops::#handler, request).await;
Ok(resp.map_err(|e| kaspa_rpc_core::error::RpcError::RpcSubsystem(e.to_string()))?)
};
#[allow(unreachable_code)]
Expand Down
Loading

0 comments on commit 572081f

Please sign in to comment.