diff --git a/Cargo.lock b/Cargo.lock index 108488516b..4983f40e5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,6 +942,7 @@ dependencies = [ "tokio", "tx-verifier", "utils", + "utils-tokio", "utxo", ] @@ -1028,6 +1029,7 @@ dependencies = [ "serialization", "test-utils", "tokens-accounting", + "tokio", "tx-verifier", "utils", "utxo", @@ -3800,6 +3802,7 @@ dependencies = [ "tokens-accounting", "tokio", "utils", + "utils-tokio", "utxo", ] @@ -5432,6 +5435,7 @@ dependencies = [ "tower", "tower-http", "utils", + "utils-tokio", ] [[package]] @@ -7332,6 +7336,13 @@ dependencies = [ "zeroize", ] +[[package]] +name = "utils-tokio" +version = "0.2.0" +dependencies = [ + "tokio", +] + [[package]] name = "utxo" version = "0.2.0" @@ -7610,6 +7621,7 @@ dependencies = [ "thiserror", "tokio", "utils", + "utils-tokio", "wallet", "wallet-controller", "wallet-test-node", diff --git a/Cargo.toml b/Cargo.toml index 0e5d2e19b8..ffef3b62a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ members = [ "test-utils", # Various utilities for tests. "tokens-accounting", # Tokens accounting "utils", # Various utilities. + "utils/tokio", # Various async/tokio utilities. "utxo", # Utxo and related utilities (cache, undo, etc.). "wallet", # Wallet primitives. "wallet/wallet-cli", # Wallet CLI/REPL binary. diff --git a/blockprod/src/detail/job_manager/mod.rs b/blockprod/src/detail/job_manager/mod.rs index 01a1722a6a..0bab4391fb 100644 --- a/blockprod/src/detail/job_manager/mod.rs +++ b/blockprod/src/detail/job_manager/mod.rs @@ -299,7 +299,7 @@ impl JobManager { }, ); - this.subscribe_to_events(subscribe_func); + this.subscribe_to_subsystem_events(subscribe_func); }) .await }); diff --git a/blockprod/src/detail/tests.rs b/blockprod/src/detail/tests.rs index 9a2b0fb755..8a16069f0a 100644 --- a/blockprod/src/detail/tests.rs +++ b/blockprod/src/detail/tests.rs @@ -257,7 +257,10 @@ mod produce_block { let mut mock_chainstate = MockChainstateInterface::new(); mock_chainstate.expect_is_initial_block_download().returning(|| true); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); manager.add_subsystem("mock-chainstate", mock_chainstate) }; @@ -353,7 +356,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = Box::new(MockChainstateInterface::new()); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); mock_chainstate.expect_get_best_block_index().times(1).returning(|| { @@ -719,7 +725,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = MockChainstateInterface::new(); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); let mut expected_return_values = vec![ @@ -786,7 +795,10 @@ mod produce_block { let chainstate_subsystem: ChainstateHandle = { let mut mock_chainstate = MockChainstateInterface::new(); - mock_chainstate.expect_subscribe_to_events().times(..=1).returning(|_| ()); + mock_chainstate + .expect_subscribe_to_subsystem_events() + .times(..=1) + .returning(|_| ()); mock_chainstate.expect_is_initial_block_download().returning(|| false); let mut expected_return_values = vec![ diff --git a/build-tools/codecheck/codecheck.py b/build-tools/codecheck/codecheck.py index 59eef02206..cf58737799 100755 --- a/build-tools/codecheck/codecheck.py +++ b/build-tools/codecheck/codecheck.py @@ -360,7 +360,7 @@ def check_trailing_whitespaces(): def run_checks(): return all([ disallow(SCALECODEC_RE, exclude = ['serialization/core', 'merkletree']), - disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client', 'wallet/wallet-rpc-lib/tests']), + disallow(JSONRPSEE_RE, exclude = ['rpc', 'wallet/wallet-node-client']), check_local_licenses(), check_crate_versions(), check_workspace_and_package_versions_equal(), diff --git a/chainstate/Cargo.toml b/chainstate/Cargo.toml index a36646a4c4..d3dc4f0fef 100644 --- a/chainstate/Cargo.toml +++ b/chainstate/Cargo.toml @@ -20,6 +20,7 @@ subsystem = {path = '../subsystem'} tx-verifier = {path = './tx-verifier'} tokens-accounting = {path = '../tokens-accounting'} utils = {path = '../utils'} +utils-tokio = {path = '../utils/tokio'} utxo = {path = '../utxo'} async-trait.workspace = true diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs index 2832e2fe1c..261af811f2 100644 --- a/chainstate/src/detail/mod.rs +++ b/chainstate/src/detail/mod.rs @@ -30,6 +30,7 @@ use std::{collections::VecDeque, sync::Arc}; use itertools::Itertools; use thiserror::Error; +use utils_tokio::broadcaster; use self::{ block_invalidation::BlockInvalidator, @@ -90,7 +91,8 @@ pub struct Chainstate { tx_verification_strategy: V, orphan_blocks: OrphansProxy, custom_orphan_error_hook: Option>, - events_controller: EventsController, + subsystem_events: EventsController, + rpc_events: broadcaster::Broadcaster, time_getter: TimeGetter, is_initial_block_download_finished: SetFlag, } @@ -104,7 +106,7 @@ pub enum BlockSource { impl Chainstate { #[allow(dead_code)] pub fn wait_for_all_events(&self) { - self.events_controller.wait_for_all_events(); + self.subsystem_events.wait_for_all_events(); } fn make_db_tx(&mut self) -> chainstate_storage::Result, V>> { @@ -136,7 +138,11 @@ impl Chainstate } pub fn subscribe_to_events(&mut self, handler: ChainstateEventHandler) { - self.events_controller.subscribe_to_events(handler); + self.subsystem_events.subscribe_to_events(handler); + } + + pub fn subscribe_to_event_broadcast(&mut self) -> broadcaster::Receiver { + self.rpc_events.subscribe() } pub fn new( @@ -191,6 +197,8 @@ impl Chainstate time_getter: TimeGetter, ) -> Self { let orphan_blocks = OrphansProxy::new(*chainstate_config.max_orphan_blocks); + let subsystem_events = EventsController::new(); + let rpc_events = broadcaster::Broadcaster::new(); Self { chain_config, chainstate_config, @@ -198,7 +206,8 @@ impl Chainstate tx_verification_strategy, orphan_blocks, custom_orphan_error_hook, - events_controller: EventsController::new(), + subsystem_events, + rpc_events, time_getter, is_initial_block_download_finished: SetFlag::new(), } @@ -231,12 +240,15 @@ impl Chainstate Ok(()) } - fn broadcast_new_tip_event(&self, new_block_index: &Option) { + fn broadcast_new_tip_event(&mut self, new_block_index: &Option) { match new_block_index { Some(ref new_block_index) => { let new_height = new_block_index.block_height(); let new_id = *new_block_index.block_id(); - self.events_controller.broadcast(ChainstateEvent::NewTip(new_id, new_height)) + let event = ChainstateEvent::NewTip(new_id, new_height); + + self.rpc_events.broadcast(&event); + self.subsystem_events.broadcast(event); } None => (), } @@ -623,8 +635,8 @@ impl Chainstate &self.orphan_blocks } - pub fn events_controller(&self) -> &EventsController { - &self.events_controller + pub fn subscribers(&self) -> &[EventHandler] { + self.subsystem_events.subscribers() } pub fn is_initial_block_download(&self) -> bool { diff --git a/chainstate/src/interface/chainstate_interface.rs b/chainstate/src/interface/chainstate_interface.rs index 119bbae3ba..4b343c2238 100644 --- a/chainstate/src/interface/chainstate_interface.rs +++ b/chainstate/src/interface/chainstate_interface.rs @@ -32,10 +32,15 @@ use common::{ }; use pos_accounting::{DelegationData, PoolData}; use utils::eventhandler::EventHandler; +use utils_tokio::broadcaster; use utxo::Utxo; pub trait ChainstateInterface: Send + Sync { - fn subscribe_to_events(&mut self, handler: Arc); + fn subscribe_to_subsystem_events( + &mut self, + handler: Arc, + ); + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver; fn process_block( &mut self, block: Block, @@ -115,7 +120,7 @@ pub trait ChainstateInterface: Send + Sync { fn get_chain_config(&self) -> &Arc; fn get_chainstate_config(&self) -> ChainstateConfig; fn wait_for_all_events(&self); - fn subscribers(&self) -> &Vec>; + fn subscribers(&self) -> &[EventHandler]; fn calculate_median_time_past( &self, starting_block: &Id, diff --git a/chainstate/src/interface/chainstate_interface_impl.rs b/chainstate/src/interface/chainstate_interface_impl.rs index ab3f18dfdf..6f2cbea2dd 100644 --- a/chainstate/src/interface/chainstate_interface_impl.rs +++ b/chainstate/src/interface/chainstate_interface_impl.rs @@ -41,6 +41,7 @@ use common::{ }; use pos_accounting::{DelegationData, PoSAccountingView, PoolData}; use utils::eventhandler::EventHandler; +use utils_tokio::broadcaster; use utxo::{Utxo, UtxosView}; pub struct ChainstateInterfaceImpl { @@ -58,10 +59,14 @@ where S: BlockchainStorage + Sync, V: TransactionVerificationStrategy + Sync, { - fn subscribe_to_events(&mut self, handler: EventHandler) { + fn subscribe_to_subsystem_events(&mut self, handler: EventHandler) { self.chainstate.subscribe_to_events(handler) } + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver { + self.chainstate.subscribe_to_event_broadcast() + } + fn process_block( &mut self, block: Block, @@ -292,8 +297,8 @@ where self.chainstate.wait_for_all_events() } - fn subscribers(&self) -> &Vec> { - self.chainstate.events_controller().subscribers() + fn subscribers(&self) -> &[EventHandler] { + self.chainstate.subscribers() } fn calculate_median_time_past( diff --git a/chainstate/src/interface/chainstate_interface_impl_delegation.rs b/chainstate/src/interface/chainstate_interface_impl_delegation.rs index 293306b21e..ef04d15ab5 100644 --- a/chainstate/src/interface/chainstate_interface_impl_delegation.rs +++ b/chainstate/src/interface/chainstate_interface_impl_delegation.rs @@ -32,6 +32,7 @@ use common::{ }; use pos_accounting::{DelegationData, PoolData}; use utils::eventhandler::EventHandler; +use utils_tokio::broadcaster; use utxo::Utxo; use crate::{ @@ -43,8 +44,15 @@ impl ChainstateInterface for T where T::Target: ChainstateInterface, { - fn subscribe_to_events(&mut self, handler: Arc) { - self.deref_mut().subscribe_to_events(handler) + fn subscribe_to_subsystem_events( + &mut self, + handler: Arc, + ) { + self.deref_mut().subscribe_to_subsystem_events(handler) + } + + fn subscribe_to_rpc_events(&mut self) -> broadcaster::Receiver { + self.deref_mut().subscribe_to_rpc_events() } fn process_block( @@ -176,7 +184,7 @@ where self.deref().wait_for_all_events() } - fn subscribers(&self) -> &Vec> { + fn subscribers(&self) -> &[EventHandler] { self.deref().subscribers() } diff --git a/chainstate/src/lib.rs b/chainstate/src/lib.rs index ce68842412..375fa32cee 100644 --- a/chainstate/src/lib.rs +++ b/chainstate/src/lib.rs @@ -47,7 +47,7 @@ pub use detail::tx_verification_strategy::*; pub use interface::{chainstate_interface, chainstate_interface_impl_delegation}; pub use tx_verifier; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum ChainstateEvent { NewTip(Id, BlockHeight), } diff --git a/chainstate/src/rpc/mod.rs b/chainstate/src/rpc/mod.rs index 1719268509..d62077f271 100644 --- a/chainstate/src/rpc/mod.rs +++ b/chainstate/src/rpc/mod.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -use self::types::block::RpcBlock; +use self::types::{block::RpcBlock, event::RpcEvent}; use crate::{Block, BlockSource, ChainInfo, GenBlock}; use chainstate_types::BlockIndex; use common::{ @@ -34,7 +34,7 @@ use common::{ }, primitives::{Amount, BlockHeight, Id}, }; -use rpc::RpcResult; +use rpc::{subscription, RpcResult}; use serialization::hex_encoded::HexEncoded; #[rpc::rpc(server, client, namespace = "chainstate")] @@ -127,6 +127,9 @@ trait ChainstateRpc { /// Return information about the chain. #[method(name = "info")] async fn info(&self) -> RpcResult; + + #[subscription(name = "subscribe_events", item = RpcEvent)] + async fn subscribe_events(&self) -> rpc::subscription::Reply; } #[async_trait::async_trait] @@ -282,6 +285,11 @@ impl ChainstateRpcServer for super::ChainstateHandle { async fn info(&self) -> RpcResult { rpc::handle_result(self.call(move |this| this.info()).await) } + + async fn subscribe_events(&self, pending: subscription::Pending) -> subscription::Reply { + let event_rx = self.call_mut(move |this| this.subscribe_to_rpc_events()).await?; + rpc::subscription::connect_broadcast_map(event_rx, pending, RpcEvent::from_event).await + } } #[cfg(test)] diff --git a/chainstate/src/rpc/types/event.rs b/chainstate/src/rpc/types/event.rs new file mode 100644 index 0000000000..19bc67e629 --- /dev/null +++ b/chainstate/src/rpc/types/event.rs @@ -0,0 +1,34 @@ +// Copyright (c) 2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common::{ + chain::Block, + primitives::{BlockHeight, Id}, +}; + +use crate::ChainstateEvent; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RpcEvent { + NewTip { id: Id, height: BlockHeight }, +} + +impl RpcEvent { + pub fn from_event(event: ChainstateEvent) -> Self { + match event { + ChainstateEvent::NewTip(id, height) => Self::NewTip { id, height }, + } + } +} diff --git a/chainstate/src/rpc/types/mod.rs b/chainstate/src/rpc/types/mod.rs index 5424c942df..ccc41abe6f 100644 --- a/chainstate/src/rpc/types/mod.rs +++ b/chainstate/src/rpc/types/mod.rs @@ -14,4 +14,5 @@ // limitations under the License. pub mod block; +pub mod event; pub mod signed_transaction; diff --git a/chainstate/test-suite/Cargo.toml b/chainstate/test-suite/Cargo.toml index 227d6cb8a9..d6f7dac657 100644 --- a/chainstate/test-suite/Cargo.toml +++ b/chainstate/test-suite/Cargo.toml @@ -33,6 +33,7 @@ itertools.workspace = true criterion.workspace = true expect-test.workspace = true rstest.workspace = true +tokio = { workspace = true, features = ['rt', 'time'] } [[bench]] name = "benches" diff --git a/chainstate/test-suite/src/tests/events_tests.rs b/chainstate/test-suite/src/tests/events_tests.rs index 4c22f72c64..9a3c87d422 100644 --- a/chainstate/test-suite/src/tests/events_tests.rs +++ b/chainstate/test-suite/src/tests/events_tests.rs @@ -225,7 +225,7 @@ fn subscribe(chainstate: &mut TestChainstate, n: usize) -> EventList { events_.lock().unwrap().push((block_id, block_height)); } }); - chainstate.subscribe_to_events(handler); + chainstate.subscribe_to_subsystem_events(handler); } events @@ -239,3 +239,51 @@ fn orphan_error_hook() -> (Arc, ErrorList) { }); (handler, errors) } + +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn several_subscribers_several_events_broadcaster(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + let mut tf = TestFramework::builder(&mut rng).build(); + + let subscribers = rng.gen_range(4..16); + let blocks = rng.gen_range(8..128); + + let mut receivers: Vec<_> = + (0..subscribers).map(|_| tf.chainstate.subscribe_to_rpc_events()).collect(); + + let event_processor = tokio::spawn(async move { + let mut events = vec![Vec::new(); receivers.len()]; + for _ in 0..blocks { + for (idx, receiver) in receivers.iter_mut().enumerate() { + events[idx].push(receiver.recv().await.unwrap()); + } + } + for receiver in receivers.iter_mut() { + assert!(receiver.recv().await.is_none()); + } + events + }); + + let mut expected_events = Vec::new(); + for _ in 0..blocks { + let block = tf.make_block_builder().add_test_transaction_from_best_block(&mut rng).build(); + let index = tf.process_block(block.clone(), BlockSource::Local).ok().flatten().unwrap(); + expected_events.push(ChainstateEvent::NewTip( + *index.block_id(), + index.block_height(), + )); + } + + std::mem::drop(tf); + + let event_traces = tokio::time::timeout(std::time::Duration::from_secs(5), event_processor) + .await + .expect("timeout") + .expect("event processor panicked"); + + // All receivers get the same sequence of events + assert!(event_traces.into_iter().all(|t| t == expected_events)); +} diff --git a/chainstate/test-suite/src/tests/reorgs_tests.rs b/chainstate/test-suite/src/tests/reorgs_tests.rs index 2cdc74f100..a1ac0326d0 100644 --- a/chainstate/test-suite/src/tests/reorgs_tests.rs +++ b/chainstate/test-suite/src/tests/reorgs_tests.rs @@ -392,7 +392,7 @@ fn subscribe_to_events(tf: &mut TestFramework, events: &EventList) { } }, ); - tf.chainstate.subscribe_to_events(subscribe_func); + tf.chainstate.subscribe_to_subsystem_events(subscribe_func); } fn check_block_reorg_state( diff --git a/mempool/src/interface/mempool_interface_impl.rs b/mempool/src/interface/mempool_interface_impl.rs index edbaadf0a1..91c4085e56 100644 --- a/mempool/src/interface/mempool_interface_impl.rs +++ b/mempool/src/interface/mempool_interface_impl.rs @@ -81,7 +81,7 @@ impl MempoolInit { mempool .chainstate_handle() - .call_mut(|this| this.subscribe_to_events(subscribe_func)) + .call_mut(|this| this.subscribe_to_subsystem_events(subscribe_func)) .await?; Ok(mempool) diff --git a/mocks/Cargo.toml b/mocks/Cargo.toml index 7cb479900a..25b8f1e9a7 100644 --- a/mocks/Cargo.toml +++ b/mocks/Cargo.toml @@ -16,6 +16,7 @@ pos-accounting = { path = '../pos-accounting/' } subsystem = { path = "../subsystem/" } tokens-accounting = { path = '../tokens-accounting/' } utils = { path = "../utils/" } +utils-tokio = { path = "../utils/tokio" } utxo = { path = "../utxo/" } p2p-types = { path = "../p2p/types" } diff --git a/mocks/src/chainstate.rs b/mocks/src/chainstate.rs index 0b1fb95b45..dd7b182c43 100644 --- a/mocks/src/chainstate.rs +++ b/mocks/src/chainstate.rs @@ -40,7 +40,8 @@ mockall::mock! { pub ChainstateInterface {} impl ChainstateInterface for ChainstateInterface { - fn subscribe_to_events(&mut self, handler: Arc); + fn subscribe_to_subsystem_events(&mut self, handler: Arc); + fn subscribe_to_rpc_events(&mut self) -> utils_tokio::broadcaster::Receiver; fn process_block(&mut self, block: Block, source: BlockSource) -> Result, ChainstateError>; fn invalidate_block(&mut self, block_id: &Id) -> Result<(), ChainstateError>; fn reset_block_failure_flags(&mut self, block_id: &Id) -> Result<(), ChainstateError>; @@ -94,7 +95,7 @@ mockall::mock! { fn get_best_block_index(&self) -> Result; fn get_chainstate_config(&self) -> ChainstateConfig; fn wait_for_all_events(&self); - fn subscribers(&self) -> &Vec>; + fn subscribers(&self) -> &[EventHandler]; fn calculate_median_time_past(&self, starting_block: &Id) -> Result; fn is_already_an_orphan(&self, block_id: &Id) -> bool; fn orphans_count(&self) -> usize; diff --git a/node-gui/src/backend/chainstate_event_handler.rs b/node-gui/src/backend/chainstate_event_handler.rs index 1d4f76abc6..9bf9548003 100644 --- a/node-gui/src/backend/chainstate_event_handler.rs +++ b/node-gui/src/backend/chainstate_event_handler.rs @@ -36,11 +36,13 @@ impl ChainstateEventHandler { let (chainstate_event_tx, chainstate_event_rx) = unbounded_channel(); chainstate .call_mut(|this| { - this.subscribe_to_events(Arc::new(move |chainstate_event: ChainstateEvent| { - _ = chainstate_event_tx - .send(chainstate_event) - .log_err_pfx("Chainstate subscriber failed to send new tip"); - })); + this.subscribe_to_subsystem_events(Arc::new( + move |chainstate_event: ChainstateEvent| { + _ = chainstate_event_tx + .send(chainstate_event) + .log_err_pfx("Chainstate subscriber failed to send new tip"); + }, + )); }) .await .expect("Failed to subscribe to chainstate"); diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index be1c9e0077..628bfd7951 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -377,7 +377,7 @@ pub async fn subscribe_to_new_tip( chainstate_handle .call_mut(|this| { - this.subscribe_to_events(subscribe_func); + this.subscribe_to_subsystem_events(subscribe_func); Ok(()) }) .await?; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 8a0d74344b..57793f163f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -5,13 +5,15 @@ version.workspace = true edition.workspace = true rust-version.workspace = true -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +test-support = [] [dependencies] crypto = { path = "../crypto/" } logging = { path = "../logging" } subsystem = { path = "../subsystem" } utils = { path = "../utils/" } +utils-tokio = { path = "../utils/tokio" } anyhow.workspace = true async-trait.workspace = true diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 7c9cae22fe..c80021835a 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -40,6 +40,11 @@ use tower_http::{ }; use utils::cookie::load_cookie; +#[cfg(feature = "test-support")] +pub mod test_support { + pub use jsonrpsee::core::client::{ClientT, Subscription, SubscriptionClientT}; +} + /// The RPC subsystem builder. Used to populate the RPC server with method handlers. pub struct Builder { http_bind_address: SocketAddr, diff --git a/rpc/src/subscription.rs b/rpc/src/subscription.rs index aac4896eaa..f15ed65260 100644 --- a/rpc/src/subscription.rs +++ b/rpc/src/subscription.rs @@ -18,6 +18,8 @@ use jsonrpsee::{PendingSubscriptionAcceptError, SubscriptionMessage}; +use utils_tokio::broadcaster; + /// Pending subscription. Use [accept] to get subscription sink. pub type Pending = jsonrpsee::PendingSubscriptionSink; @@ -71,6 +73,40 @@ pub async fn accept(pending: Pending) -> Result, Error> { Sink::accept(pending).await } +/// Connect a broadcaster to event sink, transforming and filtering the events on the go +pub async fn connect_broadcast_filter_map( + mut event_receiver: broadcaster::Receiver, + pending: Pending, + mut filter_map_fn: impl FnMut(T) -> Option, +) -> Reply { + let subscription = accept::(pending).await?; + + while let Some(event) = event_receiver.recv().await { + if let Some(event) = filter_map_fn(event) { + subscription.send(&event).await?; + } + } + + Ok(()) +} + +/// Connect a broadcaster to event sink, transforming and filtering the events on the go +pub async fn connect_broadcast_map( + event_receiver: broadcaster::Receiver, + pending: Pending, + mut map_fn: impl FnMut(T) -> U, +) -> Reply { + connect_broadcast_filter_map(event_receiver, pending, |x| Some(map_fn(x))).await +} + +/// Connect a broadcaster to event sink +pub async fn connect_broadcast( + event_receiver: broadcaster::Receiver, + pending: Pending, +) -> Reply { + connect_broadcast_filter_map(event_receiver, pending, Some).await +} + /// Subscription processing error #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/utils/tokio/Cargo.toml b/utils/tokio/Cargo.toml new file mode 100644 index 0000000000..4d0ceb83f6 --- /dev/null +++ b/utils/tokio/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "utils-tokio" +license.workspace = true +version.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] + +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] + +tokio = { workspace = true, features = ["sync", "macros", "test-util"] } diff --git a/utils/tokio/src/broadcaster.rs b/utils/tokio/src/broadcaster.rs new file mode 100644 index 0000000000..c3ab301294 --- /dev/null +++ b/utils/tokio/src/broadcaster.rs @@ -0,0 +1,145 @@ +// Copyright (c) 2024 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Broadcaster is a reliable version of [tokio::sync::broadcast]. + +use tokio::sync::mpsc; + +/// A reliable version of [tokio::sync::broadcast], sender part. +/// +/// It does not have capacity limits so no messages are lost. It is achieved by using unbounded +/// channels. +pub struct Broadcaster { + senders: Vec>, + auto_purge_ticks: u32, +} + +impl Broadcaster { + const AUTO_PURGE_PERIOD: u32 = 128; + + /// New broadcaster + pub fn new() -> Self { + Self { + senders: Vec::new(), + auto_purge_ticks: 0, + } + } + + /// Add a new subscriber + pub fn subscribe(&mut self) -> Receiver { + let (tx, rx) = mpsc::unbounded_channel(); + self.subscribe_using(tx); + Receiver(rx) + } + + /// Add a new subscription that emits the events to given channel + pub fn subscribe_using(&mut self, tx: mpsc::UnboundedSender) { + self.senders.push(tx); + self.auto_purge(); + } + + /// Get the number of subscribers + /// + /// Due to how the subscriber cleanup works, this may not be completely up-to-date but is + /// always either accurate or an over-approximation. + pub fn num_subscribers(&self) -> usize { + self.senders.len() + } + + /// Purge the dead subscribers from the subscriber list + pub fn purge(&mut self) { + self.senders.retain(|sender| !sender.is_closed()) + } + + /// Purge every [Self::AUTO_PURGE_PERIOD] invocations (to amortize iteration over sender list). + fn auto_purge(&mut self) { + self.auto_purge_ticks += 1; + if self.auto_purge_ticks >= Self::AUTO_PURGE_PERIOD { + self.purge(); + self.auto_purge_ticks = 0; + } + } + + /// Broadcast a value to all subscribers + pub fn broadcast(&mut self, value: &T) + where + T: Clone, + { + // Since the broadcast has to iterate over the whole sender list, we also purge the dead + // connections as we go and reset the purge tick counter. + self.auto_purge_ticks = 0; + self.senders.retain(|sender| sender.send(value.clone()).is_ok()); + } +} + +/// Broadcast receiver +pub struct Receiver(mpsc::UnboundedReceiver); + +impl Receiver { + /// Receive a value + pub async fn recv(&mut self) -> Option { + self.0.recv().await + } + + /// Receive a value in the blocking context + pub fn blocking_recv(&mut self) -> Option { + self.0.blocking_recv() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn basic() { + const VALUES: [u32; 5] = [555, 1955420, 1, 99, 99]; + const NUM_CONSUMERS: usize = 8; + + let mut bcast = Broadcaster::::new(); + + let mut consumers = tokio::task::JoinSet::new(); + for _ in 0..NUM_CONSUMERS { + let mut sub = bcast.subscribe(); + consumers.spawn(async move { + for expected in VALUES { + assert_eq!(sub.recv().await, Some(expected)); + } + assert_eq!(sub.recv().await, None); + }); + } + + VALUES.iter().for_each(|x| bcast.broadcast(x)); + std::mem::drop(bcast); + + for _ in 0..NUM_CONSUMERS { + assert_eq!(consumers.join_next().await.map(Result::ok), Some(Some(()))); + } + assert!(consumers.join_next().await.is_none()); + } + + #[test] + fn auto_purging() { + let num_inserts = Broadcaster::<()>::AUTO_PURGE_PERIOD + 5; + + let mut bcast = Broadcaster::<()>::new(); + for _ in 0..num_inserts { + let subscriber = bcast.subscribe(); + std::mem::drop(subscriber); + } + + assert!(bcast.num_subscribers() < num_inserts as usize); + } +} diff --git a/utils/tokio/src/lib.rs b/utils/tokio/src/lib.rs new file mode 100644 index 0000000000..7ab048b384 --- /dev/null +++ b/utils/tokio/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright (c) 2024 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Various utils for working with async / tokio. + +pub mod broadcaster; diff --git a/wallet/wallet-rpc-lib/Cargo.toml b/wallet/wallet-rpc-lib/Cargo.toml index ac5e1bcd70..f91c62eb26 100644 --- a/wallet/wallet-rpc-lib/Cargo.toml +++ b/wallet/wallet-rpc-lib/Cargo.toml @@ -17,6 +17,7 @@ node-comm = { path = "../wallet-node-client" } rpc = { path = "../../rpc" } serialization = { path = "../../serialization" } utils = { path = "../../utils" } +utils-tokio = { path = "../../utils/tokio" } wallet = { path = ".." } wallet-controller = { path = "../wallet-controller" } wallet-types = { path = "../types" } @@ -36,6 +37,7 @@ tokio.workspace = true consensus = { path = "../../consensus" } mempool = { path = "../../mempool" } +rpc = { path = "../../rpc", features = [ "test-support" ] } subsystem = { path = "../../subsystem" } test-utils = { path = "../../test-utils" } wallet-test-node = { path = "../wallet-test-node" } diff --git a/wallet/wallet-rpc-lib/src/rpc/mod.rs b/wallet/wallet-rpc-lib/src/rpc/mod.rs index 6ac6d503e1..b2fc4cf3b6 100644 --- a/wallet/wallet-rpc-lib/src/rpc/mod.rs +++ b/wallet/wallet-rpc-lib/src/rpc/mod.rs @@ -103,19 +103,21 @@ impl WalletRpc { wallet_path: PathBuf, password: Option, ) -> WRpcResult<(), N> { - self.wallet + Ok(self + .wallet .manage_async(move |wallet_manager| { Box::pin(async move { wallet_manager.open_wallet(wallet_path, password).await }) }) - .await? + .await??) } pub async fn close_wallet(&self) -> WRpcResult<(), N> { - self.wallet + Ok(self + .wallet .manage_async(move |wallet_manager| { Box::pin(async move { wallet_manager.close_wallet() }) }) - .await? + .await??) } pub async fn set_lookahead_size( diff --git a/wallet/wallet-rpc-lib/src/rpc/server_impl.rs b/wallet/wallet-rpc-lib/src/rpc/server_impl.rs index b4b565be30..9a2c2519f9 100644 --- a/wallet/wallet-rpc-lib/src/rpc/server_impl.rs +++ b/wallet/wallet-rpc-lib/src/rpc/server_impl.rs @@ -38,7 +38,7 @@ use crate::{ StakePoolBalance, TokenMetadata, TransactionOptions, TxOptionsOverrides, UtxoInfo, VrfPublicKeyInfo, }, - Event, RpcError, + RpcError, }; #[async_trait::async_trait] @@ -713,13 +713,7 @@ impl WalletRpcServer f pending: rpc::subscription::Pending, _options: EmptyArgs, ) -> rpc::subscription::Reply { - let mut wallet_events = self.wallet.subscribe()?; - let sub = rpc::subscription::accept::(pending).await?; - - while let Some(evt) = wallet_events.recv().await { - sub.send(&evt).await?; - } - - Ok(()) + let wallet_events = self.wallet.subscribe().await?; + rpc::subscription::connect_broadcast(wallet_events, pending).await } } diff --git a/wallet/wallet-rpc-lib/src/service/handle.rs b/wallet/wallet-rpc-lib/src/service/handle.rs index 60d8488469..d0a33a2ab2 100644 --- a/wallet/wallet-rpc-lib/src/service/handle.rs +++ b/wallet/wallet-rpc-lib/src/service/handle.rs @@ -17,16 +17,16 @@ use futures::future::{BoxFuture, Future}; -use tokio::sync::mpsc; use utils::shallow_clone::ShallowClone; use wallet_controller::NodeInterface; use crate::{ service::worker::{self, WalletCommand, WalletController, WalletWorker}, types::RpcError, - Event, }; +pub use crate::service::worker::EventStream; + /// Wallet handle allows the user to control the wallet service, perform queries etc. #[derive(Clone)] pub struct WalletHandle(worker::CommandSender); @@ -66,14 +66,14 @@ impl WalletHandle { }) } - pub fn manage_async> + Send + 'static>( + pub fn manage_async( &self, - action_fn: impl FnOnce(&mut WalletWorker) -> BoxFuture> + Send + 'static, - ) -> impl Future>, SubmitError>> { + action_fn: impl FnOnce(&mut WalletWorker) -> BoxFuture + Send + 'static, + ) -> impl Future> { let (tx, rx) = tokio::sync::oneshot::channel(); let command = WalletCommand::Manage(Box::new(move |wallet_manager| { Box::pin(async move { - let _ = tx.send(action_fn(wallet_manager).await.map_err(|e| e.into())); + let _ = tx.send(action_fn(wallet_manager).await); }) })); @@ -86,10 +86,9 @@ impl WalletHandle { } /// Subscribe to wallet events - pub fn subscribe(&self) -> Result { - let (tx, rx) = mpsc::unbounded_channel(); - self.send_raw(WalletCommand::Subscribe(tx))?; - Ok(EventStream(rx)) + pub async fn subscribe(&self) -> Result { + self.manage_async(move |worker| Box::pin(async move { worker.subscribe() })) + .await } /// Stop the wallet service @@ -117,16 +116,6 @@ impl ShallowClone for WalletHandle { } } -/// A stream of wallet events -pub struct EventStream(mpsc::UnboundedReceiver); - -impl EventStream { - /// Receive an event - pub async fn recv(&mut self) -> Option { - self.0.recv().await - } -} - /// Error that can occur during wallet request submission or reply reception #[derive(thiserror::Error, Debug, Eq, PartialEq)] pub enum SubmitError { diff --git a/wallet/wallet-rpc-lib/src/service/worker.rs b/wallet/wallet-rpc-lib/src/service/worker.rs index a472b2206c..6049785c06 100644 --- a/wallet/wallet-rpc-lib/src/service/worker.rs +++ b/wallet/wallet-rpc-lib/src/service/worker.rs @@ -20,6 +20,7 @@ use futures::{future::BoxFuture, never::Never}; use tokio::{sync::mpsc, task::JoinHandle}; use logging::log; +use utils_tokio::broadcaster::Broadcaster; use wallet::wallet::Mnemonic; use wallet_controller::{ControllerError, NodeInterface}; use wallet_types::seed_phrase::StoreSeedPhrase; @@ -34,6 +35,7 @@ pub type WalletController = wallet_controller::RpcController = wallet_controller::ControllerError; pub type CommandReceiver = mpsc::UnboundedReceiver>; pub type CommandSender = mpsc::UnboundedSender>; +pub type EventStream = utils_tokio::broadcaster::Receiver; type CommandFn = dyn Send + FnOnce(&mut Option>) -> BoxFuture<()>; type ManageFn = dyn Send + FnOnce(&mut WalletWorker) -> BoxFuture<()>; @@ -46,9 +48,6 @@ pub enum WalletCommand { /// Manage the Wallet itself, i.e. Create/Open/Close Manage(Box>), - /// Subscribe to events - Subscribe(mpsc::UnboundedSender), - /// Shutdown the wallet service task Stop, } @@ -64,7 +63,7 @@ pub struct WalletWorker { command_rx: CommandReceiver, chain_config: Arc, node_rpc: N, - subscribers: Vec>, + events_bcast: Broadcaster, events_rx: mpsc::UnboundedReceiver, wallet_events: WalletServiceEvents, } @@ -78,13 +77,13 @@ impl WalletWorker { events_rx: mpsc::UnboundedReceiver, wallet_events: WalletServiceEvents, ) -> Self { - let subscribers = Vec::new(); + let events_bcast = Broadcaster::new(); Self { controller, command_rx, chain_config, node_rpc, - subscribers, + events_bcast, events_rx, wallet_events, } @@ -126,7 +125,7 @@ impl WalletWorker { // Forward events to subscribers event = self.events_rx.recv() => { match event { - Some(event) => self.broadcast(&event), + Some(event) => self.events_bcast.broadcast(&event), None => log::warn!("Events channel closed unexpectedly"), } } @@ -152,10 +151,6 @@ impl WalletWorker { call(self).await; ControlFlow::Continue(()) } - Some(WalletCommand::Subscribe(sender)) => { - self.subscribers.push(sender); - ControlFlow::Continue(()) - } Some(WalletCommand::Stop) => { log::info!("Wallet service terminating upon user request"); ControlFlow::Break(()) @@ -247,8 +242,8 @@ impl WalletWorker { Ok(result) } - fn broadcast(&mut self, event: &Event) { - self.subscribers.retain(|sub| sub.send(event.clone()).is_ok()) + pub fn subscribe(&mut self) -> EventStream { + self.events_bcast.subscribe() } async fn background_task( diff --git a/wallet/wallet-rpc-lib/tests/basic.rs b/wallet/wallet-rpc-lib/tests/basic.rs index c66f6cf7d5..10b69bfcdb 100644 --- a/wallet/wallet-rpc-lib/tests/basic.rs +++ b/wallet/wallet-rpc-lib/tests/basic.rs @@ -15,7 +15,6 @@ mod utils; -use jsonrpsee::core::client::Subscription; use logging::log; use rstest::*; @@ -24,7 +23,8 @@ use common::{ primitives::{Amount, BlockHeight, Id}, }; use utils::{ - make_seedable_rng, ClientT, JsonValue, Seed, SubscriptionClientT, ACCOUNT0_ARG, ACCOUNT1_ARG, + make_seedable_rng, ClientT, JsonValue, Seed, Subscription, SubscriptionClientT, ACCOUNT0_ARG, + ACCOUNT1_ARG, }; use wallet_rpc_lib::{ types::{ diff --git a/wallet/wallet-rpc-lib/tests/utils.rs b/wallet/wallet-rpc-lib/tests/utils.rs index 6e5857f500..73abbdedbf 100644 --- a/wallet/wallet-rpc-lib/tests/utils.rs +++ b/wallet/wallet-rpc-lib/tests/utils.rs @@ -18,9 +18,8 @@ use std::{sync::Arc, time::Duration}; use common::{ - chain::{ - config::{regtest::GenesisStakingSettings, regtest_options::ChainConfigOptions}, - ChainConfig, + chain::config::{ + regtest::GenesisStakingSettings, regtest_options::ChainConfigOptions, ChainConfig, }, primitives::BlockHeight, }; @@ -32,10 +31,8 @@ use wallet_rpc_lib::{ use wallet_test_node::{RPC_PASSWORD, RPC_USERNAME}; pub use crypto::random::Rng; -pub use jsonrpsee::{ - core::client::{ClientT, SubscriptionClientT}, - core::JsonValue, -}; +pub use rpc::test_support::{ClientT, Subscription, SubscriptionClientT}; +pub use serde_json::Value as JsonValue; pub use test_utils::random::{make_seedable_rng, Seed}; pub const ACCOUNT0_ARG: AccountIndexArg = AccountIndexArg { account: 0 };