Skip to content

Commit

Permalink
chore: Remove ArcOrBoxClient by separating StateSync from StateManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tthebst committed Jan 26, 2023
1 parent d69e626 commit ac3e203
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 261 deletions.
2 changes: 1 addition & 1 deletion rs/artifact_manager/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(crate) trait ArtifactManagerBackend: Send + Sync {
/// Implementation struct for `ArtifactManagerBackend`.
pub(crate) struct ArtifactManagerBackendImpl<Artifact: ArtifactKind + 'static> {
/// Reference to the artifact client.
pub client: Arc<dyn ArtifactClient<Artifact>>,
pub client: Box<dyn ArtifactClient<Artifact>>,
/// The artifact processor front end.
pub processor: ArtifactProcessorManager<Artifact>,
}
Expand Down
34 changes: 3 additions & 31 deletions rs/artifact_manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,38 +185,13 @@ impl ArtifactManagerMaker {
clients: HashMap::new(),
}
}
/// The method adds a new `ArtifactClient` (that is already wrapped in
/// `Arc`) to be managed.
pub fn add_arc_client<Artifact: ArtifactKind + 'static>(
&mut self,
client: Arc<dyn ArtifactClient<Artifact>>,
processor: ArtifactProcessorManager<Artifact>,
) where
Artifact::Message:
ChunkableArtifact + Send + TryFrom<artifact::Artifact, Error = artifact::Artifact>,
Advert<Artifact>:
Into<p2p::GossipAdvert> + TryFrom<p2p::GossipAdvert, Error = p2p::GossipAdvert> + Eq,
for<'b> &'b Artifact::Id:
TryFrom<&'b artifact::ArtifactId, Error = &'b artifact::ArtifactId>,
artifact::ArtifactFilter: AsMut<Artifact::Filter> + AsRef<Artifact::Filter>,
for<'b> &'b Artifact::Attribute:
TryFrom<&'b artifact::ArtifactAttribute, Error = &'b artifact::ArtifactAttribute>,
Artifact::Attribute: 'static,
{
let tag = Artifact::TAG;
self.clients.insert(
tag,
Box::new(ArtifactManagerBackendImpl { client, processor }),
);
}

/// The method adds a new `ArtifactClient` to be managed.
pub fn add_client<Artifact: ArtifactKind + 'static, Client: 'static>(
pub fn add_client<Artifact: ArtifactKind + 'static>(
&mut self,
client: Client,
client: Box<dyn ArtifactClient<Artifact>>,
processor: ArtifactProcessorManager<Artifact>,
) where
Client: ArtifactClient<Artifact>,
Artifact::Message:
ChunkableArtifact + Send + TryFrom<artifact::Artifact, Error = artifact::Artifact>,
Advert<Artifact>:
Expand All @@ -231,10 +206,7 @@ impl ArtifactManagerMaker {
let tag = Artifact::TAG;
self.clients.insert(
tag,
Box::new(ArtifactManagerBackendImpl {
client: Arc::new(client) as Arc<_>,
processor,
}),
Box::new(ArtifactManagerBackendImpl { client, processor }),
);
}

Expand Down
39 changes: 8 additions & 31 deletions rs/artifact_manager/src/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,6 @@ enum AdvertSource {
Relayed,
}

