Mempool rpc events #1807

Merged 5 commits on Oct 4, 2024

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion blockprod/src/lib.rs
Original file line number Diff line number Diff line change
@@ -201,7 +201,7 @@ mod tests {
let tip_sx = utils::sync::Mutex::new(Some(tip_sx));
mempool
.call_mut(move |m| {
m.subscribe_to_events(Arc::new({
m.subscribe_to_subsystem_events(Arc::new({
move |evt| match evt {
mempool::event::MempoolEvent::NewTip(tip) => {
if let Some(tip_sx) = tip_sx.lock().unwrap().take() {
6 changes: 3 additions & 3 deletions chainstate/src/rpc/mod.rs
@@ -174,8 +174,8 @@ trait ChainstateRpc {
/// Subscribe to chainstate events, such as new tip.
///
/// After a successful subscription, the node will message the subscriber with a message on every event.
#[subscription(name = "subscribe_events", item = RpcEvent)]
async fn subscribe_events(&self) -> rpc::subscription::Reply;
#[subscription(name = "subscribe_to_events", item = RpcEvent)]
async fn subscribe_to_events(&self) -> rpc::subscription::Reply;
}

#[async_trait::async_trait]
@@ -405,7 +405,7 @@ impl ChainstateRpcServer for super::ChainstateHandle {
rpc::handle_result(self.call(move |this| this.info()).await)
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
async fn subscribe_to_events(&self, pending: subscription::Pending) -> subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
1 change: 1 addition & 0 deletions mempool/Cargo.toml
@@ -26,6 +26,7 @@ serialization = { path = "../serialization" }
subsystem = { path = "../subsystem" }
tokens-accounting = { path = "../tokens-accounting" }
utils = { path = "../utils" }
utils-networking = {path = '../utils/networking'}
utxo = { path = "../utxo" }

anyhow.workspace = true
7 changes: 5 additions & 2 deletions mempool/src/interface/mempool_interface.rs
@@ -72,8 +72,11 @@ pub trait MempoolInterface: Send + Sync {
packing_strategy: PackingStrategy,
) -> Result<Option<Box<dyn TransactionAccumulator>>, BlockConstructionError>;

/// Subscribe to events emitted by mempool
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
/// Subscribe to events emitted by mempool subsystem
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);

/// Subscribe to broadcast mempool events
fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver<MempoolEvent>;
Comment on lines +75 to +79
Contributor:
I don't think the naming is adequate here, because these 2 functions subscribe to exactly the same events; only the mechanism differs.

The names of the corresponding methods of Mempool - subscribe_to_events and subscribe_to_event_broadcast - sound better.

(I'd still rename the latter though, e.g. to subscribe_to_events_via_broadcaster or something like that)

Member Author:

Not sure. This is literally reverting changes from #1501. Though I agree that these events aren't subsystem's or rpc's. subscribe_to_events_subsystem?


/// Get current memory usage
fn memory_usage(&self) -> usize;
6 changes: 5 additions & 1 deletion mempool/src/interface/mempool_interface_impl.rs
@@ -150,10 +150,14 @@ impl MempoolInterface for Mempool {
self.collect_txs(tx_accumulator, transaction_ids, packing_strategy)
}

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.subscribe_to_events(handler);
}

fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver<MempoolEvent> {
self.subscribe_to_event_broadcast()
}

fn memory_usage(&self) -> usize {
self.memory_usage()
}
1 change: 1 addition & 0 deletions mempool/src/lib.rs
@@ -25,6 +25,7 @@ pub mod event;
mod interface;
mod pool;
pub mod rpc;
pub mod rpc_event;
pub mod tx_accumulator;

pub use {config::MempoolConfig, pool::feerate_points::find_interpolated_value, pool::FeeRate};
61 changes: 48 additions & 13 deletions mempool/src/pool/mod.rs
@@ -23,6 +23,7 @@ use common::{
};
use logging::log;
use utils::{const_value::ConstValue, ensure, eventhandler::EventsController};
use utils_networking::broadcaster;

pub use self::{feerate::FeeRate, tx_pool::feerate_points};

Expand Down Expand Up @@ -66,7 +67,7 @@ pub struct Mempool<M> {
tx_pool: tx_pool::TxPool<M>,
orphans: TxOrphanPool,
work_queue: WorkQueue,
events_controller: EventsController<MempoolEvent>,
events_broadcast: EventsBroadcast,
clock: TimeGetter,
}

@@ -89,13 +90,17 @@ impl<M> Mempool<M> {
tx_pool,
orphans: orphans::TxOrphanPool::new(),
work_queue: WorkQueue::new(),
events_controller: EventsController::new(),
events_broadcast: EventsBroadcast::new(),
clock,
}
}

pub fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.events_controller.subscribe_to_events(handler)
self.events_broadcast.subscribe_to_events(handler)
}

pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<MempoolEvent> {
self.events_broadcast.subscribe_to_event_broadcast()
}

pub fn on_peer_disconnected(&mut self, peer_id: p2p_types::PeerId) {
@@ -140,11 +145,11 @@ impl<M> Mempool<M> {
tx_pool,
orphans,
work_queue,
events_controller,
events_broadcast,
clock,
} = self;

let finalizer = TxFinalizer::new(orphans, clock, events_controller, work_queue);
let finalizer = TxFinalizer::new(orphans, clock, events_broadcast, work_queue);
(tx_pool, finalizer)
}
}
@@ -243,7 +248,8 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
})?;

let new_tip = event::NewTip::new(block_id, height);
self.events_controller.broadcast(new_tip.into());
let event = new_tip.into();
self.events_broadcast.broadcast(event);

Ok(())
}
@@ -281,6 +287,33 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
}
}

struct EventsBroadcast {
events_controller: EventsController<MempoolEvent>,
events_broadcaster: broadcaster::Broadcaster<MempoolEvent>,
}

impl EventsBroadcast {
fn new() -> Self {
Self {
events_controller: EventsController::new(),
events_broadcaster: broadcaster::Broadcaster::new(),
}
}

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>) {
self.events_controller.subscribe_to_events(handler)
}

fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<MempoolEvent> {
self.events_broadcaster.subscribe()
}

fn broadcast(&mut self, event: MempoolEvent) {
self.events_broadcaster.broadcast(&event);
self.events_controller.broadcast(event);
}
}
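The EventsBroadcast helper introduced above fans each mempool event out through two delivery mechanisms at once: synchronous callback handlers (the existing EventsController path) and channel-style receivers (the new utils_networking::broadcaster path used by the RPC subscription). The following is a simplified, std-only sketch of that pattern — Event is a hypothetical stand-in for MempoolEvent, and plain Vec plus std::sync::mpsc stand in for the real EventsController and Broadcaster types:

```rust
use std::sync::{mpsc, Arc};

#[derive(Clone, Debug, PartialEq)]
enum Event {
    NewTip(u64),
}

// One event source, two delivery mechanisms: callbacks and channel receivers.
struct EventsBroadcast {
    handlers: Vec<Arc<dyn Fn(Event) + Send + Sync>>,
    senders: Vec<mpsc::Sender<Event>>,
}

impl EventsBroadcast {
    fn new() -> Self {
        Self { handlers: Vec::new(), senders: Vec::new() }
    }

    // Callback-style subscription (the subsystem path).
    fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(Event) + Send + Sync>) {
        self.handlers.push(handler);
    }

    // Channel-style subscription (the broadcast/RPC path).
    fn subscribe_to_event_broadcast(&mut self) -> mpsc::Receiver<Event> {
        let (tx, rx) = mpsc::channel();
        self.senders.push(tx);
        rx
    }

    fn broadcast(&mut self, event: Event) {
        // Channel subscribers each get a clone; a disconnected receiver is ignored here.
        for tx in &self.senders {
            let _ = tx.send(event.clone());
        }
        // Callback subscribers are invoked synchronously.
        for handler in &self.handlers {
            handler(event.clone());
        }
    }
}
```

This keeps a single broadcast call site in the mempool while letting each subscriber choose its mechanism; the real implementation additionally has to cope with slow or dropped broadcast receivers, which the channel abstraction handles.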

/// [TxFinalizer] holds data needed to finalize the transaction processing after it's been processed
/// by the transaction pool.
///
@@ -291,21 +324,21 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
struct TxFinalizer<'a> {
orphan_pool: &'a mut TxOrphanPool,
cur_time: Time,
events_controller: &'a EventsController<MempoolEvent>,
events_broadcast: &'a mut EventsBroadcast,
work_queue: &'a mut WorkQueue,
}

impl<'a> TxFinalizer<'a> {
pub fn new(
orphan_pool: &'a mut TxOrphanPool,
clock: &TimeGetter,
events_controller: &'a EventsController<MempoolEvent>,
events_broadcast: &'a mut EventsBroadcast,
work_queue: &'a mut WorkQueue,
) -> Self {
Self {
orphan_pool,
cur_time: clock.get_time(),
events_controller,
events_broadcast,
work_queue,
}
}
@@ -323,8 +356,9 @@ impl<'a> TxFinalizer<'a> {
log::trace!("Added transaction {tx_id}");

self.enqueue_children(transaction.tx_entry());
let evt = event::TransactionProcessed::accepted(tx_id, relay_policy, origin);
self.events_controller.broadcast(evt.into());
let event = event::TransactionProcessed::accepted(tx_id, relay_policy, origin);
let event = event.into();
self.events_broadcast.broadcast(event);
Ok(TxStatus::InMempool)
}
TxAdditionOutcome::Duplicate { transaction } => {
@@ -337,8 +371,9 @@ impl<'a> TxFinalizer<'a> {
log::trace!("Rejected transaction {tx_id}, checking orphan status");

self.try_add_orphan(tx_pool, transaction, error).inspect_err(|err| {
let evt = event::TransactionProcessed::rejected(tx_id, err.clone(), origin);
self.events_controller.broadcast(evt.into());
let event = event::TransactionProcessed::rejected(tx_id, err.clone(), origin);
let event = event.into();
self.events_broadcast.broadcast(event);
})
}
}
16 changes: 15 additions & 1 deletion mempool/src/rpc.rs
@@ -25,7 +25,7 @@ use mempool_types::{tx_options::TxOptionsOverrides, tx_origin::LocalTxOrigin, Tx
use serialization::hex_encoded::HexEncoded;
use utils::tap_log::TapLog;

use crate::{FeeRate, MempoolMaxSize, TxStatus};
use crate::{rpc_event::RpcEvent, FeeRate, MempoolMaxSize, TxStatus};

use rpc::RpcResult;

@@ -102,6 +102,12 @@ trait MempoolRpc {
/// Get the curve data points that represent the fee rate as a function of transaction size.
#[method(name = "get_fee_rate_points")]
async fn get_fee_rate_points(&self) -> RpcResult<Vec<(usize, FeeRate)>>;

/// Subscribe to mempool events, such as tx processed.
///
/// After a successful subscription, the node will message the subscriber with a message on every event.
#[subscription(name = "subscribe_to_events", item = RpcEvent)]
async fn subscribe_to_events(&self) -> rpc::subscription::Reply;
}

#[async_trait::async_trait]
@@ -182,4 +188,12 @@ impl MempoolRpcServer for super::MempoolHandle {
const NUM_POINTS: NonZeroUsize = NonZeroUsize::MIN.saturating_add(9);
rpc::handle_result(self.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await)
}

async fn subscribe_to_events(
&self,
pending: rpc::subscription::Pending,
) -> rpc::subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
}
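connect_broadcast_map (from the rpc crate) wires the broadcaster's receiver into the pending RPC subscription, converting each MempoolEvent into the serializable RpcEvent on the way. Setting aside the async plumbing and subscription acceptance it presumably handles, the core is a map-and-forward loop; a std-only sketch under that simplification, with std::sync::mpsc channels standing in for both the event receiver and the subscriber sink:

```rust
use std::sync::mpsc;

// Drain an event receiver, map each event, and forward it to the subscriber sink.
// Stops when the event source disconnects or the subscriber goes away.
fn forward_mapped<E, R>(
    rx: mpsc::Receiver<E>,
    sink: mpsc::Sender<R>,
    map: impl Fn(E) -> R,
) {
    while let Ok(event) = rx.recv() {
        if sink.send(map(event)).is_err() {
            break;
        }
    }
}
```

In the actual handler, `map` is `RpcEvent::from_event`, so subscribers only ever see the RPC-facing event type rather than the internal MempoolEvent.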
97 changes: 97 additions & 0 deletions mempool/src/rpc_event.rs
@@ -0,0 +1,97 @@
// Copyright (c) 2024 RBB S.r.l
// [email protected]
// SPDX-License-Identifier: MIT
// Licensed under the MIT License;
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common::{
chain::{Block, Transaction},
primitives::{BlockHeight, Id},
};
use mempool_types::{tx_options::TxRelayPolicy, tx_origin::LocalTxOrigin};
use p2p_types::PeerId;

use crate::event::MempoolEvent;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)]
#[serde(tag = "type", content = "content")]
pub enum RpcEvent {
NewTip {
id: Id<Block>,
height: BlockHeight,
},
TransactionProcessed {
tx_id: Id<Transaction>,
origin: RpcTxOrigin,
relay: RpcTxRelayPolicy,
successful: bool,
},
}

impl RpcEvent {
pub fn from_event(event: MempoolEvent) -> Self {
match event {
MempoolEvent::NewTip(e) => RpcEvent::NewTip {
id: *e.block_id(),
height: e.block_height(),
},
MempoolEvent::TransactionProcessed(e) => RpcEvent::TransactionProcessed {
tx_id: *e.tx_id(),
origin: match e.origin() {
mempool_types::tx_origin::TxOrigin::Local(local_origin) => RpcTxOrigin::Local {
origin: match local_origin {
LocalTxOrigin::Mempool => RpcLocalTxOrigin::Mempool,
LocalTxOrigin::P2p => RpcLocalTxOrigin::P2p,
LocalTxOrigin::PastBlock => RpcLocalTxOrigin::PastBlock,
},
},
mempool_types::tx_origin::TxOrigin::Remote(r) => RpcTxOrigin::Remote {
peer_id: r.peer_id(),
},
},
relay: match e.relay_policy() {
TxRelayPolicy::DoRelay => RpcTxRelayPolicy::DoRelay,
TxRelayPolicy::DontRelay => RpcTxRelayPolicy::DontRelay,
},
successful: e.result().is_ok(),
},
}
}
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcTxOrigin {
Local { origin: RpcLocalTxOrigin },
Remote { peer_id: PeerId },
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcLocalTxOrigin {
Mempool,
P2p,
PastBlock,
}

#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, rpc_description::HasValueHint,
)]
#[serde(tag = "type", content = "content")]
pub enum RpcTxRelayPolicy {
DoRelay,
DontRelay,
}
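All of these enums use serde's adjacently tagged representation (`tag = "type", content = "content"`), so a subscriber receives a `type` discriminant plus a `content` payload, with `content` omitted for unit variants like DoRelay. With illustrative placeholder values (the exact encodings of Id<Transaction> and PeerId depend on their own serde implementations), a TransactionProcessed notification would look roughly like:

```json
{
  "type": "TransactionProcessed",
  "content": {
    "tx_id": "…",
    "origin": { "type": "Remote", "content": { "peer_id": 3 } },
    "relay": { "type": "DoRelay" },
    "successful": true
  }
}
```

and a NewTip notification like `{ "type": "NewTip", "content": { "id": "…", "height": 1234 } }`.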
4 changes: 3 additions & 1 deletion mocks/src/mempool.rs
@@ -61,7 +61,9 @@ mockall::mock! {
packing_strategy: PackingStrategy,
) -> Result<Option<Box<dyn TransactionAccumulator>>, BlockConstructionError>;

fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(MempoolEvent) + Send + Sync>);
fn subscribe_to_rpc_events(&mut self) -> utils_networking::broadcaster::Receiver<MempoolEvent>;

fn memory_usage(&self) -> usize;
fn get_size_limit(&self) -> MempoolMaxSize;
fn set_size_limit(&mut self, max_size: MempoolMaxSize) -> Result<(), Error>;