Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the event system of sc-network #1370

Merged
merged 58 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
f643843
Rework the event system of `sc-network`
altonen Sep 3, 2023
026466c
Fix clippy
altonen Sep 3, 2023
229197e
Fix warnings
altonen Sep 4, 2023
6d995a0
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 4, 2023
208fe22
Report notification metrics in `NotificationService`/`NotificationSink`
altonen Sep 4, 2023
00611e0
Remove notifications sinks on `NotificationStreamClosed`
altonen Sep 4, 2023
b6174e3
Update substrate/client/network/src/service/traits.rs
altonen Sep 5, 2023
4fe313e
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 5, 2023
120f429
Implement `NotificationService::try_set_handshake()`
altonen Sep 5, 2023
fa7156b
Apply review comments
altonen Sep 7, 2023
e1acf57
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 7, 2023
d5a8374
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 14, 2023
d178270
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 22, 2023
c6b10d5
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 23, 2023
2d4bdda
Fix warnings
altonen Sep 23, 2023
b074c57
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 26, 2023
910a737
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Sep 28, 2023
3aa962c
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Oct 1, 2023
beee591
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Oct 16, 2023
f4286da
Fix mixnet
altonen Oct 17, 2023
a2bf29d
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Oct 17, 2023
ff10bfb
Add `Sync`
altonen Oct 17, 2023
1fab0f1
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Oct 20, 2023
e2d6d22
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 1, 2023
6451147
Use `NotificationService` in `ConsensusGossip`
altonen Nov 1, 2023
ede7ff0
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 2, 2023
09be0b9
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 4, 2023
fd3a6ce
Fix stuff
altonen Oct 16, 2023
bc9d6cf
Fix tests
altonen Nov 4, 2023
aa78cf6
Fix UI test
altonen Nov 6, 2023
cc2b3cf
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 6, 2023
81b5997
Ignore missing `NotificationService`
altonen Nov 7, 2023
9d24a55
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 8, 2023
11b0dac
Fix stuff
altonen Nov 9, 2023
2e52080
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 13, 2023
6515510
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 16, 2023
633f481
Reject peers with unknown roles
altonen Nov 16, 2023
8dec276
Update Cargo.lock
altonen Nov 16, 2023
6dfef05
Fix more stuff
altonen Nov 17, 2023
6da99b3
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 20, 2023
65ece16
Fix stuff more
altonen Nov 20, 2023
7770477
Fix warnings
altonen Nov 20, 2023
eeab2fd
Use `ProtocolHandle` for fetching peer counts
altonen Nov 20, 2023
5049ade
zzz
altonen Nov 20, 2023
befd2d9
Prioritize validation events over other events
altonen Nov 21, 2023
cd94277
Improve validation
altonen Nov 22, 2023
0e140bd
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 22, 2023
baf44b2
Fix clippy
altonen Nov 22, 2023
ac76424
Clean up things
altonen Nov 23, 2023
20395c6
Updates
altonen Nov 23, 2023
3273e32
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 27, 2023
e791de2
Fix networkbridge test
altonen Nov 27, 2023
32c0c5f
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 27, 2023
1ce77bd
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 27, 2023
691f4c8
Update Cargo.lock
altonen Nov 27, 2023
b04d65d
Fix gitlab ui test
altonen Nov 27, 2023
0b2e718
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 28, 2023
76bb0be
Merge remote-tracking branch 'origin/master' into altonen-rework-sc-n…
altonen Nov 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 110 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ schnellru = "0.2.1"
tracing = "0.1.37"
async-trait = "0.1.73"
futures = "0.3.28"
parking_lot = "0.12.1"

[features]
network-protocol-staging = [
Expand Down
15 changes: 12 additions & 3 deletions cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::{select, StreamExt};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
Expand All @@ -27,7 +28,7 @@ use polkadot_network_bridge::{
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1::{self, AvailableDataFetchingRequest},
vstaging, IncomingRequestReceiver, ReqProtocolNames,
Expand All @@ -41,7 +42,7 @@ use polkadot_overseer::{
use polkadot_primitives::CollatorPair;

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::NetworkStateInfo;
use sc_network::{NetworkStateInfo, NotificationService};
use sc_service::TaskManager;
use sp_runtime::traits::Block as BlockT;

Expand Down Expand Up @@ -77,6 +78,8 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
/// Notification services for validation/collation protocols.
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}

fn build_overseer(
Expand All @@ -94,13 +97,16 @@ fn build_overseer(
collator_pair,
req_protocol_names,
peer_set_protocol_names,
notification_services,
}: CollatorOverseerGenArgs<'_>,
) -> Result<
(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
RelayChainError,
> {
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));

let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip(
Expand Down Expand Up @@ -131,13 +137,16 @@ fn build_overseer(
sync_oracle,
network_bridge_metrics.clone(),
peer_set_protocol_names.clone(),
notification_services,
notification_sinks.clone(),
))
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service,
authority_discovery_service,
network_bridge_metrics,
req_protocol_names,
peer_set_protocol_names,
notification_sinks,
))
.provisioner(DummySubsystem)
.runtime_api(RuntimeApiSubsystem::new(
Expand Down
14 changes: 9 additions & 5 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterf
use network::build_collator_network;
use polkadot_network_bridge::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1, vstaging, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
},
Expand Down Expand Up @@ -176,10 +176,13 @@ async fn new_minimal_relay_chain(
let peer_set_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };

for config in peer_sets_info(is_authority, &peer_set_protocol_names) {
net_config.add_notification_protocol(config);
}
let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
(peerset, service)
})
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_receiver_v1, collation_req_receiver_vstaging, available_data_req_receiver) =
Expand Down Expand Up @@ -219,6 +222,7 @@ async fn new_minimal_relay_chain(
collator_pair,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
notification_services,
};

