Skip to content

Commit

Permalink
de-couple wRPC from the runtiem wallet making RPC bindings generic
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Sep 9, 2023
1 parent 2caf4df commit dea9062
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 82 deletions.
2 changes: 1 addition & 1 deletion cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::modules::node::Node;
use crate::notifier::{Notification, Notifier};
use crate::result::Result;
use kaspa_daemon::{DaemonEvent, DaemonKind, Daemons};
use kaspa_wallet_core::rpc::DynRpcApi;
use kaspa_wallet_core::runtime::{Account, BalanceStrings};
use kaspa_wallet_core::storage::{IdT, PrvKeyDataInfo};
use kaspa_wallet_core::DynRpcApi;
use kaspa_wallet_core::{runtime::Wallet, Events};
use kaspa_wrpc_client::KaspaRpcClient;
use workflow_core::channel::*;
Expand Down
39 changes: 27 additions & 12 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ fn get_home_dir() -> PathBuf {
return dirs::home_dir().unwrap();
}

fn get_app_dir() -> PathBuf {
/// Get the default application directory.
pub fn get_app_dir() -> PathBuf {
#[cfg(target_os = "windows")]
return get_home_dir().join("rusty-kaspa");
#[cfg(not(target_os = "windows"))]
return get_home_dir().join(".rusty-kaspa");
}

fn validate_args(args: &Args) -> ConfigResult<()> {
pub fn validate_args(args: &Args) -> ConfigResult<()> {
#[cfg(feature = "devnet-prealloc")]
{
if args.num_prealloc_utxos.is_some() && !(args.devnet || args.simnet) {
Expand Down Expand Up @@ -92,12 +93,16 @@ fn get_user_approval_or_exit(message: &str, approve: bool) {
}
}

/// Runtime configuration struct for the application.
#[derive(Default)]
pub struct Runtime {
log_dir: Option<String>,
}

fn get_app_dir_from_args(args: &Args) -> PathBuf {
/// Get the application directory from the supplied [`Args`].
/// This function can be used to identify the location of
/// the application folder that contains kaspad logs and the database.
pub fn get_app_dir_from_args(args: &Args) -> PathBuf {
let app_dir = args
.appdir
.clone()
Expand All @@ -110,32 +115,42 @@ fn get_app_dir_from_args(args: &Args) -> PathBuf {
}
}

/// Get the log directory from the supplied [`Args`].
pub fn get_log_dir(args: &Args) -> Option<String> {
let network = args.network();
let app_dir = get_app_dir_from_args(args);

// Logs directory is usually under the application directory, unless otherwise specified
let log_dir = args.logdir.clone().unwrap_or_default().replace('~', get_home_dir().as_path().to_str().unwrap());
let log_dir = if log_dir.is_empty() { app_dir.join(network.to_prefixed()).join(DEFAULT_LOG_DIR) } else { PathBuf::from(log_dir) };
let log_dir = if args.no_log_files { None } else { log_dir.to_str().map(String::from) };
log_dir
}

impl Runtime {
pub fn from_args(args: &Args) -> Self {
// Configure the panic behavior
kaspa_core::panic::configure_panic();

let network = args.network();
let app_dir = get_app_dir_from_args(args);

// Logs directory is usually under the application directory, unless otherwise specified
let log_dir = args.logdir.clone().unwrap_or_default().replace('~', get_home_dir().as_path().to_str().unwrap());
let log_dir =
if log_dir.is_empty() { app_dir.join(network.to_prefixed()).join(DEFAULT_LOG_DIR) } else { PathBuf::from(log_dir) };
let log_dir = if args.no_log_files { None } else { log_dir.to_str() };
let log_dir = get_log_dir(args);

// Initialize the logger
kaspa_core::log::init_logger(log_dir, &args.log_level);
kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level);

Self { log_dir: log_dir.map(|log_dir| log_dir.to_owned()) }
}
}

/// Create [`Core`] instance with supplied [`Args`].
/// This function will automatically create a [`Runtime`]
/// instance with the supplied [`Args`] and then
/// call [`create_core_with_runtime`].
pub fn create_core(args: Args) -> (Arc<Core>, Arc<RpcCoreService>) {
let rt = Runtime::from_args(&args);
create_core_with_runtime(&rt, &args)
}

/// Create [`Core`] instance with supplied [`Args`] and [`Runtime`].
pub fn create_core_with_runtime(runtime: &Runtime, args: &Args) -> (Arc<Core>, Arc<RpcCoreService>) {
let network = args.network();

Expand Down
3 changes: 2 additions & 1 deletion wallet/core/src/imports.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub use crate::error::Error;
pub use crate::events::{Events, SyncState};
pub use crate::rpc::Rpc;
pub use crate::rpc::{DynRpcApi, RpcCtl};
pub use crate::utxo::scan::{Scan, ScanExtent};
pub use crate::DynRpcApi;
pub use crate::{runtime, storage, utils, utxo};
pub use async_trait::async_trait;
pub use borsh::{BorshDeserialize, BorshSchema, BorshSerialize};
Expand Down
7 changes: 5 additions & 2 deletions wallet/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod error;
pub mod events;
mod imports;
pub mod result;
pub mod rpc;
pub mod runtime;
pub mod secret;
pub mod settings;
Expand All @@ -23,5 +24,7 @@ pub use kaspa_wrpc_client::client::{ConnectOptions, ConnectStrategy};
pub use result::Result;
pub use settings::{DefaultSettings, SettingsStore, SettingsStoreT, WalletSettings};

pub type DynRpcApi = dyn kaspa_rpc_core::api::rpc::RpcApi;
pub type NotificationChannel = kaspa_utils::channel::Channel<kaspa_rpc_core::Notification>;
/// Returns the version of the Wallet framework.
pub fn version() -> String {
env!("CARGO_PKG_VERSION").to_string()
}
29 changes: 29 additions & 0 deletions wallet/core/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

pub use kaspa_rpc_core::api::ctl::RpcCtl;
pub type DynRpcApi = dyn kaspa_rpc_core::api::rpc::RpcApi;
pub type NotificationChannel = kaspa_utils::channel::Channel<kaspa_rpc_core::Notification>;
pub use kaspa_rpc_core::notify::mode::NotificationMode;
pub use kaspa_wrpc_client::WrpcEncoding;

/// RPC adaptor class that holds the [`RpcApi`](crate::api::RpcApi)
/// and [`RpcCtl`](crate::api::RpcCtl) instances.
#[derive(Clone)]
pub struct Rpc {
pub rpc_api: Arc<DynRpcApi>,
pub rpc_ctl: RpcCtl,
}

impl Rpc {
pub fn new(rpc_api: Arc<DynRpcApi>, rpc_ctl: RpcCtl) -> Self {
Rpc { rpc_api, rpc_ctl }
}

pub fn rpc_api(&self) -> &Arc<DynRpcApi> {
&self.rpc_api
}

pub fn rpc_ctl(&self) -> &RpcCtl {
&self.rpc_ctl
}
}
10 changes: 5 additions & 5 deletions wallet/core/src/runtime/account/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pub trait Account: AnySync + Send + Sync + 'static {

transaction.try_sign()?;
transaction.log().await?;
let id = transaction.try_submit(self.wallet().rpc_api()).await?;
let id = transaction.try_submit(&self.wallet().rpc_api()).await?;
ids.push(id);
yield_executor().await;
}
Expand Down Expand Up @@ -347,7 +347,7 @@ pub trait Account: AnySync + Send + Sync + 'static {

transaction.try_sign()?;
transaction.log().await?;
let id = transaction.try_submit(self.wallet().rpc_api()).await?;
let id = transaction.try_submit(&self.wallet().rpc_api()).await?;
ids.push(id);
yield_executor().await;
}
Expand Down Expand Up @@ -453,7 +453,7 @@ pub trait DerivationCapableAccount: Account {
let mut stream = generator.stream();
while let Some(transaction) = stream.try_next().await? {
transaction.try_sign()?;
let id = transaction.try_submit(self.wallet().rpc_api()).await?;
let id = transaction.try_submit(&self.wallet().rpc_api()).await?;
if let Some(notifier) = notifier.as_ref() {
notifier(index, balance, Some(id));
}
Expand Down Expand Up @@ -483,7 +483,7 @@ pub trait DerivationCapableAccount: Account {
let address = self.derivation().receive_address_manager().new_address()?;
self.utxo_context().register_addresses(&[address.clone()]).await?;

let metadata = self.metadata()?.expect("derivation accounds must provide metadata");
let metadata = self.metadata()?.expect("derivation accounts must provide metadata");
let store = self.wallet().store().as_account_store()?;
store.update_metadata(&[&metadata]).await?;

Expand All @@ -494,7 +494,7 @@ pub trait DerivationCapableAccount: Account {
let address = self.derivation().change_address_manager().new_address()?;
self.utxo_context().register_addresses(&[address.clone()]).await?;

let metadata = self.metadata()?.expect("derivation accounds must provide metadata");
let metadata = self.metadata()?.expect("derivation accounts must provide metadata");
let store = self.wallet().store().as_account_store()?;
store.update_metadata(&[&metadata]).await?;

Expand Down
17 changes: 11 additions & 6 deletions wallet/core/src/runtime/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::{pin_mut, TryFutureExt};
use regex::Regex;
struct Inner {
task_ctl: DuplexChannel,
rpc: Arc<DynRpcApi>,
rpc: Mutex<Option<Rpc>>,
multiplexer: Multiplexer<Box<Events>>,
running: AtomicBool,
is_synced: AtomicBool,
Expand All @@ -18,10 +18,10 @@ pub struct SyncMonitor {
}

impl SyncMonitor {
pub fn new(rpc: &Arc<DynRpcApi>, multiplexer: &Multiplexer<Box<Events>>) -> Self {
pub fn new(rpc: Option<Rpc>, multiplexer: &Multiplexer<Box<Events>>) -> Self {
Self {
inner: Arc::new(Inner {
rpc: rpc.clone(),
rpc: Mutex::new(rpc.clone()),
multiplexer: multiplexer.clone(),
task_ctl: DuplexChannel::oneshot(),
running: AtomicBool::new(false),
Expand Down Expand Up @@ -70,8 +70,13 @@ impl SyncMonitor {
Ok(())
}

pub fn rpc(&self) -> &Arc<DynRpcApi> {
&self.inner.rpc
pub fn rpc_api(&self) -> Arc<DynRpcApi> {
self.inner.rpc.lock().unwrap().as_ref().expect("SyncMonitor RPC not initialized").rpc_api().clone()
}

pub async fn bind_rpc(&self, rpc: Option<Rpc>) -> Result<()> {
*self.inner.rpc.lock().unwrap() = rpc;
Ok(())
}

pub fn multiplexer(&self) -> &Multiplexer<Box<Events>> {
Expand All @@ -98,7 +103,7 @@ impl SyncMonitor {
async fn get_sync_status(&self) -> Result<bool> {
cfg_if! {
if #[cfg(feature = "legacy-rpc")] {
Ok(self.rpc().get_info().map_ok(|info| info.is_synced).await?)
Ok(self.rpc_api().get_info().map_ok(|info| info.is_synced).await?)
} else {
Ok(self.rpc().get_sync_status().await?)
}
Expand Down
52 changes: 27 additions & 25 deletions wallet/core/src/runtime/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use kaspa_notify::{
listener::ListenerId,
scope::{Scope, VirtualDaaScoreChangedScope},
};
use kaspa_rpc_core::api::ctl::RpcCtl;
use kaspa_rpc_core::notify::mode::NotificationMode;
use kaspa_wrpc_client::{KaspaRpcClient, WrpcEncoding};
use std::sync::Arc;
Expand Down Expand Up @@ -102,8 +101,7 @@ pub struct Inner {
store: Arc<dyn Interface>,
settings: SettingsStore<WalletSettings>,
utxo_processor: Arc<UtxoProcessor>,
rpc_api: Arc<DynRpcApi>,
rpc_ctl: RpcCtl,
rpc: Mutex<Option<Rpc>>,
multiplexer: Multiplexer<Box<Events>>,
}

Expand All @@ -123,34 +121,29 @@ impl Wallet {
}

pub fn try_new(storage: Arc<dyn Interface>, network_id: Option<NetworkId>) -> Result<Wallet> {
Wallet::try_with_rpc(None, storage, network_id)
Wallet::try_with_wrpc(storage, network_id)
}

pub fn try_with_wrpc(store: Arc<dyn Interface>, network_id: Option<NetworkId>) -> Result<Wallet> {
let rpc_client =
Arc::new(KaspaRpcClient::new_with_args(WrpcEncoding::Borsh, NotificationMode::MultiListeners, "wrpc://127.0.0.1:17110")?);
let rpc_ctl = rpc_client.ctl().clone();
let rpc_api: Arc<DynRpcApi> = rpc_client;
let rpc = Rpc::new(rpc_api, rpc_ctl);
Self::try_with_rpc(Some(rpc), store, network_id)
}

pub fn try_with_rpc(
rpc: Option<(Arc<DynRpcApi>, RpcCtl)>,
rpc: Option<Rpc>,
store: Arc<dyn Interface>,
network_id: Option<NetworkId>,
) -> Result<Wallet> {
let (rpc_api, rpc_ctl) = if let Some((rpc_api, rpc_ctl)) = rpc {
(rpc_api, rpc_ctl)
} else {
let rpc_client = Arc::new(KaspaRpcClient::new_with_args(
WrpcEncoding::Borsh,
NotificationMode::MultiListeners,
"wrpc://127.0.0.1:17110",
)?);
let rpc_ctl = rpc_client.ctl().clone();
let rpc_api: Arc<DynRpcApi> = rpc_client;
(rpc_api, rpc_ctl)
};

let multiplexer = Multiplexer::<Box<Events>>::new();
let utxo_processor = Arc::new(UtxoProcessor::new(&rpc_api, &rpc_ctl, network_id, Some(multiplexer.clone())));
let utxo_processor = Arc::new(UtxoProcessor::new(rpc.clone(), network_id, Some(multiplexer.clone())));

let wallet = Wallet {
inner: Arc::new(Inner {
rpc_api,
rpc_ctl,
rpc: Mutex::new(rpc),
multiplexer,
store,
active_accounts: ActiveAccountMap::default(),
Expand Down Expand Up @@ -303,12 +296,21 @@ impl Wallet {
self.rpc_api().clone().downcast_arc::<KaspaRpcClient>().ok()
}

pub fn rpc_api(&self) -> &Arc<DynRpcApi> {
&self.inner.rpc_api
pub fn rpc_api(&self) -> Arc<DynRpcApi> {
self.inner.rpc.lock().unwrap().as_ref().expect("Wallet RPC not initialized").rpc_api().clone()
}

pub fn rpc_ctl(&self) -> &RpcCtl {
&self.inner.rpc_ctl
pub fn rpc_ctl(&self) -> RpcCtl {
self.inner.rpc.lock().unwrap().as_ref().expect("Wallet RPC not initialized").rpc_ctl().clone()
}

pub fn has_rpc(&self) -> bool {
self.inner.rpc.lock().unwrap().is_some()
}

pub async fn bind_rpc(self: &Arc<Self>, rpc: Option<Rpc>) -> Result<()> {
self.utxo_processor().bind_rpc(rpc).await?;
Ok(())
}

pub fn multiplexer(&self) -> &Multiplexer<Box<Events>> {
Expand Down
4 changes: 1 addition & 3 deletions wallet/core/src/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ impl Drop for Secret {

impl std::fmt::Debug for Secret {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Secret")
.field("secret", &"********")
.finish()
f.debug_struct("Secret").field("secret", &"********").finish()
}
}
2 changes: 1 addition & 1 deletion wallet/core/src/tx/generator/pending.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::result::Result;
use crate::rpc::DynRpcApi;
use crate::tx::{DataKind, Generator};
use crate::utxo::UtxoEntryReference;
use crate::DynRpcApi;
use kaspa_addresses::Address;
use kaspa_consensus_core::network::NetworkType;
use kaspa_consensus_core::sign::sign_with_multiple_v2;
Expand Down
Loading

0 comments on commit dea9062

Please sign in to comment.