diff --git a/blockprod/src/detail/job_manager/mod.rs b/blockprod/src/detail/job_manager/mod.rs index 01a1722a6a..0bab4391fb 100644 --- a/blockprod/src/detail/job_manager/mod.rs +++ b/blockprod/src/detail/job_manager/mod.rs @@ -299,7 +299,7 @@ impl JobManager { }, ); - this.subscribe_to_events(subscribe_func); + this.subscribe_to_subsystem_events(subscribe_func); }) .await }); diff --git a/blockprod/src/detail/tests.rs b/blockprod/src/detail/tests.rs index 9a2b0fb755..8a16069f0a 100644 --- a/blockprod/src/detail/tests.rs +++ b/blockprod/src/detail/tests.rs @@ -257,7 +257,10 @@ mod produce_block { let mut mock_chainstate = MockChainstateInterface::new(); mock_chainstate.expect_is_initial_block_download().returning(|| true); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); manager.add_subsystem("mock-chainstate", mock_chainstate) }; @@ -353,7 +356,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = Box::new(MockChainstateInterface::new()); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); mock_chainstate.expect_get_best_block_index().times(1).returning(|| { @@ -719,7 +725,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = MockChainstateInterface::new(); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); let mut expected_return_values = vec![ @@ -786,7 +795,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = MockChainstateInterface::new(); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); let mut expected_return_values = vec![ diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs index b53aec0e67..261af811f2 100644 --- a/chainstate/src/detail/mod.rs +++ b/chainstate/src/detail/mod.rs @@ -91,10 +91,10 @@ pub struct Chainstate { tx_verification_strategy: V, orphan_blocks: OrphansProxy, custom_orphan_error_hook: Option>, - events_controller: EventsController, + subsystem_events: EventsController, + rpc_events: broadcaster::Broadcaster, time_getter: TimeGetter, is_initial_block_download_finished: SetFlag, - event_broadcast: broadcaster::Broadcaster, } #[derive(Copy, Clone, Eq, Debug, PartialEq)] @@ -106,7 +106,7 @@ pub enum BlockSource { impl Chainstate { #[allow(dead_code)] pub fn wait_for_all_events(&self) { - self.events_controller.wait_for_all_events(); + self.subsystem_events.wait_for_all_events(); } fn make_db_tx(&mut self) -> chainstate_storage::Result, V>> { @@ -138,11 +138,11 @@ impl Chainstate } pub fn subscribe_to_events(&mut self, handler: ChainstateEventHandler) { - self.events_controller.subscribe_to_events(handler); + self.subsystem_events.subscribe_to_events(handler); } pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { - self.event_broadcast.subscribe() + self.rpc_events.subscribe() } pub fn new( @@ -197,8 +197,8 @@ impl Chainstate time_getter: TimeGetter, ) -> Self { let orphan_blocks = OrphansProxy::new(*chainstate_config.max_orphan_blocks); - let events_controller = EventsController::new(); - let event_broadcast = broadcaster::Broadcaster::new(); + let subsystem_events = EventsController::new(); + let rpc_events = broadcaster::Broadcaster::new(); Self { chain_config, chainstate_config, @@ -206,8 +206,8 @@ impl Chainstate tx_verification_strategy, orphan_blocks, custom_orphan_error_hook, - events_controller, - event_broadcast, + subsystem_events, + rpc_events, time_getter, is_initial_block_download_finished: SetFlag::new(), } @@ -247,8 +247,8 @@ impl Chainstate let new_id = *new_block_index.block_id(); let event = ChainstateEvent::NewTip(new_id, new_height); - self.event_broadcast.broadcast(&event); - self.events_controller.broadcast(event); + self.rpc_events.broadcast(&event); + self.subsystem_events.broadcast(event); } None => (), } @@ -636,7 +636,7 @@ impl Chainstate } pub fn subscribers(&self) -> &[EventHandler] { - self.events_controller.subscribers() + self.subsystem_events.subscribers() } pub fn is_initial_block_download(&self) -> bool { diff --git a/chainstate/src/interface/chainstate_interface.rs b/chainstate/src/interface/chainstate_interface.rs index 4c94d472a5..4b343c2238 100644 --- a/chainstate/src/interface/chainstate_interface.rs +++ b/chainstate/src/interface/chainstate_interface.rs @@ -36,9 +36,11 @@ use utils_tokio::broadcaster; use utxo::Utxo; pub trait ChainstateInterface: Send + Sync { - // TODO: The two following should be unified - fn subscribe_to_events(&mut self, handler: Arc); - fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver; + fn subscribe_to_subsystem_events( + &mut self, + handler: Arc, + ); + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver; fn process_block( &mut self, block: Block, diff --git a/chainstate/src/interface/chainstate_interface_impl.rs b/chainstate/src/interface/chainstate_interface_impl.rs index 53464d37d2..6f2cbea2dd 100644 --- a/chainstate/src/interface/chainstate_interface_impl.rs +++ b/chainstate/src/interface/chainstate_interface_impl.rs @@ -59,11 +59,11 @@ where S: BlockchainStorage + Sync, V: TransactionVerificationStrategy + Sync, { - fn subscribe_to_events(&mut self, handler: EventHandler) { + fn subscribe_to_subsystem_events(&mut self, handler: EventHandler) { self.chainstate.subscribe_to_events(handler) } - fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver { self.chainstate.subscribe_to_event_broadcast() } diff --git a/chainstate/src/interface/chainstate_interface_impl_delegation.rs b/chainstate/src/interface/chainstate_interface_impl_delegation.rs index a14a41f8e8..ef04d15ab5 100644 --- a/chainstate/src/interface/chainstate_interface_impl_delegation.rs +++ b/chainstate/src/interface/chainstate_interface_impl_delegation.rs @@ -44,12 +44,15 @@ impl ChainstateInterface for T where T::Target: ChainstateInterface, { - fn subscribe_to_events(&mut self, handler: Arc) { - self.deref_mut().subscribe_to_events(handler) + fn subscribe_to_subsystem_events( + &mut self, + handler: Arc, + ) { + self.deref_mut().subscribe_to_subsystem_events(handler) } - fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { - self.deref_mut().subscribe_to_event_broadcast() + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver { + self.deref_mut().subscribe_to_rpc_events() } fn process_block( diff --git a/chainstate/src/rpc/mod.rs b/chainstate/src/rpc/mod.rs index 19a12fe572..d62077f271 100644 --- a/chainstate/src/rpc/mod.rs +++ b/chainstate/src/rpc/mod.rs @@ -287,7 +287,7 @@ impl ChainstateRpcServer for super::ChainstateHandle { } async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply { - let event_rx = self.call_mut(move |this| this.subscribe_to_event_broadcast()).await?; + let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?; rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await } } diff --git a/chainstate/test-suite/src/tests/events_tests.rs b/chainstate/test-suite/src/tests/events_tests.rs index 38b36acf2d..9a3c87d422 100644 --- a/chainstate/test-suite/src/tests/events_tests.rs +++ b/chainstate/test-suite/src/tests/events_tests.rs @@ -225,7 +225,7 @@ fn subscribe(chainstate: &mut TestChainstate, n: usize) -> EventList { events_.lock().unwrap().push((block_id, block_height)); } }); - chainstate.subscribe_to_events(handler); + chainstate.subscribe_to_subsystem_events(handler); } events @@ -252,7 +252,7 @@ async fn several_subscribers_several_events_broadcaster(#[case] seed: Seed) { let blocks = rng.gen_range(8..128); let mut receivers: Vec<_> = - (0..subscribers).map(|_| tf.chainstate.subscribe_to_event_broadcast()).collect(); + (0..subscribers).map(|_| tf.chainstate.subscribe_to_rpc_events()).collect(); let event_processor = tokio::spawn(async move { let mut events = vec![Vec::new(); receivers.len()]; diff --git a/chainstate/test-suite/src/tests/reorgs_tests.rs b/chainstate/test-suite/src/tests/reorgs_tests.rs index 2cdc74f100..a1ac0326d0 100644 --- a/chainstate/test-suite/src/tests/reorgs_tests.rs +++ b/chainstate/test-suite/src/tests/reorgs_tests.rs @@ -392,7 +392,7 @@ fn subscribe_to_events(tf: &mut TestFramework, events: &EventList) { } }, ); - tf.chainstate.subscribe_to_events(subscribe_func); + tf.chainstate.subscribe_to_subsystem_events(subscribe_func); } fn check_block_reorg_state( diff --git a/mempool/src/interface/mempool_interface_impl.rs b/mempool/src/interface/mempool_interface_impl.rs index edbaadf0a1..91c4085e56 100644 --- a/mempool/src/interface/mempool_interface_impl.rs +++ b/mempool/src/interface/mempool_interface_impl.rs @@ -81,7 +81,7 @@ impl MempoolInit { mempool .chainstate_handle() - .call_mut(|this| this.subscribe_to_events(subscribe_func)) + .call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func)) .await?; Ok(mempool) diff --git a/mocks/src/chainstate.rs b/mocks/src/chainstate.rs index 11c68e3628..dd7b182c43 100644 --- a/mocks/src/chainstate.rs +++ b/mocks/src/chainstate.rs @@ -40,8 +40,8 @@ mockall::mock! { pub ChainstateInterface {} impl ChainstateInterface for ChainstateInterface { - fn subscribe_to_events(&mut self, handler: Arc); - fn subscribe_to_event_broadcast(&mut self) -> utils_tokio::broadcaster::Receiver; + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver; fn process_block(&mut self, block: Block, source: BlockSource) -> Result, ChainstateError>; fn invalidate_block(&mut self, block_id: &Id) -> Result<(), ChainstateError>; fn reset_block_failure_flags(&mut self, block_id: &Id) -> Result<(), ChainstateError>; diff --git a/node-gui/src/backend/chainstate_event_handler.rs b/node-gui/src/backend/chainstate_event_handler.rs index 1d4f76abc6..9bf9548003 100644 --- a/node-gui/src/backend/chainstate_event_handler.rs +++ b/node-gui/src/backend/chainstate_event_handler.rs @@ -36,11 +36,13 @@ impl ChainstateEventHandler { let (chainstate_event_tx, chainstate_event_rx) = unbounded_channel(); chainstate .call_mut(|this| { - this.subscribe_to_events(Arc::new(move |chainstate_event: ChainstateEvent| { - _ = chainstate_event_tx - .send(chainstate_event) - .log_err_pfx("Chainstate subscriber failed to send new tip"); - })); + this.subscribe_to_subsystem_events(Arc::new( + move |chainstate_event: ChainstateEvent| { + _ = chainstate_event_tx + .send(chainstate_event) + .log_err_pfx("Chainstate subscriber failed to send new tip"); + }, + )); }) .await .expect("Failed to subscribe to chainstate"); diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index be1c9e0077..628bfd7951 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -377,7 +377,7 @@ pub async fn subscribe_to_new_tip( chainstate_handle .call_mut(|this| { - this.subscribe_to_events(subscribe_func); + this.subscribe_to_subsystem_events(subscribe_func); Ok(()) }) .await?;