Skip to content

Commit

Permalink
chainstate: Rename event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
iljakuklic committed Jan 26, 2024
1 parent 67dd436 commit 3d1ac03
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 39 deletions.
2 changes: 1 addition & 1 deletion blockprod/src/detail/job_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl JobManager {
},
);

this.subscribe_to_events(subscribe_func);
this.subscribe_to_subsystem_events(subscribe_func);
})
.await
});
Expand Down
20 changes: 16 additions & 4 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ mod produce_block {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_is_initial_block_download().returning(|| true);

mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());

manager.add_subsystem("mock-chainstate", mock_chainstate)
};
Expand Down Expand Up @@ -353,7 +356,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = Box::new(MockChainstateInterface::new());
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

mock_chainstate.expect_get_best_block_index().times(1).returning(|| {
Expand Down Expand Up @@ -719,7 +725,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down Expand Up @@ -786,7 +795,10 @@ mod produce_block {

let chainstate_subsystem: ChainstateHandle = {
let mut mock_chainstate = MockChainstateInterface::new();
mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ());
mock_chainstate
.expect_subscribe_to_subsystem_events()
.times(..=1)
.returning(|_| ());
mock_chainstate.expect_is_initial_block_download().returning(|| false);

let mut expected_return_values = vec![
Expand Down
24 changes: 12 additions & 12 deletions chainstate/src/detail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ pub struct Chainstate<S, V> {
tx_verification_strategy: V,
orphan_blocks: OrphansProxy,
custom_orphan_error_hook: Option<Arc<OrphanErrorHandler>>,
events_controller: EventsController<ChainstateEvent>,
subsystem_events: EventsController<ChainstateEvent>,
rpc_events: broadcaster::Broadcaster<ChainstateEvent>,
time_getter: TimeGetter,
is_initial_block_download_finished: SetFlag,
event_broadcast: broadcaster::Broadcaster<ChainstateEvent>,
}

#[derive(Copy, Clone, Eq, Debug, PartialEq)]
Expand All @@ -106,7 +106,7 @@ pub enum BlockSource {
impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V> {
#[allow(dead_code)]
pub fn wait_for_all_events(&self) {
self.events_controller.wait_for_all_events();
self.subsystem_events.wait_for_all_events();
}

fn make_db_tx(&mut self) -> chainstate_storage::Result<ChainstateRef<TxRw<'_, S>, V>> {
Expand Down Expand Up @@ -138,11 +138,11 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
}

pub fn subscribe_to_events(&mut self, handler: ChainstateEventHandler) {
self.events_controller.subscribe_to_events(handler);
self.subsystem_events.subscribe_to_events(handler);
}

pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.event_broadcast.subscribe()
self.rpc_events.subscribe()
}

pub fn new(
Expand Down Expand Up @@ -197,17 +197,17 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
time_getter: TimeGetter,
) -> Self {
let orphan_blocks = OrphansProxy::new(*chainstate_config.max_orphan_blocks);
let events_controller = EventsController::new();
let event_broadcast = broadcaster::Broadcaster::new();
let subsystem_events = EventsController::new();
let rpc_events = broadcaster::Broadcaster::new();
Self {
chain_config,
chainstate_config,
chainstate_storage,
tx_verification_strategy,
orphan_blocks,
custom_orphan_error_hook,
events_controller,
event_broadcast,
subsystem_events,
rpc_events,
time_getter,
is_initial_block_download_finished: SetFlag::new(),
}
Expand Down Expand Up @@ -247,8 +247,8 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
let new_id = *new_block_index.block_id();
let event = ChainstateEvent::NewTip(new_id, new_height);

self.event_broadcast.broadcast(&event);
self.events_controller.broadcast(event);
self.rpc_events.broadcast(&event);
self.subsystem_events.broadcast(event);
}
None => (),
}
Expand Down Expand Up @@ -636,7 +636,7 @@ impl<S: BlockchainStorage, V: TransactionVerificationStrategy> Chainstate<S, V>
}

pub fn subscribers(&self) -> &[EventHandler<ChainstateEvent>] {
self.events_controller.subscribers()
self.subsystem_events.subscribers()
}

pub fn is_initial_block_download(&self) -> bool {
Expand Down
8 changes: 5 additions & 3 deletions chainstate/src/interface/chainstate_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ use utils_tokio::broadcaster;
use utxo::Utxo;

pub trait ChainstateInterface: Send + Sync {
// TODO: The two following should be unified
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>);
fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent>;
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
);
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent>;
fn process_block(
&mut self,
block: Block,
Expand Down
4 changes: 2 additions & 2 deletions chainstate/src/interface/chainstate_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ where
S: BlockchainStorage + Sync,
V: TransactionVerificationStrategy + Sync,
{
fn subscribe_to_events(&mut self, handler: EventHandler<ChainstateEvent>) {
fn subscribe_to_subsystem_events(&mut self, handler: EventHandler<ChainstateEvent>) {
self.chainstate.subscribe_to_events(handler)
}

fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.chainstate.subscribe_to_event_broadcast()
}

Expand Down
11 changes: 7 additions & 4 deletions chainstate/src/interface/chainstate_interface_impl_delegation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ impl<T: Deref + DerefMut + Send + Sync> ChainstateInterface for T
where
T::Target: ChainstateInterface,
{
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>) {
self.deref_mut().subscribe_to_events(handler)
fn subscribe_to_subsystem_events(
&mut self,
handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>,
) {
self.deref_mut().subscribe_to_subsystem_events(handler)
}

fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.deref_mut().subscribe_to_event_broadcast()
fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver<ChainstateEvent> {
self.deref_mut().subscribe_to_rpc_events()
}

fn process_block(
Expand Down
2 changes: 1 addition & 1 deletion chainstate/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl ChainstateRpcServer for super::ChainstateHandle {
}

async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply {
let event_rx = self.call_mut(move |this| this.subscribe_to_event_broadcast()).await?;
let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?;
rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await
}
}
Expand Down
4 changes: 2 additions & 2 deletions chainstate/test-suite/src/tests/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ fn subscribe(chainstate: &mut TestChainstate, n: usize) -> EventList {
events_.lock().unwrap().push((block_id, block_height));
}
});
chainstate.subscribe_to_events(handler);
chainstate.subscribe_to_subsystem_events(handler);
}

events
Expand All @@ -252,7 +252,7 @@ async fn several_subscribers_several_events_broadcaster(#[case] seed: Seed) {
let blocks = rng.gen_range(8..128);

let mut receivers: Vec<_> =
(0..subscribers).map(|_| tf.chainstate.subscribe_to_event_broadcast()).collect();
(0..subscribers).map(|_| tf.chainstate.subscribe_to_rpc_events()).collect();

let event_processor = tokio::spawn(async move {
let mut events = vec![Vec::new(); receivers.len()];
Expand Down
2 changes: 1 addition & 1 deletion chainstate/test-suite/src/tests/reorgs_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn subscribe_to_events(tf: &mut TestFramework, events: &EventList) {
}
},
);
tf.chainstate.subscribe_to_events(subscribe_func);
tf.chainstate.subscribe_to_subsystem_events(subscribe_func);
}

fn check_block_reorg_state(
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/interface/mempool_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl MempoolInit {

mempool
.chainstate_handle()
.call_mut(|this| this.subscribe_to_events(subscribe_func))
.call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func))
.await?;

Ok(mempool)
Expand Down
4 changes: 2 additions & 2 deletions mocks/src/chainstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ mockall::mock! {
pub ChainstateInterface {}

impl ChainstateInterface for ChainstateInterface {
fn subscribe_to_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>);
fn subscribe_to_event_broadcast(&mut self) -> utils_tokio::broadcaster::Receiver<ChainstateEvent>;
fn subscribe_to_subsystem_events(&mut self, handler: Arc<dyn Fn(ChainstateEvent) + Send + Sync>);
fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver<ChainstateEvent>;
fn process_block(&mut self, block: Block, source: BlockSource) -> Result<Option<BlockIndex>, ChainstateError>;
fn invalidate_block(&mut self, block_id: &Id<Block>) -> Result<(), ChainstateError>;
fn reset_block_failure_flags(&mut self, block_id: &Id<Block>) -> Result<(), ChainstateError>;
Expand Down
12 changes: 7 additions & 5 deletions node-gui/src/backend/chainstate_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ impl ChainstateEventHandler {
let (chainstate_event_tx, chainstate_event_rx) = unbounded_channel();
chainstate
.call_mut(|this| {
this.subscribe_to_events(Arc::new(move |chainstate_event: ChainstateEvent| {
_ = chainstate_event_tx
.send(chainstate_event)
.log_err_pfx("Chainstate subscriber failed to send new tip");
}));
this.subscribe_to_subsystem_events(Arc::new(
move |chainstate_event: ChainstateEvent| {
_ = chainstate_event_tx
.send(chainstate_event)
.log_err_pfx("Chainstate subscriber failed to send new tip");
},
));
})
.await
.expect("Failed to subscribe to chainstate");
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ pub async fn subscribe_to_new_tip(

chainstate_handle
.call_mut(|this| {
this.subscribe_to_events(subscribe_func);
this.subscribe_to_subsystem_events(subscribe_func);
Ok(())
})
.await?;
Expand Down

0 comments on commit 3d1ac03

Please sign in to comment.