Skip to content

Commit

Permalink
FeeRate estimation API and FeeRate poller API for Wallet
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Dec 23, 2024
1 parent 03100ae commit c7d7c88
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 1 deletion.
1 change: 1 addition & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl KaspaCli {
Events::Metrics { network_id : _, metrics : _ } => {
// log_info!("Kaspa NG - received metrics event {metrics:?}")
}
Events::FeeRate { .. } => {},
Events::Error { message } => { terrorln!(this,"{message}"); },
Events::UtxoProcStart => {},
Events::UtxoProcStop => {},
Expand Down
50 changes: 50 additions & 0 deletions wallet/core/src/api/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::imports::*;
use crate::tx::{Fees, GeneratorSummary, PaymentDestination};
use kaspa_addresses::Address;
use kaspa_rpc_core::RpcFeerateBucket;

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -548,6 +549,55 @@ pub struct AccountsEstimateResponse {
pub generator_summary: GeneratorSummary,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateBucket {
feerate: f64,
seconds: f64,
}

impl From<RpcFeerateBucket> for FeeRateEstimateBucket {
fn from(bucket: RpcFeerateBucket) -> Self {
Self { feerate: bucket.feerate, seconds: bucket.estimated_seconds }
}
}

impl From<&RpcFeerateBucket> for FeeRateEstimateBucket {
fn from(bucket: &RpcFeerateBucket) -> Self {
Self { feerate: bucket.feerate, seconds: bucket.estimated_seconds }
}
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRateEstimateResponse {
pub priority: FeeRateEstimateBucket,
pub normal: FeeRateEstimateBucket,
pub low: FeeRateEstimateBucket,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerEnableRequest {
pub interval_seconds: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerEnableResponse {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerDisableRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeRatePollerDisableResponse {}

#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionsDataGetRequest {
Expand Down
33 changes: 33 additions & 0 deletions wallet/core/src/api/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,39 @@ pub trait WalletApi: Send + Sync + AnySync {
/// an error.
async fn accounts_estimate_call(self: Arc<Self>, request: AccountsEstimateRequest) -> Result<AccountsEstimateResponse>;

/// Wrapper around [`accounts_estimate_call()`](Self::accounts_estimate_call)
async fn fee_rate_estimate(self: Arc<Self>) -> Result<FeeRateEstimateResponse> {
Ok(self.fee_rate_estimate_call(FeeRateEstimateRequest {}).await?)
}

/// Estimate current network fee rate. Returns a [`FeeRateEstimateResponse`]
async fn fee_rate_estimate_call(self: Arc<Self>, request: FeeRateEstimateRequest) -> Result<FeeRateEstimateResponse>;

/// Wrapper around [`fee_rate_poller_enable_call()`](Self::fee_rate_poller_enable_call).
async fn fee_rate_poller_enable(self: Arc<Self>, interval_seconds: u64) -> Result<()> {
self.fee_rate_poller_enable_call(FeeRatePollerEnableRequest { interval_seconds }).await?;
Ok(())
}

/// Enable the fee rate poller. The fee rate poller is a background task that
/// periodically polls the network for the current fee rate. The fee rate is
/// used to estimate the transaction fee. The poller is disabled by default.
/// This function stops the previously enabled poller and starts a new one
/// with the specified `interval`.
async fn fee_rate_poller_enable_call(self: Arc<Self>, request: FeeRatePollerEnableRequest) -> Result<FeeRatePollerEnableResponse>;

/// Wrapper around [`fee_rate_poller_disable_call()`](Self::fee_rate_poller_disable_call).
async fn fee_rate_poller_disable(self: Arc<Self>) -> Result<()> {
self.fee_rate_poller_disable_call(FeeRatePollerDisableRequest {}).await?;
Ok(())
}

/// Disable the fee rate poller.
async fn fee_rate_poller_disable_call(
self: Arc<Self>,
request: FeeRatePollerDisableRequest,
) -> Result<FeeRatePollerDisableResponse>;

/// Get a range of transaction records for a specific account id.
/// Wrapper around [`transactions_data_get_call()`](Self::transactions_data_get_call).
async fn transactions_data_get_range(
Expand Down
6 changes: 6 additions & 0 deletions wallet/core/src/api/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ impl WalletApi for WalletClient {
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]}
}

Expand Down Expand Up @@ -182,6 +185,9 @@ impl WalletServer {
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]}
}

Expand Down
10 changes: 10 additions & 0 deletions wallet/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! produced by the client RPC and the Kaspa node monitoring subsystems.
//!
use crate::api::message::FeeRateEstimateBucket;
use crate::imports::*;
use crate::storage::{Hint, PrvKeyDataInfo, StorageDescriptor, TransactionRecord, WalletDescriptor};
use crate::utxo::context::UtxoContextId;
Expand Down Expand Up @@ -221,6 +222,11 @@ pub enum Events {
// metrics_data: MetricsData,
metrics: MetricsUpdate,
},
FeeRate {
priority: FeeRateEstimateBucket,
normal: FeeRateEstimateBucket,
low: FeeRateEstimateBucket,
},
/// A general wallet framework error, emitted when an unexpected
/// error occurs within the wallet framework.
Error {
Expand Down Expand Up @@ -284,6 +290,7 @@ pub enum EventKind {
Discovery,
Balance,
Metrics,
FeeRate,
Error,
}

Expand Down Expand Up @@ -320,6 +327,7 @@ impl From<&Events> for EventKind {
Events::Discovery { .. } => EventKind::Discovery,
Events::Balance { .. } => EventKind::Balance,
Events::Metrics { .. } => EventKind::Metrics,
Events::FeeRate { .. } => EventKind::FeeRate,
Events::Error { .. } => EventKind::Error,
}
}
Expand Down Expand Up @@ -359,6 +367,7 @@ impl FromStr for EventKind {
"discovery" => Ok(EventKind::Discovery),
"balance" => Ok(EventKind::Balance),
"metrics" => Ok(EventKind::Metrics),
"fee-rate" => Ok(EventKind::FeeRate),
"error" => Ok(EventKind::Error),
_ => Err(Error::custom("Invalid event kind")),
}
Expand Down Expand Up @@ -406,6 +415,7 @@ impl std::fmt::Display for EventKind {
EventKind::Discovery => "discovery",
EventKind::Balance => "balance",
EventKind::Metrics => "metrics",
EventKind::FeeRate => "fee-rate",
EventKind::Error => "error",
};

Expand Down
48 changes: 47 additions & 1 deletion wallet/core/src/utxo/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use kaspa_rpc_core::{
ops::{RPC_API_REVISION, RPC_API_VERSION},
},
message::UtxosChangedNotification,
GetServerInfoResponse,
GetServerInfoResponse, RpcFeeEstimate,
};
use kaspa_wrpc_client::KaspaRpcClient;
use workflow_core::channel::{Channel, DuplexChannel, Sender};
Expand Down Expand Up @@ -61,6 +61,8 @@ pub struct Inner {
metrics: Arc<Metrics>,
metrics_kinds: Mutex<Vec<MetricsUpdateKind>>,
connection_signaler: Mutex<Option<Sender<std::result::Result<(), String>>>>,
fee_rate_task_ctl: DuplexChannel,
fee_rate_task_is_running: AtomicBool,
}

impl Inner {
Expand Down Expand Up @@ -91,6 +93,8 @@ impl Inner {
metrics: Arc::new(Metrics::default()),
metrics_kinds: Mutex::new(vec![]),
connection_signaler: Mutex::new(None),
fee_rate_task_ctl: DuplexChannel::oneshot(),
fee_rate_task_is_running: AtomicBool::new(false),
}
}
}
Expand Down Expand Up @@ -728,6 +732,48 @@ impl UtxoProcessor {
pub fn enable_metrics_kinds(&self, metrics_kinds: &[MetricsUpdateKind]) {
*self.inner.metrics_kinds.lock().unwrap() = metrics_kinds.to_vec();
}

pub async fn start_fee_rate_poller(&self, poller_interval: Duration) -> Result<()> {
self.stop_fee_rate_poller().await.ok();

let this = self.clone();
this.inner.fee_rate_task_is_running.store(true, Ordering::SeqCst);
let fee_rate_task_ctl_receiver = self.inner.fee_rate_task_ctl.request.receiver.clone();
let fee_rate_task_ctl_sender = self.inner.fee_rate_task_ctl.response.sender.clone();

let mut interval = workflow_core::task::interval(poller_interval);

spawn(async move {
loop {
select_biased! {
_ = interval.next().fuse() => {
if let Ok(fee_rate) = this.rpc_api().get_fee_estimate().await {
let RpcFeeEstimate { priority_bucket, normal_buckets, low_buckets } = fee_rate;
this.notify(Events::FeeRate {
priority : priority_bucket.into(),
normal : normal_buckets.first().expect("missing normal feerate bucket").into(),
low : low_buckets.first().expect("missing normal feerate bucket").into()
}).await.ok();
}
},
_ = fee_rate_task_ctl_receiver.recv().fuse() => {
break;
},
}
}

fee_rate_task_ctl_sender.send(()).await.unwrap();
});

Ok(())
}

pub async fn stop_fee_rate_poller(&self) -> Result<()> {
if self.inner.fee_rate_task_is_running.load(Ordering::SeqCst) {
self.inner.fee_rate_task_ctl.signal(()).await.expect("UtxoProcessor::stop_task() `signal` error");
}
Ok(())
}
}

#[cfg(test)]
Expand Down
25 changes: 25 additions & 0 deletions wallet/core/src/wallet/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::result::Result;
use crate::storage::interface::TransactionRangeResult;
use crate::storage::Binding;
use crate::tx::Fees;
use kaspa_rpc_core::RpcFeeEstimate;
use workflow_core::channel::Receiver;

#[async_trait]
Expand Down Expand Up @@ -513,4 +514,28 @@ impl WalletApi for super::Wallet {
) -> Result<AddressBookEnumerateResponse> {
return Err(Error::NotImplemented);
}

async fn fee_rate_estimate_call(self: Arc<Self>, _request: FeeRateEstimateRequest) -> Result<FeeRateEstimateResponse> {
let RpcFeeEstimate { priority_bucket, normal_buckets, low_buckets } = self.rpc_api().get_fee_estimate().await?;

Ok(FeeRateEstimateResponse {
priority: priority_bucket.into(),
normal: normal_buckets.first().ok_or(Error::custom("missing normal feerate bucket"))?.into(),
low: low_buckets.first().ok_or(Error::custom("missing normal feerate bucket"))?.into(),
})
}

async fn fee_rate_poller_enable_call(self: Arc<Self>, request: FeeRatePollerEnableRequest) -> Result<FeeRatePollerEnableResponse> {
let FeeRatePollerEnableRequest { interval_seconds } = request;
self.utxo_processor().start_fee_rate_poller(Duration::from_secs(interval_seconds)).await?;
Ok(FeeRatePollerEnableResponse {})
}

async fn fee_rate_poller_disable_call(
self: Arc<Self>,
_request: FeeRatePollerDisableRequest,
) -> Result<FeeRatePollerDisableResponse> {
self.utxo_processor().stop_fee_rate_poller().await?;
Ok(FeeRatePollerDisableResponse {})
}
}
85 changes: 85 additions & 0 deletions wallet/core/src/wasm/api/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,91 @@ try_from! ( args: AccountsEstimateResponse, IAccountsEstimateResponse, {

// ---

declare! {
IFeeRateEstimateBucket,
r#"
export interface IFeeRateEstimateBucket {
feeRate : number;
seconds : number;
}
"#,
}

declare! {
IFeeRateEstimateRequest,
r#"
export interface IFeeRateEstimateRequest { }
"#,
}

try_from! ( _args: IFeeRateEstimateRequest, FeeRateEstimateRequest, {
Ok(FeeRateEstimateRequest { })
});

declare! {
IFeeRateEstimateResponse,
r#"
export interface IFeeRateEstimateResponse {
priority : IFeeRateEstimateBucket,
normal : IFeeRateEstimateBucket,
low : IFeeRateEstimateBucket,
}
"#,
}

try_from! ( args: FeeRateEstimateResponse, IFeeRateEstimateResponse, {
Ok(to_value(&args)?.into())
});

declare! {
IFeeRatePollerEnableRequest,
r#"
export interface IFeeRatePollerEnableRequest {
intervalSeconds : number;
}
"#,
}

try_from! ( args: IFeeRatePollerEnableRequest, FeeRatePollerEnableRequest, {
let interval_seconds = args.get_u64("intervalSeconds")?;
Ok(FeeRatePollerEnableRequest { interval_seconds })
});

declare! {
IFeeRatePollerEnableResponse,
r#"
export interface IFeeRatePollerEnableResponse { }
"#,
}

try_from! ( _args: FeeRatePollerEnableResponse, IFeeRatePollerEnableResponse, {
Ok(IFeeRatePollerEnableResponse::default())
});

declare! {
IFeeRatePollerDisableRequest,
r#"
export interface IFeeRatePollerDisableRequest { }
"#,
}

try_from! ( _args: IFeeRatePollerDisableRequest, FeeRatePollerDisableRequest, {
Ok(FeeRatePollerDisableRequest { })
});

declare! {
IFeeRatePollerDisableResponse,
r#"
export interface IFeeRatePollerDisableResponse { }
"#,
}

try_from! ( _args: FeeRatePollerDisableResponse, IFeeRatePollerDisableResponse, {
Ok(IFeeRatePollerDisableResponse::default())
});

// ---

declare! {
ITransactionsDataGetRequest,
r#"
Expand Down
3 changes: 3 additions & 0 deletions wallet/core/src/wasm/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ declare_wasm_handlers!([
TransactionsReplaceNote,
TransactionsReplaceMetadata,
AddressBookEnumerate,
FeeRateEstimate,
FeeRatePollerEnable,
FeeRatePollerDisable,
]);

0 comments on commit c7d7c88

Please sign in to comment.