Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node-rpc: Expose chainstate events #1501

Merged
merged 2 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ members = [
"test-utils", # Various utilities for tests.
"tokens-accounting", # Tokens accounting
"utils", # Various utilities.
"utils/tokio", # Various async/tokio utilities.
"utxo", # Utxo and related utilities (cache, undo, etc.).
"wallet", # Wallet primitives.
"wallet/wallet-cli", # Wallet CLI/REPL binary.
Expand Down
2 changes: 1 addition & 1 deletion blockprod/src/detail/job_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl JobManager {
},
);

this.subscribe_to_events(subscribe_func);
this.subscribe_to_subsystem_events(subscribe_func);
})
.await
});
Expand Down
20 changes: 16 additions & 4 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ mod produce_block {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_is_initial_block_download().returning(|| true);

mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());

manager.add_subsystem("mock-chainstate", mock_chainstate)
};
Expand Down Expand Up @@ -353,7 +356,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

mock_chainstate.expect_get_best_block_index().times(1).returning(|| {
Expand Down Expand Up @@ -719,7 +725,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down Expand Up @@ -786,7 +795,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down
2 changes: 1 addition & 1 deletion build-tools/codecheck/codecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def check_trailing_whitespaces():
def run_checks():
return all([
disallow(SCALECODEC_RE, exclude = ['serialization/core', 'merkletree']),
disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client', 'wallet/wallet-rpc-lib/tests']),
disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client']),
check_local_licenses(),
check_crate_versions(),
check_workspace_and_package_versions_equal(),
Expand Down
1 change: 1 addition & 0 deletions chainstate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ subsystem = {path = '../subsystem'}
tx-verifier = {path = './tx-verifier'}
tokens-accounting = {path = '../tokens-accounting'}
utils = {path = '../utils'}
utils-tokio = {path = '../utils/tokio'}
utxo = {path = '../utxo'}

async-trait.workspace = true
Expand Down
28 changes: 20 additions & 8 deletions chainstate/src/detail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{collections::VecDeque, sync::Arc};

use itertools::Itertools;
use thiserror::Error;
use utils_tokio::broadcaster;

use self::{
block_invalidation::BlockInvalidator,
Expand Down Expand Up @@ -90,7 +91,8 @@ pub struct Chainstate<S, V> {
tx_verification_strategy: V,
orphan_blocks: OrphansProxy,
custom_orphan_error_hook: Option<Arc<OrphanErrorHandler>>,
events_controller: EventsController<ChainstateEvent>,
subsystem_events: EventsController<ChainstateEvent>,
rpc_events: broadcaster::Broadcaster<ChainstateEvent>,
time_getter: TimeGetter,
is_initial_block_download_finished: SetFlag,
}
Expand All @@ -104,7 +106,7 @@ pub enum BlockSource {
impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V> {
#[allow(dead_code)]
pub fn wait_for_all_events(&self) {
self.events_controller.wait_for_all_events();
self.subsystem_events.wait_for_all_events();
}

fn make_db_tx(&mut self) -> chainstate_storage::Result<ChainstateRef<TxRw<'_, S>, V>> {
Expand Down Expand Up @@ -136,7 +138,11 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
}

pub fn subscribe_to_events(&mut self, handler: ChainstateEventHandler) {
self.events_controller.subscribe_to_events(handler);
self.subsystem_events.subscribe_to_events(handler);
}

pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.rpc_events.subscribe()
}

pub fn new(
Expand Down Expand Up @@ -191,14 +197,17 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
time_getter: TimeGetter,
) -> Self {
let orphan_blocks = OrphansProxy::new(*chainstate_config.max_orphan_blocks);
let subsystem_events = EventsController::new();
let rpc_events = broadcaster::Broadcaster::new();
Self {
chain_config,
chainstate_config,
chainstate_storage,
tx_verification_strategy,
orphan_blocks,
custom_orphan_error_hook,
events_controller: EventsController::new(),
subsystem_events,
rpc_events,
time_getter,
is_initial_block_download_finished: SetFlag::new(),
}
Expand Down Expand Up @@ -231,12 +240,15 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
Ok(())
}

fn broadcast_new_tip_event(&self, new_block_index: &Option<BlockIndex>) {
fn broadcast_new_tip_event(&mut self, new_block_index: &Option<BlockIndex>) {
match new_block_index {
Some(ref new_block_index) => {
let new_height = new_block_index.block_height();
let new_id = *new_block_index.block_id();
self.events_controller.broadcast(ChainstateEvent::NewTip(new_id, new_height))
let event = ChainstateEvent::NewTip(new_id, new_height);

self.rpc_events.broadcast(&event);
self.subsystem_events.broadcast(event);
}
None => (),
}
Expand Down Expand Up @@ -623,8 +635,8 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
&self.orphan_blocks
}

pub fn events_controller(&self) -> &EventsController<ChainstateEvent> {
&self.events_controller
pub fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.subsystem_events.subscribers()
}

pub fn is_initial_block_download(&self) -> bool {
Expand Down
9 changes: 7 additions & 2 deletions chainstate/src/interface/chainstate_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ use common::{
};
use pos_accounting::{DelegationData, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::Utxo;

pub trait ChainstateInterface: Send + Sync {
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>);
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
);
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent>;
fn process_block(
&mut self,
block: Block,
Expand Down Expand Up @@ -115,7 +120,7 @@ pub trait ChainstateInterface: Send + Sync {
fn get_chain_config(&self) -> &Arc<ChainConfig>;
fn get_chainstate_config(&self) -> ChainstateConfig;
fn wait_for_all_events(&self);
fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>>;
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>];
fn calculate_median_time_past(
&self,
starting_block: &Id<GenBlock>,
Expand Down
11 changes: 8 additions & 3 deletions chainstate/src/interface/chainstate_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common::{
};
use pos_accounting::{DelegationData, PoSAccountingView, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::{Utxo, UtxosView};

pub struct ChainstateInterfaceImpl<S, V> {
Expand All @@ -58,10 +59,14 @@ where
S: BlockchainStorage + Sync,
V: TransactionVerificationStrategy + Sync,
{
fn subscribe_to_events(&mut self, handler: EventHandler<ChainstateEvent>) {
fn subscribe_to_subsystem_events(&mut self, handler: EventHandler<ChainstateEvent>) {
self.chainstate.subscribe_to_events(handler)
}

fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.chainstate.subscribe_to_event_broadcast()
}

fn process_block(
&mut self,
block: Block,
Expand Down Expand Up @@ -292,8 +297,8 @@ where
self.chainstate.wait_for_all_events()
}

fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>> {
self.chainstate.events_controller().subscribers()
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.chainstate.subscribers()
}

fn calculate_median_time_past(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use common::{
};
use pos_accounting::{DelegationData, PoolData};
use utils::eventhandler::EventHandler;
use utils_tokio::broadcaster;
use utxo::Utxo;

use crate::{
Expand All @@ -43,8 +44,15 @@ impl<T: Deref + DerefMut + Send + Sync> ChainstateInterface for T
where
T::Target: ChainstateInterface,
{
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>) {
self.deref_mut().subscribe_to_events(handler)
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
) {
self.deref_mut().subscribe_to_subsystem_events(handler)
}

fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.deref_mut().subscribe_to_rpc_events()
}

fn process_block(
Expand Down Expand Up @@ -176,7 +184,7 @@ where
self.deref().wait_for_all_events()
}

fn subscribers(&self) -> &Vec<EventHandler<ChainstateEvent>> {
fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.deref().subscribers()
}

Expand Down
2 changes: 1 addition & 1 deletion chainstate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub use detail::tx_verification_strategy::*;
pub use interface::{chainstate_interface, chainstate_interface_impl_delegation};
pub use tx_verifier;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ChainstateEvent {
NewTip(Id<Block>, BlockHeight),
}
Expand Down
12 changes: 10 additions & 2 deletions chainstate/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

use self::types::block::RpcBlock;
use self::types::{block::RpcBlock, event::RpcEvent};
use crate::{Block, BlockSource, ChainInfo, GenBlock};
use chainstate_types::BlockIndex;
use common::{
Expand All @@ -34,7 +34,7 @@ use common::{
},
primitives::{Amount, BlockHeight, Id},
};
use rpc::RpcResult;
use rpc::{subscription, RpcResult};
use serialization::hex_encoded::HexEncoded;

#[rpc::rpc(server, client, namespace = "chainstate")]
Expand Down Expand Up @@ -127,6 +127,9 @@ trait ChainstateRpc {
/// Return information about the chain.
#[method(name = "info")]
async fn info(&self) -> RpcResult<ChainInfo>;

#[subscription(name = "subscribe_events", item = RpcEvent)]
async fn subscribe_events(&self) -> rpc::subscription::Reply;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -282,6 +285,11 @@ impl ChainstateRpcServer for super::ChainstateHandle {
async fn info(&self) -> RpcResult<ChainInfo> {
rpc::handle_result(self.call(move |this| this.info()).await)
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
}

#[cfg(test)]
Expand Down
Loading
Loading