diff --git a/Cargo.lock b/Cargo.lock index b611fcd489..a627871434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4136,6 +4136,7 @@ dependencies = [ "tokio", "tracing", "utils", + "utils-networking", "utxo", ] diff --git a/blockprod/src/lib.rs b/blockprod/src/lib.rs index 2e119ee229..5220401946 100644 --- a/blockprod/src/lib.rs +++ b/blockprod/src/lib.rs @@ -201,7 +201,7 @@ mod tests { let tip_sx = utils::sync::Mutex::new(Some(tip_sx)); mempool .call_mut(move |m| { - m.subscribe_to_events(Arc::new({ + m.subscribe_to_subsystem_events(Arc::new({ move |evt| match evt { mempool::event::MempoolEvent::NewTip(tip) => { if let Some(tip_sx) = tip_sx.lock().unwrap().take() { diff --git a/chainstate/src/rpc/mod.rs b/chainstate/src/rpc/mod.rs index 63324daafc..51a2981951 100644 --- a/chainstate/src/rpc/mod.rs +++ b/chainstate/src/rpc/mod.rs @@ -174,8 +174,8 @@ trait ChainstateRpc { /// Subscribe to chainstate events, such as new tip. /// /// After a successful subscription, the node will message the subscriber with a message on every event. - #[subscription(name = "subscribe_events", item = RpcEvent)] - async fn subscribe_events(&self) -> rpc::subscription::Reply; + #[subscription(name = "subscribe_to_events", item = RpcEvent)] + async fn subscribe_to_events(&self) -> rpc::subscription::Reply; } #[async_trait::async_trait] @@ -405,7 +405,7 @@ impl ChainstateRpcServer for super::ChainstateHandle { rpc::handle_result(self.call(move |this| this.info()).await) } - async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply { + async fn subscribe_to_events(&self, pending: subscription::Pending) -> subscription::Reply { let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?; rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await } diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index ba0b5012ba..a58c90773c 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -26,6 +26,7 @@ serialization = { path = "../serialization" } subsystem = { path = "../subsystem" } tokens-accounting = { path = "../tokens-accounting" } utils = { path = "../utils" } +utils-networking = {path = '../utils/networking'} utxo = { path = "../utxo" } anyhow.workspace = true diff --git a/mempool/src/interface/mempool_interface.rs b/mempool/src/interface/mempool_interface.rs index 511c9abadc..b12da196b4 100644 --- a/mempool/src/interface/mempool_interface.rs +++ b/mempool/src/interface/mempool_interface.rs @@ -72,8 +72,11 @@ pub trait MempoolInterface: Send + Sync { packing_strategy: PackingStrategy, ) -> Result>, BlockConstructionError>; - /// Subscribe to events emitted by mempool - fn subscribe_to_events(&mut self, handler: Arc); + /// Subscribe to events emitted by mempool subsystem + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + + /// Subscribe to broadcast mempool events + fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver; /// Get current memory usage fn memory_usage(&self) -> usize; diff --git a/mempool/src/interface/mempool_interface_impl.rs b/mempool/src/interface/mempool_interface_impl.rs index d3bcf70d45..eba3c23dfd 100644 --- a/mempool/src/interface/mempool_interface_impl.rs +++ b/mempool/src/interface/mempool_interface_impl.rs @@ -150,10 +150,14 @@ impl MempoolInterface for Mempool { self.collect_txs(tx_accumulator, transaction_ids, packing_strategy) } - fn subscribe_to_events(&mut self, handler: Arc) { + fn subscribe_to_subsystem_events(&mut self, handler: Arc) { self.subscribe_to_events(handler); } + fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver { + self.subscribe_to_event_broadcast() + } + fn memory_usage(&self) -> usize { self.memory_usage() } diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index 4b1545fae5..e80891ec7b 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -25,6 +25,7 @@ pub mod event; mod interface; mod pool; pub mod rpc; +pub mod rpc_event; pub mod tx_accumulator; pub use {config::MempoolConfig, pool::feerate_points::find_interpolated_value, pool::FeeRate}; diff --git a/mempool/src/pool/mod.rs b/mempool/src/pool/mod.rs index 4391bf4778..78f0aecaa9 100644 --- a/mempool/src/pool/mod.rs +++ b/mempool/src/pool/mod.rs @@ -23,6 +23,7 @@ use common::{ }; use logging::log; use utils::{const_value::ConstValue, ensure, eventhandler::EventsController}; +use utils_networking::broadcaster; pub use self::{feerate::FeeRate, tx_pool::feerate_points}; @@ -66,7 +67,7 @@ pub struct Mempool { tx_pool: tx_pool::TxPool, orphans: TxOrphanPool, work_queue: WorkQueue, - events_controller: EventsController, + events_broadcast: EventsBroadcast, clock: TimeGetter, } @@ -89,13 +90,17 @@ impl Mempool { tx_pool, orphans: orphans::TxOrphanPool::new(), work_queue: WorkQueue::new(), - events_controller: EventsController::new(), + events_broadcast: EventsBroadcast::new(), clock, } } pub fn subscribe_to_events(&mut self, handler: Arc) { - self.events_controller.subscribe_to_events(handler) + self.events_broadcast.subscribe_to_events(handler) + } + + pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { + self.events_broadcast.subscribe_to_event_broadcast() } pub fn on_peer_disconnected(&mut self, peer_id: p2p_types::PeerId) { @@ -140,11 +145,11 @@ impl Mempool { tx_pool, orphans, work_queue, - events_controller, + events_broadcast, clock, } = self; - let finalizer = TxFinalizer::new(orphans, clock, events_controller, work_queue); + let finalizer = TxFinalizer::new(orphans, clock, events_broadcast, work_queue); (tx_pool, finalizer) } } @@ -243,7 +248,8 @@ impl Mempool { })?; let new_tip = event::NewTip::new(block_id, height); - self.events_controller.broadcast(new_tip.into()); + let event = new_tip.into(); + self.events_broadcast.broadcast(event); Ok(()) } @@ -281,6 +287,33 @@ impl Mempool { } } +struct EventsBroadcast { + events_controller: EventsController, + events_broadcaster: broadcaster::Broadcaster, +} + +impl EventsBroadcast { + fn new() -> Self { + Self { + events_controller: EventsController::new(), + events_broadcaster: broadcaster::Broadcaster::new(), + } + } + + fn subscribe_to_events(&mut self, handler: Arc) { + self.events_controller.subscribe_to_events(handler) + } + + fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { + self.events_broadcaster.subscribe() + } + + fn broadcast(&mut self, event: MempoolEvent) { + self.events_broadcaster.broadcast(&event); + self.events_controller.broadcast(event); + } +} + /// [TxFinalizer] holds data needed to finalize the transaction processing after it's been processed /// by the transaction pool. /// @@ -291,7 +324,7 @@ impl Mempool { struct TxFinalizer<'a> { orphan_pool: &'a mut TxOrphanPool, cur_time: Time, - events_controller: &'a EventsController, + events_broadcast: &'a mut EventsBroadcast, work_queue: &'a mut WorkQueue, } @@ -299,13 +332,13 @@ impl<'a> TxFinalizer<'a> { pub fn new( orphan_pool: &'a mut TxOrphanPool, clock: &TimeGetter, - events_controller: &'a EventsController, + events_broadcast: &'a mut EventsBroadcast, work_queue: &'a mut WorkQueue, ) -> Self { Self { orphan_pool, cur_time: clock.get_time(), - events_controller, + events_broadcast, work_queue, } } @@ -323,8 +356,9 @@ impl<'a> TxFinalizer<'a> { log::trace!("Added transaction {tx_id}"); self.enqueue_children(transaction.tx_entry()); - let evt = event::TransactionProcessed::accepted(tx_id, relay_policy, origin); - self.events_controller.broadcast(evt.into()); + let event = event::TransactionProcessed::accepted(tx_id, relay_policy, origin); + let event = event.into(); + self.events_broadcast.broadcast(event); Ok(TxStatus::InMempool) } TxAdditionOutcome::Duplicate { transaction } => { @@ -337,8 +371,9 @@ impl<'a> TxFinalizer<'a> { log::trace!("Rejected transaction {tx_id}, checking orphan status"); self.try_add_orphan(tx_pool, transaction, error).inspect_err(|err| { - let evt = event::TransactionProcessed::rejected(tx_id, err.clone(), origin); - self.events_controller.broadcast(evt.into()); + let event = event::TransactionProcessed::rejected(tx_id, err.clone(), origin); + let event = event.into(); + self.events_broadcast.broadcast(event); }) } } diff --git a/mempool/src/rpc.rs b/mempool/src/rpc.rs index 41f8eb7b57..b6b043f45f 100644 --- a/mempool/src/rpc.rs +++ b/mempool/src/rpc.rs @@ -25,7 +25,7 @@ use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, Tx use serialization::hex_encoded::HexEncoded; use utils::tap_log::TapLog; -use crate::{FeeRate, MempoolMaxSize, TxStatus}; +use crate::{rpc_event::RpcEvent, FeeRate, MempoolMaxSize, TxStatus}; use rpc::RpcResult; @@ -102,6 +102,12 @@ trait MempoolRpc { /// Get the curve data points that represent the fee rate as a function of transaction size. #[method(name = "get_fee_rate_points")] async fn get_fee_rate_points(&self) -> RpcResult>; + + /// Subscribe to mempool events, such as tx processed. + /// + /// After a successful subscription, the node will message the subscriber with a message on every event. + #[subscription(name = "subscribe_to_events", item = RpcEvent)] + async fn subscribe_to_events(&self) -> rpc::subscription::Reply; } #[async_trait::async_trait] @@ -182,4 +188,12 @@ impl MempoolRpcServer for super::MempoolHandle { const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9); rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await) } + + async fn subscribe_to_events( + &self, + pending: rpc::subscription::Pending, + ) -> rpc::subscription::Reply { + let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?; + rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await + } } diff --git a/mempool/src/rpc_event.rs b/mempool/src/rpc_event.rs new file mode 100644 index 0000000000..c75e65d51d --- /dev/null +++ b/mempool/src/rpc_event.rs @@ -0,0 +1,97 @@ +// Copyright (c) 2024 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common::{ + chain::{Block, Transaction}, + primitives::{BlockHeight, Id}, +}; +use mempool_types::{tx_options::TxRelayPolicy, tx_origin::LocalTxOrigin}; +use p2p_types::PeerId; + +use crate::event::MempoolEvent; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)] +#[serde(tag = "type", content = "content")] +pub enum RpcEvent { + NewTip { + id: Id, + height: BlockHeight, + }, + TransactionProcessed { + tx_id: Id, + origin: RpcTxOrigin, + relay: RpcTxRelayPolicy, + successful: bool, + }, +} + +impl RpcEvent { + pub fn from_event(event: MempoolEvent) -> Self { + match event { + MempoolEvent::NewTip(e) => RpcEvent::NewTip { + id: *e.block_id(), + height: e.block_height(), + }, + MempoolEvent::TransactionProcessed(e) => RpcEvent::TransactionProcessed { + tx_id: *e.tx_id(), + origin: match e.origin() { + mempool_types::tx_origin::TxOrigin::Local(local_origin) => RpcTxOrigin::Local { + origin: match local_origin { + LocalTxOrigin::Mempool => RpcLocalTxOrigin::Mempool, + LocalTxOrigin::P2p => RpcLocalTxOrigin::P2p, + LocalTxOrigin::PastBlock => RpcLocalTxOrigin::PastBlock, + }, + }, + mempool_types::tx_origin::TxOrigin::Remote(r) => RpcTxOrigin::Remote { + peer_id: r.peer_id(), + }, + }, + relay: match e.relay_policy() { + TxRelayPolicy::DoRelay => RpcTxRelayPolicy::DoRelay, + TxRelayPolicy::DontRelay => RpcTxRelayPolicy::DontRelay, + }, + successful: e.result().is_ok(), + }, + } + } +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcTxOrigin { + Local { origin: RpcLocalTxOrigin }, + Remote { peer_id: PeerId }, +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcLocalTxOrigin { + Mempool, + P2p, + PastBlock, +} + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, +)] +#[serde(tag = "type", content = "content")] +pub enum RpcTxRelayPolicy { + DoRelay, + DontRelay, +} diff --git a/mocks/src/mempool.rs b/mocks/src/mempool.rs index 9e93baaff4..cf8b07841d 100644 --- a/mocks/src/mempool.rs +++ b/mocks/src/mempool.rs @@ -61,7 +61,9 @@ mockall::mock! { packing_strategy: PackingStrategy, ) -> Result>, BlockConstructionError>; - fn subscribe_to_events(&mut self, handler: Arc); + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver; + fn memory_usage(&self) -> usize; fn get_size_limit(&self) -> MempoolMaxSize; fn set_size_limit(&mut self, max_size: MempoolMaxSize) -> Result<(), Error>; diff --git a/node-daemon/docs/RPC.md b/node-daemon/docs/RPC.md index d0b25ebc6c..00cdd339c6 100644 --- a/node-daemon/docs/RPC.md +++ b/node-daemon/docs/RPC.md @@ -532,7 +532,7 @@ Returns: } ``` -### Subscription `chainstate_subscribe_events` +### Subscription `chainstate_subscribe_to_events` Subscribe to chainstate events, such as new tip. @@ -555,7 +555,7 @@ Produces: } ``` -Unsubscribe using `chainstate_unsubscribe_events`. +Unsubscribe using `chainstate_unsubscribe_to_events`. Note: Subscriptions only work over WebSockets. @@ -760,6 +760,56 @@ Returns: ], .. ] ``` +### Subscription `mempool_subscribe_to_events` + +Subscribe to mempool events, such as tx processed. + +After a successful subscription, the node will message the subscriber with a message on every event. + + +Parameters: +``` +{} +``` + +Produces: +``` +EITHER OF + 1) { + "type": "NewTip", + "content": { + "id": hex string, + "height": number, + }, + } + 2) { + "type": "TransactionProcessed", + "content": { + "tx_id": hex string, + "origin": EITHER OF + 1) { + "type": "Local", + "content": { "origin": EITHER OF + 1) { "type": "Mempool" } + 2) { "type": "P2p" } + 3) { "type": "PastBlock" } }, + } + 2) { + "type": "Remote", + "content": { "peer_id": number }, + }, + "relay": EITHER OF + 1) { "type": "DoRelay" } + 2) { "type": "DontRelay" }, + "successful": bool, + }, + } +``` + +Unsubscribe using `mempool_unsubscribe_to_events`. + +Note: Subscriptions only work over WebSockets. + ## Module `p2p` ### Method `p2p_enable_networking` diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index 4ec8b8b9f3..53109fbfc0 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -400,7 +400,7 @@ pub async fn subscribe_to_tx_processed( let subscribe_func = Arc::new(subscribe_func); mempool_handle - .call_mut(|this| this.subscribe_to_events(subscribe_func)) + .call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func)) .await .map_err(|_| P2pError::SubsystemFailure)?;