diff --git a/rs/artifact_manager/src/clients.rs b/rs/artifact_manager/src/clients.rs index 953bf2991bf..b21edbbfadc 100644 --- a/rs/artifact_manager/src/clients.rs +++ b/rs/artifact_manager/src/clients.rs @@ -95,7 +95,7 @@ pub(crate) trait ArtifactManagerBackend: Send + Sync { /// Implementation struct for `ArtifactManagerBackend`. pub(crate) struct ArtifactManagerBackendImpl { /// Reference to the artifact client. - pub client: Arc>, + pub client: Box>, /// The artifact processor front end. pub processor: ArtifactProcessorManager, } diff --git a/rs/artifact_manager/src/manager.rs b/rs/artifact_manager/src/manager.rs index 8ce88803b6e..bed05de0abe 100644 --- a/rs/artifact_manager/src/manager.rs +++ b/rs/artifact_manager/src/manager.rs @@ -185,38 +185,13 @@ impl ArtifactManagerMaker { clients: HashMap::new(), } } - /// The method adds a new `ArtifactClient` (that is already wrapped in - /// `Arc`) to be managed. - pub fn add_arc_client( - &mut self, - client: Arc>, - processor: ArtifactProcessorManager, - ) where - Artifact::Message: - ChunkableArtifact + Send + TryFrom, - Advert: - Into + TryFrom + Eq, - for<'b> &'b Artifact::Id: - TryFrom<&'b artifact::ArtifactId, Error = &'b artifact::ArtifactId>, - artifact::ArtifactFilter: AsMut + AsRef, - for<'b> &'b Artifact::Attribute: - TryFrom<&'b artifact::ArtifactAttribute, Error = &'b artifact::ArtifactAttribute>, - Artifact::Attribute: 'static, - { - let tag = Artifact::TAG; - self.clients.insert( - tag, - Box::new(ArtifactManagerBackendImpl { client, processor }), - ); - } /// The method adds a new `ArtifactClient` to be managed. - pub fn add_client( + pub fn add_client( &mut self, - client: Client, + client: Box>, processor: ArtifactProcessorManager, ) where - Client: ArtifactClient, Artifact::Message: ChunkableArtifact + Send + TryFrom, Advert: @@ -231,10 +206,7 @@ impl ArtifactManagerMaker { let tag = Artifact::TAG; self.clients.insert( tag, - Box::new(ArtifactManagerBackendImpl { - client: Arc::new(client) as Arc<_>, - processor, - }), + Box::new(ArtifactManagerBackendImpl { client, processor }), ); } diff --git a/rs/artifact_manager/src/processors.rs b/rs/artifact_manager/src/processors.rs index c898aa6dcff..9ae147da5fd 100644 --- a/rs/artifact_manager/src/processors.rs +++ b/rs/artifact_manager/src/processors.rs @@ -41,29 +41,6 @@ enum AdvertSource { Relayed, } -/// A client may be either wrapped in `Box` or `Arc`. -pub enum BoxOrArcClient { - /// The client wrapped in `Box`. - BoxClient(Box>), - /// The client wrapped in `Arc`. - ArcClient(Arc + Sync + 'static>), -} - -impl BoxOrArcClient { - /// The method calls the corresponding client's `process_changes` with the - /// given time source and artifacts. - fn process_changes( - &self, - time_source: &dyn TimeSource, - artifacts: Vec>, - ) -> (Vec>, ProcessingResult) { - match self { - BoxOrArcClient::BoxClient(client) => client.process_changes(time_source, artifacts), - BoxOrArcClient::ArcClient(client) => client.process_changes(time_source, artifacts), - } - } -} - /// Metrics for a client artifact processor. struct ArtifactProcessorMetrics { /// The processing time histogram. @@ -140,7 +117,7 @@ impl ArtifactProcessorManager { pub fn new) + Send + 'static>( time_source: Arc, metrics_registry: MetricsRegistry, - client: BoxOrArcClient, + client: Box>, send_advert: S, ) -> Self where @@ -191,7 +168,7 @@ impl ArtifactProcessorManager { fn process_messages) + Send + 'static>( pending_artifacts: Arc>>>, time_source: Arc, - client: BoxOrArcClient, + client: Box>, send_advert: Box, sender: Sender, receiver: Receiver, @@ -291,7 +268,7 @@ impl let manager = ArtifactProcessorManager::new( time_source, metrics_registry, - BoxOrArcClient::BoxClient(Box::new(client)), + Box::new(client), send_advert, ); ( @@ -433,7 +410,7 @@ impl IngressProcessor { let manager = ArtifactProcessorManager::new( time_source.clone(), metrics_registry, - BoxOrArcClient::BoxClient(Box::new(client)), + Box::new(client), send_advert, ); ( @@ -559,7 +536,7 @@ impl let manager = ArtifactProcessorManager::new( time_source, metrics_registry, - BoxOrArcClient::BoxClient(Box::new(client)), + Box::new(client), send_advert, ); ( @@ -678,7 +655,7 @@ impl DkgProcessor { let manager = ArtifactProcessorManager::new( time_source, metrics_registry, - BoxOrArcClient::BoxClient(Box::new(client)), + Box::new(client), send_advert, ); (clients::DkgClient::new(dkg_pool, dkg_gossip), manager) @@ -780,7 +757,7 @@ impl EcdsaProcessor Arc; +#[derive(Clone)] pub struct ArtifactChunkingTestImpl { node_pool_dir: PathBuf, // Path to on disk pool node_id: NodeId, - file_tree_sync_unvalidated_pool: Mutex, /* In memory representaion - * on on-disk - * pool */ - file_tree_sync_validated_pool: Mutex, + file_tree_sync_unvalidated_pool: Arc>, /* In memory representaion + * on on-disk + * pool */ + file_tree_sync_validated_pool: Arc>, } impl ArtifactProcessor for ArtifactChunkingTestImpl { @@ -132,8 +133,8 @@ impl ArtifactChunkingTestImpl { ArtifactChunkingTestImpl { node_pool_dir: on_disk_pool_path, node_id, - file_tree_sync_unvalidated_pool: Mutex::new(mem_pool), - file_tree_sync_validated_pool: Mutex::new(HashMap::new()), + file_tree_sync_unvalidated_pool: Arc::new(Mutex::new(mem_pool)), + file_tree_sync_validated_pool: Arc::new(Mutex::new(HashMap::new())), } } diff --git a/rs/p2p/tests/framework/p2p_runner.rs b/rs/p2p/tests/framework/p2p_runner.rs index f802ad80b25..a61b2e17978 100755 --- a/rs/p2p/tests/framework/p2p_runner.rs +++ b/rs/p2p/tests/framework/p2p_runner.rs @@ -236,7 +236,7 @@ fn execute_test_chunking_pool( let fake_crypto = CryptoReturningOk::default(); let fake_crypto = Arc::new(fake_crypto); let node_pool_dir = test_synchronizer.get_test_group_directory(); - let state_sync_client = Arc::new(ArtifactChunkingTestImpl::new(node_pool_dir, node_id)); + let state_sync_client = Box::new(ArtifactChunkingTestImpl::new(node_pool_dir, node_id)); let state_sync_client = P2PStateSyncClient::TestChunkingPool(state_sync_client.clone(), state_sync_client); let subnet_config = SubnetConfigs::default().own_subnet_config(SubnetType::System); diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 20744c63d9b..21528c6778d 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -40,7 +40,7 @@ use ic_metrics::MetricsRegistry; use ic_p2p::{start_p2p, AdvertBroadcaster, P2PThreadJoiner}; use ic_registry_client_helpers::subnet::SubnetRegistry; use ic_replicated_state::ReplicatedState; -use ic_state_manager::StateManagerImpl; +use ic_state_manager::state_sync::StateSync; use ic_transport::transport::create_transport; use ic_types::{ artifact::{Advert, ArtifactKind, ArtifactTag, FileTreeSyncAttribute}, @@ -55,16 +55,15 @@ use ic_types::{ use std::sync::{Arc, Mutex, RwLock}; /// The P2P state sync client. -#[derive(Clone)] pub enum P2PStateSyncClient { /// The main client variant. - Client(Arc), + Client(StateSync), /// The test client variant. TestClient(), /// The test chunking pool variant. TestChunkingPool( - Arc>, - Arc + Sync + 'static>, + Box>, + Box + Sync + 'static>, ), } @@ -234,10 +233,10 @@ fn setup_artifact_manager( let addr = processors::ArtifactProcessorManager::new( Arc::clone(&time_source) as Arc<_>, metrics_registry, - processors::BoxOrArcClient::ArcClient(client_on_state_change), + client_on_state_change, move |req| advert_broadcaster.broadcast_advert(req.advert.into(), req.advert_class), ); - artifact_manager_maker.add_arc_client(client, addr); + artifact_manager_maker.add_client(client, addr); return Ok(artifact_manager_maker.finish()); } if let P2PStateSyncClient::Client(state_sync_client) = state_sync_client { @@ -245,10 +244,10 @@ fn setup_artifact_manager( let addr = processors::ArtifactProcessorManager::new( Arc::clone(&time_source) as Arc<_>, metrics_registry.clone(), - processors::BoxOrArcClient::ArcClient(Arc::clone(&state_sync_client) as Arc<_>), + Box::new(state_sync_client.clone()) as Box<_>, move |req| advert_broadcaster.broadcast_advert(req.advert.into(), req.advert_class), ); - artifact_manager_maker.add_arc_client(state_sync_client, addr); + artifact_manager_maker.add_client(Box::new(state_sync_client), addr); } let consensus_replica_config = ReplicaConfig { node_id, subnet_id }; @@ -332,7 +331,7 @@ fn setup_artifact_manager( replica_logger.clone(), metrics_registry.clone(), ); - artifact_manager_maker.add_client(consensus_client, actor); + artifact_manager_maker.add_client(Box::new(consensus_client), actor); } { @@ -348,7 +347,7 @@ fn setup_artifact_manager( node_id, malicious_flags.clone(), ); - artifact_manager_maker.add_client(ingress_client, actor); + artifact_manager_maker.add_client(Box::new(ingress_client), actor); } { @@ -372,7 +371,7 @@ fn setup_artifact_manager( replica_logger.clone(), metrics_registry.clone(), ); - artifact_manager_maker.add_client(certification_client, actor); + artifact_manager_maker.add_client(Box::new(certification_client), actor); } { @@ -398,7 +397,7 @@ fn setup_artifact_manager( replica_logger.clone(), metrics_registry.clone(), ); - artifact_manager_maker.add_client(dkg_client, actor); + artifact_manager_maker.add_client(Box::new(dkg_client), actor); } { @@ -441,7 +440,7 @@ fn setup_artifact_manager( metrics_registry.clone(), replica_logger.clone(), ); - artifact_manager_maker.add_client(ecdsa_client, actor); + artifact_manager_maker.add_client(Box::new(ecdsa_client), actor); } { @@ -472,7 +471,7 @@ fn setup_artifact_manager( replica_logger.clone(), metrics_registry.clone(), ); - artifact_manager_maker.add_client(canister_http_client, actor); + artifact_manager_maker.add_client(Box::new(canister_http_client), actor); } Ok(artifact_manager_maker.finish()) diff --git a/rs/replica/src/setup_p2p.rs b/rs/replica/src/setup_p2p.rs index 8f289259c46..8fc71505e10 100755 --- a/rs/replica/src/setup_p2p.rs +++ b/rs/replica/src/setup_p2p.rs @@ -22,7 +22,7 @@ use ic_replica_setup_ic_network::{ create_networking_stack, init_artifact_pools, P2PStateSyncClient, }; use ic_replicated_state::ReplicatedState; -use ic_state_manager::StateManagerImpl; +use ic_state_manager::{state_sync::StateSync, StateManagerImpl}; use ic_types::{consensus::catchup::CUPWithOriginalProtobuf, NodeId, SubnetId}; use ic_xnet_endpoint::{XNetEndpoint, XNetEndpointConfig}; use ic_xnet_payload_builder::XNetPayloadBuilderImpl; @@ -233,6 +233,8 @@ pub fn construct_ic_stack( replica_logger.clone(), ); + let state_sync = StateSync::new(state_manager.clone(), replica_logger.clone()); + let (ingress_ingestion_service, p2p_runner) = create_networking_stack( metrics_registry, replica_logger, @@ -245,7 +247,7 @@ pub fn construct_ic_stack( None, Arc::clone(&crypto) as Arc<_>, Arc::clone(&state_manager) as Arc<_>, - P2PStateSyncClient::Client(Arc::clone(&state_manager) as Arc<_>), + P2PStateSyncClient::Client(state_sync), xnet_payload_builder as Arc<_>, self_validating_payload_builder as Arc<_>, message_router as Arc<_>, diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs index dc5392cf252..ba9e093a4a1 100644 --- a/rs/state_manager/src/lib.rs +++ b/rs/state_manager/src/lib.rs @@ -42,8 +42,6 @@ use ic_replicated_state::{ }; use ic_state_layout::{error::LayoutError, AccessPolicy, CheckpointLayout, ReadOnly, StateLayout}; use ic_types::{ - artifact::StateSyncArtifactId, - chunkable::Chunkable, consensus::certification::Certification, crypto::CryptoHash, malicious_flags::MaliciousFlags, @@ -634,7 +632,6 @@ pub struct StateManagerImpl { // requested quite often and this causes high contention on the lock. latest_state_height: AtomicU64, latest_certified_height: AtomicU64, - state_sync_refs: StateSyncRefs, _state_hasher_handle: JoinOnDrop<()>, _deallocation_handle: JoinOnDrop<()>, persist_metadata_guard: Arc>, @@ -1474,7 +1471,7 @@ impl StateManagerImpl { report_last_diverged_state(&log, &metrics, &state_layout); Self { - log: log.clone(), + log, metrics, state_layout, states, @@ -1485,7 +1482,6 @@ impl StateManagerImpl { deallocation_sender, latest_state_height, latest_certified_height, - state_sync_refs: StateSyncRefs::new(log), _state_hasher_handle, _deallocation_handle, persist_metadata_guard, @@ -1501,29 +1497,6 @@ impl StateManagerImpl { &self.state_layout } - /// Returns requested state as a Chunkable artifact for StateSync. - pub fn create_chunkable_state( - &self, - id: &StateSyncArtifactId, - ) -> Box { - info!(self.log, "Starting state sync @{}", id.height); - - Box::new(crate::state_sync::chunkable::IncompleteState::new( - self.log.clone(), - id.height, - id.hash.clone(), - self.state_layout.clone(), - self.latest_manifest(), - self.metrics.clone(), - self.own_subnet_type, - Arc::new(Mutex::new(scoped_threadpool::Pool::new( - NUMBER_OF_CHECKPOINT_THREADS, - ))), - self.state_sync_refs.clone(), - self.malicious_flags.clone(), - )) - } - /// Reads states metadata file, returning an empty one if any errors occurs. /// /// It's OK to miss some (or all) metadata entries as it will be re-computed diff --git a/rs/state_manager/src/state_sync.rs b/rs/state_manager/src/state_sync.rs index 891023ea0c5..2eff029f3c2 100644 --- a/rs/state_manager/src/state_sync.rs +++ b/rs/state_manager/src/state_sync.rs @@ -1,14 +1,17 @@ pub(crate) mod chunkable; use super::StateManagerImpl; -use crate::{manifest::build_file_group_chunks, EXTRA_CHECKPOINTS_TO_KEEP}; +use crate::{ + manifest::build_file_group_chunks, StateSyncRefs, EXTRA_CHECKPOINTS_TO_KEEP, + NUMBER_OF_CHECKPOINT_THREADS, +}; use ic_interfaces::{ artifact_manager::{ArtifactClient, ArtifactProcessor, ProcessingResult}, artifact_pool::{ArtifactPoolError, UnvalidatedArtifact}, time_source::TimeSource, }; use ic_interfaces_state_manager::{StateManager, CERT_CERTIFIED}; -use ic_logger::{info, warn}; +use ic_logger::{info, warn, ReplicaLogger}; use ic_types::{ artifact::{ Advert, AdvertClass, AdvertSendRequest, ArtifactKind, ArtifactTag, Priority, @@ -19,7 +22,46 @@ use ic_types::{ state_sync::FileGroupChunks, Height, NodeId, }; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct StateSync { + state_manager: Arc, + state_sync_refs: StateSyncRefs, + log: ReplicaLogger, +} + +impl StateSync { + pub fn new(state_manager: Arc, log: ReplicaLogger) -> Self { + Self { + state_manager, + state_sync_refs: StateSyncRefs::new(log.clone()), + log, + } + } + /// Returns requested state as a Chunkable artifact for StateSync. + pub fn create_chunkable_state( + &self, + id: &StateSyncArtifactId, + ) -> Box { + info!(self.log, "Starting state sync @{}", id.height); + + Box::new(crate::state_sync::chunkable::IncompleteState::new( + self.log.clone(), + id.height, + id.hash.clone(), + self.state_manager.state_layout.clone(), + self.state_manager.latest_manifest(), + self.state_manager.metrics.clone(), + self.state_manager.own_subnet_type, + Arc::new(Mutex::new(scoped_threadpool::Pool::new( + NUMBER_OF_CHECKPOINT_THREADS, + ))), + self.state_sync_refs.clone(), + self.state_manager.malicious_flags.clone(), + )) + } +} #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct StateSyncArtifact; @@ -53,7 +95,7 @@ impl ArtifactKind for StateSyncArtifact { } } -impl ArtifactClient for StateManagerImpl { +impl ArtifactClient for StateSync { fn check_artifact_acceptance( &self, _msg: &StateSyncMessage, @@ -67,40 +109,48 @@ impl ArtifactClient for StateManagerImpl { msg_id: &StateSyncArtifactId, ) -> Option { let mut file_group_to_populate: Option> = None; - let state_sync_message = - self.states - .read() - .states_metadata - .iter() - .find_map(|(height, metadata)| { - if metadata.root_hash() == Some(&msg_id.hash) { - let manifest = metadata.manifest()?; - let checkpoint_root = self.state_layout.checkpoint(*height).ok()?; - let state_sync_file_group = match &metadata.state_sync_file_group { - Some(value) => value.clone(), - None => { - // Note that this code path will be called at most once because the value is then populated. - let computed_file_group_chunks = - Arc::new(build_file_group_chunks(manifest)); - file_group_to_populate = Some(computed_file_group_chunks.clone()); - computed_file_group_chunks - } - }; + let state_sync_message = self + .state_manager + .states + .read() + .states_metadata + .iter() + .find_map(|(height, metadata)| { + if metadata.root_hash() == Some(&msg_id.hash) { + let manifest = metadata.manifest()?; + let checkpoint_root = + self.state_manager.state_layout.checkpoint(*height).ok()?; + let state_sync_file_group = match &metadata.state_sync_file_group { + Some(value) => value.clone(), + None => { + // Note that this code path will be called at most once because the value is then populated. + let computed_file_group_chunks = + Arc::new(build_file_group_chunks(manifest)); + file_group_to_populate = Some(computed_file_group_chunks.clone()); + computed_file_group_chunks + } + }; - Some(StateSyncMessage { - height: *height, - root_hash: msg_id.hash.clone(), - checkpoint_root: checkpoint_root.raw_path().to_path_buf(), - manifest: manifest.clone(), - state_sync_file_group, - }) - } else { - None - } - }); + Some(StateSyncMessage { + height: *height, + root_hash: msg_id.hash.clone(), + checkpoint_root: checkpoint_root.raw_path().to_path_buf(), + manifest: manifest.clone(), + state_sync_file_group, + }) + } else { + None + } + }); if let Some(state_sync_file_group) = file_group_to_populate { - if let Some(metadata) = self.states.write().states_metadata.get_mut(&msg_id.height) { + if let Some(metadata) = self + .state_manager + .states + .write() + .states_metadata + .get_mut(&msg_id.height) + { metadata.state_sync_file_group = Some(state_sync_file_group); } } @@ -108,7 +158,8 @@ impl ArtifactClient for StateManagerImpl { } fn has_artifact(&self, msg_id: &StateSyncArtifactId) -> bool { - self.states + self.state_manager + .states .read() .states_metadata .iter() @@ -123,7 +174,7 @@ impl ArtifactClient for StateManagerImpl { &self, filter: &StateSyncFilter, ) -> Vec> { - let heights = match self.state_layout.checkpoint_heights() { + let heights = match self.state_manager.state_layout.checkpoint_heights() { Ok(heights) => heights, Err(err) => { warn!( @@ -133,7 +184,7 @@ impl ArtifactClient for StateManagerImpl { return Vec::new(); } }; - let states = self.states.read(); + let states = self.state_manager.states.read(); heights .into_iter() @@ -141,7 +192,7 @@ impl ArtifactClient for StateManagerImpl { if h > filter.height { let metadata = states.states_metadata.get(&h)?; let manifest = metadata.manifest()?; - let checkpoint_root = self.state_layout.checkpoint(h).ok()?; + let checkpoint_root = self.state_manager.state_layout.checkpoint(h).ok()?; let msg = StateSyncMessage { height: h, root_hash: metadata.root_hash()?.clone(), @@ -164,8 +215,8 @@ impl ArtifactClient for StateManagerImpl { > { use ic_interfaces_state_manager::StateReader; - let latest_height = self.latest_state_height(); - let fetch_state = self.states.read().fetch_state.clone(); + let latest_height = self.state_manager.latest_state_height(); + let fetch_state = self.state_manager.states.read().fetch_state.clone(); let state_sync_refs = self.state_sync_refs.clone(); let log = self.log.clone(); @@ -240,6 +291,7 @@ impl ArtifactClient for StateManagerImpl { fn get_filter(&self) -> StateSyncFilter { StateSyncFilter { height: *self + .state_manager .list_state_heights(CERT_CERTIFIED) .last() .unwrap_or(&Height::from(0)), @@ -252,7 +304,7 @@ impl ArtifactClient for StateManagerImpl { } } -impl ArtifactProcessor for StateManagerImpl { +impl ArtifactProcessor for StateSync { // Returns the states checkpointed since the last time process_changes was // called. fn process_changes( @@ -274,20 +326,26 @@ impl ArtifactProcessor for StateManagerImpl { ); let ro_layout = self + .state_manager .state_layout .checkpoint(height) .expect("failed to create checkpoint layout"); let state = crate::checkpoint::load_checkpoint_parallel( &ro_layout, - self.own_subnet_type, - &self.metrics.checkpoint_metrics, + self.state_manager.own_subnet_type, + &self.state_manager.metrics.checkpoint_metrics, ) .expect("failed to recover checkpoint"); - self.on_synced_checkpoint(state, height, message.manifest, message.root_hash); + self.state_manager.on_synced_checkpoint( + state, + height, + message.manifest, + message.root_hash, + ); } let filter = StateSyncFilter { - height: self.states.read().last_advertised, + height: self.state_manager.states.read().last_advertised, }; let artifacts = self.get_all_validated_by_filter(&filter); let artifacts: Vec> = artifacts @@ -300,7 +358,7 @@ impl ArtifactProcessor for StateManagerImpl { .collect(); if let Some(artifact) = artifacts.last() { - self.states.write().last_advertised = artifact.advert.attribute.height; + self.state_manager.states.write().last_advertised = artifact.advert.attribute.height; } (artifacts, ProcessingResult::StateUnchanged) diff --git a/rs/state_manager/tests/common/mod.rs b/rs/state_manager/tests/common/mod.rs index 2923cab4a90..803b56f43f4 100644 --- a/rs/state_manager/tests/common/mod.rs +++ b/rs/state_manager/tests/common/mod.rs @@ -11,7 +11,7 @@ use ic_metrics::MetricsRegistry; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::canister_state::execution_state::WasmBinary; use ic_replicated_state::{testing::ReplicatedStateTesting, ReplicatedState, Stream}; -use ic_state_manager::{stream_encoding, StateManagerImpl}; +use ic_state_manager::{state_sync::StateSync, stream_encoding, StateManagerImpl}; use ic_test_utilities::{ consensus::fake::{Fake, FakeVerifier}, state::{initial_execution_state, new_canister_state}, @@ -480,10 +480,49 @@ pub fn state_manager_test_with_verifier_result, StateSync), +>( + should_pass_verification: bool, + f: F, +) { + let tmp = tmpdir("sm"); + let config = Config::new(tmp.path().into()); + let metrics_registry = MetricsRegistry::new(); + let own_subnet = subnet_test_id(42); + let verifier: Arc = if should_pass_verification { + Arc::new(FakeVerifier::new()) + } else { + Arc::new(RejectingVerifier::default()) + }; + + with_test_replica_logger(|log| { + let sm = Arc::new(StateManagerImpl::new( + verifier, + own_subnet, + SubnetType::Application, + log.clone(), + &metrics_registry, + &config, + None, + ic_types::malicious_flags::MaliciousFlags::default(), + )); + f(&metrics_registry, sm.clone(), StateSync::new(sm, log)); + }) +} + pub fn state_manager_test(f: F) { state_manager_test_with_verifier_result(true, f) } +pub fn state_manager_test_with_state_sync< + F: FnOnce(&MetricsRegistry, Arc, StateSync), +>( + f: F, +) { + state_manager_test_with_state_sync_and_verifier_result(true, f) +} + pub fn state_manager_restart_test_deleting_metadata(test: Test) where Test: FnOnce( diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs index 33d581e1631..f88c44ebc17 100644 --- a/rs/state_manager/tests/state_manager.rs +++ b/rs/state_manager/tests/state_manager.rs @@ -1,11 +1,8 @@ use ic_base_types::NumBytes; use ic_config::state_manager::Config; use ic_crypto_tree_hash::{flatmap, Label, LabeledTree, MixedHashTree}; -use ic_interfaces::{ - artifact_manager::{ArtifactClient, ArtifactProcessor}, - artifact_pool::UnvalidatedArtifact, - certification::Verifier, -}; +use ic_interfaces::artifact_manager::{ArtifactClient, ArtifactProcessor}; +use ic_interfaces::{artifact_pool::UnvalidatedArtifact, certification::Verifier}; use ic_interfaces_certified_stream_store::{CertifiedStreamStore, EncodeStreamError}; use ic_interfaces_state_manager::*; use ic_logger::replica_logger::no_op_logger; @@ -80,26 +77,26 @@ fn label>(t: T) -> Label { #[test] fn rejoining_node_doesnt_accumulate_states() { - state_manager_test(|src_metrics, src_state_manager| { - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { for i in 1..=3 { let mut state = src_state_manager.take_tip().1; insert_dummy_canister(&mut state, canister_test_id(100 + i)); src_state_manager.commit_and_certify(state, height(i), CertificationScope::Full); let time_source = ic_test_utilities::FastForwardTimeSource::new(); - let hash = wait_for_checkpoint(&src_state_manager, height(i)); + let hash = wait_for_checkpoint(&*src_state_manager, height(i)); let id = StateSyncArtifactId { height: height(i), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg.clone(), chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -1483,25 +1480,25 @@ fn encode_stream_index_is_checked() { #[test] fn delivers_state_adverts_once() { - state_manager_test(|_metrics, state_manager| { + state_manager_test_with_state_sync(|_metrics, state_manager, state_sync| { let (_height, state) = state_manager.take_tip(); let time_source = ic_test_utilities::FastForwardTimeSource::new(); state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&state_manager, height(1)); + let hash = wait_for_checkpoint(&*state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, }; - let (adverts, _) = state_manager.process_changes(time_source.as_ref(), Default::default()); + let (adverts, _) = state_sync.process_changes(time_source.as_ref(), Default::default()); assert_eq!(adverts.len(), 1); assert_eq!(adverts[0].advert.id, id); - assert!(state_manager.has_artifact(&id)); + assert!(state_sync.has_artifact(&id)); - let (adverts, _) = state_manager.process_changes(time_source.as_ref(), Default::default()); + let (adverts, _) = state_sync.process_changes(time_source.as_ref(), Default::default()); assert_eq!(adverts.len(), 0); - assert!(state_manager.has_artifact(&id)); + assert!(state_sync.has_artifact(&id)); }); } @@ -1524,17 +1521,17 @@ fn recomputes_metadata_on_restart_if_missing() { #[test] fn state_sync_message_contains_manifest() { - state_manager_test(|_metrics, state_manager| { + state_manager_test_with_state_sync(|_metrics, state_manager, state_sync| { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&state_manager, height(1)); + let hash = wait_for_checkpoint(&*state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, }; - let msg = state_manager + let msg = state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); @@ -1562,7 +1559,7 @@ fn state_sync_message_contains_manifest() { #[test] fn state_sync_priority_fn_respects_states_to_fetch() { - state_manager_test(|_metrics, state_manager| { + state_manager_test_with_state_sync(|_metrics, state_manager, state_sync| { fn hash(n: u8) -> CryptoHashOfState { CryptoHashOfState::from(CryptoHash(vec![n; 32])) } @@ -1573,7 +1570,7 @@ fn state_sync_priority_fn_respects_states_to_fetch() { let (_height, state) = state_manager.take_tip(); state_manager.commit_and_certify(state, height(2), CertificationScope::Metadata); - let priority_fn = state_manager + let priority_fn = state_sync .get_priority_function() .expect("state manager returned no priority function"); @@ -1601,7 +1598,7 @@ fn state_sync_priority_fn_respects_states_to_fetch() { // Request fetching of state 3. state_manager.fetch_state(height(3), hash(3), Height::new(99)); - let priority_fn = state_manager + let priority_fn = state_sync .get_priority_function() .expect("state manager returned no priority function"); // Good hash @@ -1635,7 +1632,7 @@ fn state_sync_priority_fn_respects_states_to_fetch() { // Request fetching of newer state 4. state_manager.fetch_state(height(4), hash(4), Height::new(99)); - let priority_fn = state_manager + let priority_fn = state_sync .get_priority_function() .expect("state manager returned no priority function"); assert_eq!( @@ -1686,13 +1683,13 @@ fn assert_no_remaining_chunks(metrics: &MetricsRegistry) { #[test] fn can_do_simple_state_sync_transfer() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); let time_source = ic_test_utilities::FastForwardTimeSource::new(); src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(1)); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, @@ -1700,17 +1697,17 @@ fn can_do_simple_state_sync_transfer() { let state = src_state_manager.get_latest_state().take(); - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { - let chunkable = dst_state_manager.create_chunkable_state(&id); + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -1731,7 +1728,7 @@ fn can_do_simple_state_sync_transfer() { // Because `take_tip()` modifies the `prev_state_hash`, we change it back to compare the rest of state. tip.metadata.prev_state_hash = state.metadata.prev_state_hash.clone(); assert_eq!(*state.as_ref(), tip); - assert_eq!(vec![height(1)], heights_to_certify(&dst_state_manager)); + assert_eq!(vec![height(1)], heights_to_certify(&*dst_state_manager)); assert_error_counters(dst_metrics); assert_no_remaining_chunks(dst_metrics); @@ -1741,14 +1738,14 @@ fn can_do_simple_state_sync_transfer() { #[test] fn can_state_sync_from_cache() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); insert_dummy_canister(&mut state, canister_test_id(200)); let time_source = ic_test_utilities::FastForwardTimeSource::new(); src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(1)); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash: hash.clone(), @@ -1756,19 +1753,19 @@ fn can_state_sync_from_cache() { let state = src_state_manager.get_latest_state().take(); - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let omit: HashSet = maplit::hashset! {ChunkId::new(1), ChunkId::new(FILE_GROUP_CHUNK_ID_OFFSET)}; // First state sync is destroyed before completion { - let mut chunkable = dst_state_manager.create_chunkable_state(&id); + let mut chunkable = dst_state_sync.create_chunkable_state(&id); // First fetch chunk 0 (the manifest), and then ask for all chunks afterwards, // but never receive 1 and FILE_GROUP_CHUNK_ID_OFFSET @@ -1784,7 +1781,7 @@ fn can_state_sync_from_cache() { hash: hash.clone(), }; - let mut chunkable = dst_state_manager.create_chunkable_state(&id); + let mut chunkable = dst_state_sync.create_chunkable_state(&id); let result = pipe_manifest(&msg, &mut *chunkable); assert!(result.is_none()); @@ -1804,7 +1801,7 @@ fn can_state_sync_from_cache() { // Download chunk 1 let dst_msg = pipe_state_sync(msg.clone(), chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -1824,7 +1821,7 @@ fn can_state_sync_from_cache() { *state.as_ref(), *dst_state_manager.get_latest_state().take() ); - assert_eq!(vec![height(2)], heights_to_certify(&dst_state_manager)); + assert_eq!(vec![height(2)], heights_to_certify(&*dst_state_manager)); } assert_no_remaining_chunks(dst_metrics); // Third state sync can copy all chunks immediately @@ -1835,12 +1832,12 @@ fn can_state_sync_from_cache() { hash, }; - let mut chunkable = dst_state_manager.create_chunkable_state(&id); + let mut chunkable = dst_state_sync.create_chunkable_state(&id); // The manifest alone is enough to complete the sync let dst_msg = pipe_manifest(&msg, &mut *chunkable).unwrap(); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -1862,7 +1859,7 @@ fn can_state_sync_from_cache() { ); assert_eq!( vec![height(2), height(3)], - heights_to_certify(&dst_state_manager) + heights_to_certify(&*dst_state_manager) ); } @@ -1874,26 +1871,26 @@ fn can_state_sync_from_cache() { #[test] fn can_state_sync_into_existing_checkpoint() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); let time_source = ic_test_utilities::FastForwardTimeSource::new(); src_state_manager.commit_and_certify(state.clone(), height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(1)); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { - let chunkable = dst_state_manager.create_chunkable_state(&id); + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { + let chunkable = dst_state_sync.create_chunkable_state(&id); dst_state_manager.take_tip(); dst_state_manager.commit_and_certify( @@ -1903,7 +1900,7 @@ fn can_state_sync_into_existing_checkpoint() { ); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -1920,7 +1917,7 @@ fn can_state_sync_into_existing_checkpoint() { #[test] fn can_group_small_files_in_state_sync() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); let num_canisters = 200; let time_source = ic_test_utilities::FastForwardTimeSource::new(); @@ -1937,7 +1934,7 @@ fn can_group_small_files_in_state_sync() { ); src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(1)); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, @@ -1945,7 +1942,7 @@ fn can_group_small_files_in_state_sync() { let state = src_state_manager.get_latest_state().take(); - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); @@ -1968,8 +1965,8 @@ fn can_group_small_files_in_state_sync() { assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { - let mut chunkable = dst_state_manager.create_chunkable_state(&id); + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { + let mut chunkable = dst_state_sync.create_chunkable_state(&id); let result = pipe_manifest(&msg, &mut *chunkable); assert!(result.is_none()); @@ -1980,7 +1977,7 @@ fn can_group_small_files_in_state_sync() { let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2005,7 +2002,7 @@ fn can_group_small_files_in_state_sync() { #[test] fn can_commit_after_prev_state_is_gone() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut tip) = src_state_manager.take_tip(); insert_dummy_canister(&mut tip, canister_test_id(100)); src_state_manager.commit_and_certify(tip, height(1), CertificationScope::Metadata); @@ -2017,28 +2014,28 @@ fn can_commit_after_prev_state_is_gone() { let (_height, tip) = src_state_manager.take_tip(); src_state_manager.commit_and_certify(tip, height(3), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(3)); + let hash = wait_for_checkpoint(&*src_state_manager, height(3)); let id = StateSyncArtifactId { height: height(3), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (_height, mut tip) = dst_state_manager.take_tip(); insert_dummy_canister(&mut tip, canister_test_id(100)); dst_state_manager.commit_and_certify(tip, height(1), CertificationScope::Metadata); let (_height, tip) = dst_state_manager.take_tip(); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2070,7 +2067,7 @@ fn can_commit_after_prev_state_is_gone() { #[test] fn can_commit_without_prev_hash_mismatch_after_taking_tip_at_the_synced_height() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut tip) = src_state_manager.take_tip(); insert_dummy_canister(&mut tip, canister_test_id(100)); src_state_manager.commit_and_certify(tip, height(1), CertificationScope::Metadata); @@ -2082,26 +2079,26 @@ fn can_commit_without_prev_hash_mismatch_after_taking_tip_at_the_synced_height() let (_height, tip) = src_state_manager.take_tip(); src_state_manager.commit_and_certify(tip, height(3), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(3)); + let hash = wait_for_checkpoint(&*src_state_manager, height(3)); let id = StateSyncArtifactId { height: height(3), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (_height, mut tip) = dst_state_manager.take_tip(); insert_dummy_canister(&mut tip, canister_test_id(100)); dst_state_manager.commit_and_certify(tip, height(1), CertificationScope::Metadata); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2123,7 +2120,7 @@ fn can_commit_without_prev_hash_mismatch_after_taking_tip_at_the_synced_height() #[test] fn can_state_sync_based_on_old_checkpoint() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); @@ -2133,28 +2130,28 @@ fn can_state_sync_based_on_old_checkpoint() { insert_dummy_canister(&mut state, canister_test_id(200)); src_state_manager.commit_and_certify(state, height(2), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(2)); + let hash = wait_for_checkpoint(&*src_state_manager, height(2)); let id = StateSyncArtifactId { height: height(2), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync message"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (_height, mut state) = dst_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - wait_for_checkpoint(&dst_state_manager, height(1)); + wait_for_checkpoint(&*dst_state_manager, height(1)); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2223,14 +2220,14 @@ fn can_recover_from_corruption_on_state_sync() { ]); }; - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { // Create initial state with a single canister. let (_height, mut state) = src_state_manager.take_tip(); populate_original_state(&mut state); src_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); let time_source = ic_test_utilities::FastForwardTimeSource::new(); - let hash_1 = wait_for_checkpoint(&src_state_manager, height(1)); + let hash_1 = wait_for_checkpoint(&*src_state_manager, height(1)); // Create another state with an extra canister. let (_height, mut state) = src_state_manager.take_tip(); @@ -2264,23 +2261,23 @@ fn can_recover_from_corruption_on_state_sync() { src_state_manager.commit_and_certify(state, height(2), CertificationScope::Full); - let hash_2 = wait_for_checkpoint(&src_state_manager, height(2)); + let hash_2 = wait_for_checkpoint(&*src_state_manager, height(2)); let id = StateSyncArtifactId { height: height(2), hash: hash_2, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync message"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (_height, mut state) = dst_state_manager.take_tip(); populate_original_state(&mut state); dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - let hash_dst_1 = wait_for_checkpoint(&dst_state_manager, height(1)); + let hash_dst_1 = wait_for_checkpoint(&*dst_state_manager, height(1)); assert_eq!(hash_1, hash_dst_1); // Corrupt some files in the destination checkpoint. @@ -2341,9 +2338,9 @@ fn can_recover_from_corruption_on_state_sync() { make_mutable(&canister_100_raw_pb).unwrap(); std::fs::write(&canister_100_raw_pb, b"Garbage").unwrap(); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2370,7 +2367,7 @@ fn can_recover_from_corruption_on_state_sync() { #[test] fn can_commit_below_state_sync() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); let time_source = ic_test_utilities::FastForwardTimeSource::new(); @@ -2379,24 +2376,24 @@ fn can_commit_below_state_sync() { let (_height, state) = src_state_manager.take_tip(); src_state_manager.commit_and_certify(state, height(2), CertificationScope::Full); - let hash = wait_for_checkpoint(&src_state_manager, height(2)); + let hash = wait_for_checkpoint(&*src_state_manager, height(2)); let id = StateSyncArtifactId { height: height(2), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (tip_height, state) = dst_state_manager.take_tip(); assert_eq!(tip_height, height(0)); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2406,7 +2403,7 @@ fn can_commit_below_state_sync() { ); // Check committing an old state doesn't panic dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); - wait_for_checkpoint(&dst_state_manager, height(1)); + wait_for_checkpoint(&*dst_state_manager, height(1)); // take_tip should update the tip to the synced checkpoint let (tip_height, _state) = dst_state_manager.take_tip(); @@ -2422,40 +2419,40 @@ fn can_commit_below_state_sync() { #[test] fn can_state_sync_below_commit() { - state_manager_test(|src_metrics, src_state_manager| { + state_manager_test_with_state_sync(|src_metrics, src_state_manager, src_state_sync| { let (_height, mut state) = src_state_manager.take_tip(); insert_dummy_canister(&mut state, canister_test_id(100)); let time_source = ic_test_utilities::FastForwardTimeSource::new(); src_state_manager.commit_and_certify(state.clone(), height(1), CertificationScope::Full); + let hash = wait_for_checkpoint(&*src_state_manager, height(1)); - let hash = wait_for_checkpoint(&src_state_manager, height(1)); let id = StateSyncArtifactId { height: height(1), hash, }; - let msg = src_state_manager + let msg = src_state_sync .get_validated_by_identifier(&id) .expect("failed to get state sync messages"); assert_error_counters(src_metrics); - state_manager_test(|dst_metrics, dst_state_manager| { + state_manager_test_with_state_sync(|dst_metrics, dst_state_manager, dst_state_sync| { let (tip_height, state) = dst_state_manager.take_tip(); assert_eq!(tip_height, height(0)); dst_state_manager.commit_and_certify(state, height(1), CertificationScope::Full); let (_height, state) = dst_state_manager.take_tip(); dst_state_manager.commit_and_certify(state, height(2), CertificationScope::Full); - wait_for_checkpoint(&dst_state_manager, height(2)); + wait_for_checkpoint(&*dst_state_manager, height(2)); let (_height, state) = dst_state_manager.take_tip(); dst_state_manager.remove_states_below(height(2)); assert_eq!(dst_state_manager.checkpoint_heights(), vec![height(2)]); - let chunkable = dst_state_manager.create_chunkable_state(&id); + let chunkable = dst_state_sync.create_chunkable_state(&id); let dst_msg = pipe_state_sync(msg, chunkable); - dst_state_manager.process_changes( + dst_state_sync.process_changes( time_source.as_ref(), vec![UnvalidatedArtifact { message: dst_msg, @@ -2468,7 +2465,7 @@ fn can_state_sync_below_commit() { vec![height(1), height(2)] ); dst_state_manager.commit_and_certify(state, height(3), CertificationScope::Full); - wait_for_checkpoint(&dst_state_manager, height(3)); + wait_for_checkpoint(&*dst_state_manager, height(3)); let (tip_height, _state) = dst_state_manager.take_tip(); assert_eq!(tip_height, height(3));