Skip to content

Commit

Permalink
Merge pull request #1501 from mintlayer/node-events
Browse files Browse the repository at this point in the history
node-rpc: Expose chainstate events
  • Loading branch information
TheQuantumPhysicist authored Jan 26, 2024
2 parents f609834 + 3d1ac03 commit 6b53873
Show file tree
Hide file tree
Showing 35 changed files with 443 additions and 92 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ members = [
"test-utils", # Various utilities for tests.
"tokens-accounting", # Tokens accounting
"utils", # Various utilities.
"utils/tokio", # Various async/tokio utilities.
"utxo", # Utxo and related utilities (cache, undo, etc.).
"wallet", # Wallet primitives.
"wallet/wallet-cli", # Wallet CLI/REPL binary.
Expand Down
2 changes: 1 addition & 1 deletion blockprod/src/detail/job_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl JobManager {
},
);

this.subscribe_to_events(subscribe_func);
this.subscribe_to_subsystem_events(subscribe_func);
})
.await
});
Expand Down
20 changes: 16 additions & 4 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ mod produce_block {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_is_initial_block_download().returning(|| true);

mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());

manager.add_subsystem("mock-chainstate", mock_chainstate)
};
Expand Down Expand Up @@ -353,7 +356,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

mock_chainstate.expect_get_best_block_index().times(1).returning(|| {
Expand Down Expand Up @@ -719,7 +725,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down Expand Up @@ -786,7 +795,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down
2 changes: 1 addition & 1 deletion build-tools/codecheck/codecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def check_trailing_whitespaces():
def run_checks():
return all([
disallow(SCALECODEC_RE, exclude = ['serialization/core', 'merkletree']),
disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client', 'wallet/wallet-rpc-lib/tests']),
disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client']),
check_local_licenses(),
check_crate_versions(),
check_workspace_and_package_versions_equal(),
Expand Down
1 change: 1 addition & 0 deletions chainstate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ subsystem = {path = '../subsystem'}
tx-verifier = {path = './tx-verifier'}
tokens-accounting = {path = '../tokens-accounting'}
utils = {path = '../utils'}
utils-tokio = {path = '../utils/tokio'}
utxo = {path = '../utxo'}

async-trait.workspace = true
Expand Down
28 changes: 20 additions & 8 deletions chainstate/src/detail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{collections::VecDeque, sync::Arc};

use itertools::Itertools;
use thiserror::Error;
use utils_tokio::broadcaster;

use self::{
block_invalidation::BlockInvalidator,
Expand Down Expand Up @@ -90,7 +91,8 @@ pub struct Chainstate<S, V> {
tx_verification_strategy: V,
orphan_blocks: OrphansProxy,
custom_orphan_error_hook: Option<Arc<OrphanErrorHandler>>,
events_controller: EventsController<ChainstateEvent>,
subsystem_events: EventsController<ChainstateEvent>,
rpc_events: broadcaster::Broadcaster<ChainstateEvent>,
time_getter: TimeGetter,
is_initial_block_download_finished: SetFlag,
}
Expand All @@ -104,7 +106,7 @@ pub enum BlockSource {
impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V> {
#[allow(dead_code)]
pub fn wait_for_all_events(&self) {
self.events_controller.wait_for_all_events();
self.subsystem_events.wait_for_all_events();
}

fn make_db_tx(&mut self) -> chainstate_storage::Result<ChainstateRef<TxRw<'_, S>, V>> {
Expand Down Expand Up @@ -136,7 +138,11 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
}

pub fn subscribe_to_events(&mut self, handler: ChainstateEventHandler) {
self.events_controller.subscribe_to_events(handler);
self.subsystem_events.subscribe_to_events(handler);
}

pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.rpc_events.subscribe()
}

pub fn new(
Expand Down Expand Up @@ -191,14 +197,17 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
time_getter: TimeGetter,
) -> Self {
let orphan_blocks = OrphansProxy::new(*chainstate_config.max_orphan_blocks);
let subsystem_events = EventsController::new();
let rpc_events = broadcaster::Broadcaster::new();
Self {
chain_config,
chainstate_config,
chainstate_storage,
tx_verification_strategy,
orphan_blocks,
custom_orphan_error_hook,
events_controller: EventsController::new(),
subsystem_events,
rpc_events,
time_getter,
is_initial_block_download_finished: SetFlag::new(),
}
Expand Down Expand Up @@ -231,12 +240,15 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
Ok(())
}

fn broadcast_new_tip_event(&self, new_block_index: &Option<BlockIndex>) {
fn broadcast_new_tip_event(&mut self, new_block_index: &Option<BlockIndex>) {
match new_block_index {
Some(ref new_block_index) => {
let new_height = new_block_index.block_height();
let new_id = *new_block_index.block_id();
self.events_controller.broadcast(ChainstateEvent::NewTip(new_id, new_height))
let event = ChainstateEvent::NewTip(new_id, new_height);

self.rpc_events.broadcast(&event);
self.subsystem_events.broadcast(event);
}
None => (),
}
Expand Down Expand Up @@ -623,8 +635,8 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
&self.orphan_blocks
}

pub fn events_controller(&self) -> &EventsController<ChainstateEvent> {
&self.events_controller
pub fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.subsystem_events.subscribers()
}

pub fn is_initial_block_download(&self) -> bool {
Expand Down
9 changes: 7 additions & 2 deletions chainstate/src/interface/chainstate_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ use common::{
};
use pos_accounting::{DelegationData, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::Utxo;

pub trait ChainstateInterface: Send + Sync {
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
);
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent>;
fn process_block(
&mut self,
block: Block,
Expand Down Expand Up @@ -115,7 +120,7 @@ pub trait ChainstateInterface: Send + Sync {
fn get_chain_config(&self) -> &Arc<ChainConfig>;
fn get_chainstate_config(&self) -> ChainstateConfig;
fn wait_for_all_events(&self);
fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>>;
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>];
fn calculate_median_time_past(
&self,
starting_block: &Id<GenBlock>,
Expand Down
11 changes: 8 additions & 3 deletions chainstate/src/interface/chainstate_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common::{
};
use pos_accounting::{DelegationData, PoSAccountingView, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::{Utxo, UtxosView};

pub struct ChainstateInterfaceImpl<S, V> {
Expand All @@ -58,10 +59,14 @@ where
S: BlockchainStorage + Sync,
V: TransactionVerificationStrategy + Sync,
{
fn subscribe_to_events(&mut self, handler: EventHandler<ChainstateEvent>) {
fn subscribe_to_subsystem_events(&mut self, handler: EventHandler<ChainstateEvent>) {
self.chainstate.subscribe_to_events(handler)
}

fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.chainstate.subscribe_to_event_broadcast()
}

fn process_block(
&mut self,
block: Block,
Expand Down Expand Up @@ -292,8 +297,8 @@ where
self.chainstate.wait_for_all_events()
}

fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>> {
self.chainstate.events_controller().subscribers()
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.chainstate.subscribers()
}

fn calculate_median_time_past(
Expand Down
14 changes: 11 additions & 3 deletions chainstate/src/interface/chainstate_interface_impl_delegation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use common::{
};
use pos_accounting::{DelegationData, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::Utxo;

use crate::{
Expand All @@ -43,8 +44,15 @@ impl<T: Deref + DerefMut + Send + Sync> ChainstateInterface for T
where
T::Target: ChainstateInterface,
{
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>) {
self.deref_mut().subscribe_to_events(handler)
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
) {
self.deref_mut().subscribe_to_subsystem_events(handler)
}

fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.deref_mut().subscribe_to_rpc_events()
}

fn process_block(
Expand Down Expand Up @@ -176,7 +184,7 @@ where
self.deref().wait_for_all_events()
}

fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>> {
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.deref().subscribers()
}

Expand Down
2 changes: 1 addition & 1 deletion chainstate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub use detail::tx_verification_strategy::*;
pub use interface::{chainstate_interface, chainstate_interface_impl_delegation};
pub use tx_verifier;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ChainstateEvent {
NewTip(Id<Block>, BlockHeight),
}
Expand Down
12 changes: 10 additions & 2 deletions chainstate/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

use self::types::block::RpcBlock;
use self::types::{block::RpcBlock, event::RpcEvent};
use crate::{Block, BlockSource, ChainInfo, GenBlock};
use chainstate_types::BlockIndex;
use common::{
Expand All @@ -34,7 +34,7 @@ use common::{
},
primitives::{Amount, BlockHeight, Id},
};
use rpc::RpcResult;
use rpc::{subscription, RpcResult};
use serialization::hex_encoded::HexEncoded;

#[rpc::rpc(server, client, namespace = "chainstate")]
Expand Down Expand Up @@ -127,6 +127,9 @@ trait ChainstateRpc {
/// Return information about the chain.
#[method(name = "info")]
async fn info(&self) -> RpcResult<ChainInfo>;

#[subscription(name = "subscribe_events", item = RpcEvent)]
async fn subscribe_events(&self) -> rpc::subscription::Reply;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -282,6 +285,11 @@ impl ChainstateRpcServer for super::ChainstateHandle {
async fn info(&self) -> RpcResult<ChainInfo> {
rpc::handle_result(self.call(move |this| this.info()).await)
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 6b53873

Please sign in to comment.