let overseer_handle =
Expand Down
25 changes: 10 additions & 15 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use sc_network::{
NetworkService,
};

use sc_network::config::FullNetworkConfiguration;
use sc_network::{config::FullNetworkConfiguration, NotificationService};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
use sc_utils::mpsc::tracing_unbounded;

use std::{iter, sync::Arc};

Expand All @@ -44,7 +43,7 @@ pub(crate) fn build_collator_network(
Error,
> {
let protocol_id = config.protocol_id();
let block_announce_config = get_block_announce_proto_config::<Block>(
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Block>(
protocol_id.clone(),
&None,
Roles::from(&config.role),
Expand All @@ -64,8 +63,6 @@ pub(crate) fn build_collator_network(
let peer_store_handle = peer_store.handle();
spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

// RX is not used for anything because syncing is not started for the minimal node
let (tx, _rx) = tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let network_params = sc_network::config::Params::<Block> {
role: config.role.clone(),
executor: {
Expand All @@ -81,7 +78,6 @@ pub(crate) fn build_collator_network(
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
};

let network_worker = sc_network::NetworkWorker::new(network_params)?;
Expand Down Expand Up @@ -133,7 +129,7 @@ fn get_block_announce_proto_config<B: BlockT>(
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> NonDefaultSetConfig {
) -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
if let Some(ref fork_id) = fork_id {
Expand All @@ -143,24 +139,23 @@ fn get_block_announce_proto_config<B: BlockT>(
}
};

NonDefaultSetConfig {
notifications_protocol: block_announces_protocol.into(),
fallback_names: iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into())
.collect(),
max_notification_size: 1024 * 1024,
handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
NonDefaultSetConfig::new(
block_announces_protocol.into(),
iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1024 * 1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
roles,
best_number,
best_hash,
genesis_hash,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
set_config: SetConfig {
SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
)
}
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(crate) enum WireMessage<M> {
ViewUpdate(View),
}

#[derive(Debug)]
pub(crate) struct PeerData {
/// The Latest view sent by the peer.
view: View,
Expand Down
60 changes: 29 additions & 31 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use parking_lot::Mutex;

use parity_scale_codec::Encode;

use sc_network::{
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent,
IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest,
NetworkService, OutboundFailure, ReputationChange, RequestFailure,
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, IfDisconnected, MessageSink,
NetworkPeers, NetworkRequest, NetworkService, OutboundFailure, ReputationChange,
RequestFailure,
};

use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion},
peer_set::{PeerSet, ProtocolVersion},
request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
PeerId,
};
Expand All @@ -45,51 +48,50 @@ const LOG_TARGET: &'static str = "parachain::network-bridge-net";
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) fn send_message<M>(
net: &mut impl Network,
mut peers: Vec<PeerId>,
peer_set: PeerSet,
version: ProtocolVersion,
protocol_names: &PeerSetProtocolNames,
message: M,
metrics: &super::Metrics,
network_notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) where
M: Encode + Clone,
{
if peers.is_empty() {
return
}

let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
encoded
};

let notification_sinks = network_notification_sinks.lock();
altonen marked this conversation as resolved.
Show resolved Hide resolved

// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
//
// peer may have gotten disconnect by the time `send_message()` is called
// at which point the the sink is not available.
let last_peer = peers.pop();

// We always send messages on the "main" name even when a negotiated
// fallback is used. The libp2p implementation handles the fallback
// under the hood.
let protocol_name = protocol_names.get_main_name(peer_set);
peers.into_iter().for_each(|peer| {
net.write_notification(peer, protocol_name.clone(), message.clone());
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
});

if let Some(peer) = last_peer {
net.write_notification(peer, protocol_name, message);
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
}
}

/// An abstraction over networking for the purposes of this subsystem.
#[async_trait]
pub trait Network: Clone + Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
Expand Down Expand Up @@ -121,16 +123,12 @@ pub trait Network: Clone + Send + 'static {
/// Disconnect a given peer from the protocol specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);

/// Write a notification to a peer on the given protocol.
fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>);
/// Get peer role.
fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole>;
}

#[async_trait]
impl Network for Arc<NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}

async fn set_reserved_peers(
&mut self,
protocol: ProtocolName,
Expand All @@ -155,10 +153,6 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::disconnect_peer(&**self, who, protocol);
}

fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>) {
NetworkService::write_notification(&**self, who, protocol, message);
}

async fn start_request<AD: AuthorityDiscovery>(
&self,
authority_discovery: &mut AD,
Expand Down Expand Up @@ -230,6 +224,10 @@ impl Network for Arc<NetworkService<Block, Hash>> {
if_disconnected,
);
}

fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole> {
NetworkService::peer_role(self, who, handshake)
}
}

/// We assume one `peer_id` per `authority_id`.
Expand Down
Loading