/// A client may be either wrapped in `Box` or `Arc`.
pub enum BoxOrArcClient<Artifact: ArtifactKind> {
/// The client wrapped in `Box`.
BoxClient(Box<dyn ArtifactProcessor<Artifact>>),
/// The client wrapped in `Arc`.
ArcClient(Arc<dyn ArtifactProcessor<Artifact> + Sync + 'static>),
}

impl<Artifact: ArtifactKind> BoxOrArcClient<Artifact> {
/// The method calls the corresponding client's `process_changes` with the
/// given time source and artifacts.
fn process_changes(
&self,
time_source: &dyn TimeSource,
artifacts: Vec<UnvalidatedArtifact<Artifact::Message>>,
) -> (Vec<AdvertSendRequest<Artifact>>, ProcessingResult) {
match self {
BoxOrArcClient::BoxClient(client) => client.process_changes(time_source, artifacts),
BoxOrArcClient::ArcClient(client) => client.process_changes(time_source, artifacts),
}
}
}

/// Metrics for a client artifact processor.
struct ArtifactProcessorMetrics {
/// The processing time histogram.
Expand Down Expand Up @@ -140,7 +117,7 @@ impl<Artifact: ArtifactKind + 'static> ArtifactProcessorManager<Artifact> {
pub fn new<S: Fn(AdvertSendRequest<Artifact>) + Send + 'static>(
time_source: Arc<SysTimeSource>,
metrics_registry: MetricsRegistry,
client: BoxOrArcClient<Artifact>,
client: Box<dyn ArtifactProcessor<Artifact>>,
send_advert: S,
) -> Self
where
Expand Down Expand Up @@ -191,7 +168,7 @@ impl<Artifact: ArtifactKind + 'static> ArtifactProcessorManager<Artifact> {
fn process_messages<S: Fn(AdvertSendRequest<Artifact>) + Send + 'static>(
pending_artifacts: Arc<Mutex<Vec<UnvalidatedArtifact<Artifact::Message>>>>,
time_source: Arc<SysTimeSource>,
client: BoxOrArcClient<Artifact>,
client: Box<dyn ArtifactProcessor<Artifact>>,
send_advert: Box<S>,
sender: Sender<ProcessRequest>,
receiver: Receiver<ProcessRequest>,
Expand Down Expand Up @@ -291,7 +268,7 @@ impl<PoolConsensus: MutableConsensusPool + Send + Sync + 'static>
let manager = ArtifactProcessorManager::new(
time_source,
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(
Expand Down Expand Up @@ -433,7 +410,7 @@ impl<Pool: MutableIngressPool + Send + Sync + 'static> IngressProcessor<Pool> {
let manager = ArtifactProcessorManager::new(
time_source.clone(),
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(
Expand Down Expand Up @@ -559,7 +536,7 @@ impl<PoolCertification: MutableCertificationPool + Send + Sync + 'static>
let manager = ArtifactProcessorManager::new(
time_source,
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(
Expand Down Expand Up @@ -678,7 +655,7 @@ impl<PoolDkg: MutableDkgPool + Send + Sync + 'static> DkgProcessor<PoolDkg> {
let manager = ArtifactProcessorManager::new(
time_source,
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(clients::DkgClient::new(dkg_pool, dkg_gossip), manager)
Expand Down Expand Up @@ -780,7 +757,7 @@ impl<PoolEcdsa: MutableEcdsaPool + Send + Sync + 'static> EcdsaProcessor<PoolEcd
let manager = ArtifactProcessorManager::new(
time_source,
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(clients::EcdsaClient::new(ecdsa_pool, ecdsa_gossip), manager)
Expand Down Expand Up @@ -900,7 +877,7 @@ impl<
let manager = ArtifactProcessorManager::new(
time_source,
metrics_registry,
BoxOrArcClient::BoxClient(Box::new(client)),
Box::new(client),
send_advert,
);
(
Expand Down
2 changes: 1 addition & 1 deletion rs/artifact_manager/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn setup_manager(artifact_pool_config: ArtifactPoolConfig) -> Arc<dyn ArtifactMa
replica_logger,
metrics_registry,
);
artifact_manager_maker.add_client(consensus_client, actor);
artifact_manager_maker.add_client(Box::new(consensus_client), actor);
artifact_manager_maker.finish()
}

Expand Down
15 changes: 8 additions & 7 deletions rs/p2p/tests/framework/file_tree_artifact_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use ic_types::NodeId;
use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

const NODE_PREFIX: &str = "NODE";
const POOL: &str = "POOL";
Expand All @@ -29,13 +29,14 @@ const MAX_CHUNKS: u32 = 5;

type FileTreeSyncInMemoryPool = HashMap<TestArtifactId, FileTreeSyncArtifact>;

#[derive(Clone)]
pub struct ArtifactChunkingTestImpl {
node_pool_dir: PathBuf, // Path to on disk pool
node_id: NodeId,
file_tree_sync_unvalidated_pool: Mutex<FileTreeSyncInMemoryPool>, /* In memory representaion
* on on-disk
* pool */
file_tree_sync_validated_pool: Mutex<FileTreeSyncInMemoryPool>,
file_tree_sync_unvalidated_pool: Arc<Mutex<FileTreeSyncInMemoryPool>>, /* In memory representaion
* on on-disk
* pool */
file_tree_sync_validated_pool: Arc<Mutex<FileTreeSyncInMemoryPool>>,
}

impl ArtifactProcessor<TestArtifact> for ArtifactChunkingTestImpl {
Expand Down Expand Up @@ -132,8 +133,8 @@ impl ArtifactChunkingTestImpl {
ArtifactChunkingTestImpl {
node_pool_dir: on_disk_pool_path,
node_id,
file_tree_sync_unvalidated_pool: Mutex::new(mem_pool),
file_tree_sync_validated_pool: Mutex::new(HashMap::new()),
file_tree_sync_unvalidated_pool: Arc::new(Mutex::new(mem_pool)),
file_tree_sync_validated_pool: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down
2 changes: 1 addition & 1 deletion rs/p2p/tests/framework/p2p_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ fn execute_test_chunking_pool(
let fake_crypto = CryptoReturningOk::default();
let fake_crypto = Arc::new(fake_crypto);
let node_pool_dir = test_synchronizer.get_test_group_directory();
let state_sync_client = Arc::new(ArtifactChunkingTestImpl::new(node_pool_dir, node_id));
let state_sync_client = Box::new(ArtifactChunkingTestImpl::new(node_pool_dir, node_id));
let state_sync_client =
P2PStateSyncClient::TestChunkingPool(state_sync_client.clone(), state_sync_client);
let subnet_config = SubnetConfigs::default().own_subnet_config(SubnetType::System);
Expand Down
29 changes: 14 additions & 15 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use ic_metrics::MetricsRegistry;
use ic_p2p::{start_p2p, AdvertBroadcaster, P2PThreadJoiner};
use ic_registry_client_helpers::subnet::SubnetRegistry;
use ic_replicated_state::ReplicatedState;
use ic_state_manager::StateManagerImpl;
use ic_state_manager::state_sync::StateSync;
use ic_transport::transport::create_transport;
use ic_types::{
artifact::{Advert, ArtifactKind, ArtifactTag, FileTreeSyncAttribute},
Expand All @@ -55,16 +55,15 @@ use ic_types::{
use std::sync::{Arc, Mutex, RwLock};

/// The P2P state sync client.
#[derive(Clone)]
pub enum P2PStateSyncClient {
/// The main client variant.
Client(Arc<StateManagerImpl>),
Client(StateSync),
/// The test client variant.
TestClient(),
/// The test chunking pool variant.
TestChunkingPool(
Arc<dyn ArtifactClient<TestArtifact>>,
Arc<dyn ArtifactProcessor<TestArtifact> + Sync + 'static>,
Box<dyn ArtifactClient<TestArtifact>>,
Box<dyn ArtifactProcessor<TestArtifact> + Sync + 'static>,
),
}

Expand Down Expand Up @@ -234,21 +233,21 @@ fn setup_artifact_manager(
let addr = processors::ArtifactProcessorManager::new(
Arc::clone(&time_source) as Arc<_>,
metrics_registry,
processors::BoxOrArcClient::ArcClient(client_on_state_change),
client_on_state_change,
move |req| advert_broadcaster.broadcast_advert(req.advert.into(), req.advert_class),
);
artifact_manager_maker.add_arc_client(client, addr);
artifact_manager_maker.add_client(client, addr);
return Ok(artifact_manager_maker.finish());
}
if let P2PStateSyncClient::Client(state_sync_client) = state_sync_client {
let advert_broadcaster = advert_broadcaster.clone();
let addr = processors::ArtifactProcessorManager::new(
Arc::clone(&time_source) as Arc<_>,
metrics_registry.clone(),
processors::BoxOrArcClient::ArcClient(Arc::clone(&state_sync_client) as Arc<_>),
Box::new(state_sync_client.clone()) as Box<_>,
move |req| advert_broadcaster.broadcast_advert(req.advert.into(), req.advert_class),
);
artifact_manager_maker.add_arc_client(state_sync_client, addr);
artifact_manager_maker.add_client(Box::new(state_sync_client), addr);
}

let consensus_replica_config = ReplicaConfig { node_id, subnet_id };
Expand Down Expand Up @@ -332,7 +331,7 @@ fn setup_artifact_manager(
replica_logger.clone(),
metrics_registry.clone(),
);
artifact_manager_maker.add_client(consensus_client, actor);
artifact_manager_maker.add_client(Box::new(consensus_client), actor);
}

{
Expand All @@ -348,7 +347,7 @@ fn setup_artifact_manager(
node_id,
malicious_flags.clone(),
);
artifact_manager_maker.add_client(ingress_client, actor);
artifact_manager_maker.add_client(Box::new(ingress_client), actor);
}

{
Expand All @@ -372,7 +371,7 @@ fn setup_artifact_manager(
replica_logger.clone(),
metrics_registry.clone(),
);
artifact_manager_maker.add_client(certification_client, actor);
artifact_manager_maker.add_client(Box::new(certification_client), actor);
}

{
Expand All @@ -398,7 +397,7 @@ fn setup_artifact_manager(
replica_logger.clone(),
metrics_registry.clone(),
);
artifact_manager_maker.add_client(dkg_client, actor);
artifact_manager_maker.add_client(Box::new(dkg_client), actor);
}

{
Expand Down Expand Up @@ -441,7 +440,7 @@ fn setup_artifact_manager(
metrics_registry.clone(),
replica_logger.clone(),
);
artifact_manager_maker.add_client(ecdsa_client, actor);
artifact_manager_maker.add_client(Box::new(ecdsa_client), actor);
}

{
Expand Down Expand Up @@ -472,7 +471,7 @@ fn setup_artifact_manager(
replica_logger.clone(),
metrics_registry.clone(),
);
artifact_manager_maker.add_client(canister_http_client, actor);
artifact_manager_maker.add_client(Box::new(canister_http_client), actor);
}

Ok(artifact_manager_maker.finish())
Expand Down
6 changes: 4 additions & 2 deletions rs/replica/src/setup_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ic_replica_setup_ic_network::{
create_networking_stack, init_artifact_pools, P2PStateSyncClient,
};
use ic_replicated_state::ReplicatedState;
use ic_state_manager::StateManagerImpl;
use ic_state_manager::{state_sync::StateSync, StateManagerImpl};
use ic_types::{consensus::catchup::CUPWithOriginalProtobuf, NodeId, SubnetId};
use ic_xnet_endpoint::{XNetEndpoint, XNetEndpointConfig};
use ic_xnet_payload_builder::XNetPayloadBuilderImpl;
Expand Down Expand Up @@ -233,6 +233,8 @@ pub fn construct_ic_stack(
replica_logger.clone(),
);

let state_sync = StateSync::new(state_manager.clone(), replica_logger.clone());

let (ingress_ingestion_service, p2p_runner) = create_networking_stack(
metrics_registry,
replica_logger,
Expand All @@ -245,7 +247,7 @@ pub fn construct_ic_stack(
None,
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&state_manager) as Arc<_>,
P2PStateSyncClient::Client(Arc::clone(&state_manager) as Arc<_>),
P2PStateSyncClient::Client(state_sync),
xnet_payload_builder as Arc<_>,
self_validating_payload_builder as Arc<_>,
message_router as Arc<_>,
Expand Down
Loading

0 comments on commit ac3e203

Please sign in to comment.