diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index b142925aeb10..461c4ae411d6 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -22,12 +22,12 @@ pub mod message; pub mod metrics; pub mod warp; -use crate::{role::Roles, types::ReputationChange}; +use crate::{role::Roles, sync::message::BlockAnnounce, types::ReputationChange}; use futures::Stream; use libp2p_identity::PeerId; -use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}; +use message::{BlockData, BlockRequest, BlockResponse}; use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock}; use sp_consensus::BlockOrigin; use sp_runtime::{ @@ -157,38 +157,6 @@ pub enum ImportResult { JustificationImport(RuntimeOrigin, B::Hash, NumberFor, Justifications), } -/// Value polled from `ChainSync` -#[derive(Debug)] -pub enum PollResult { - Import(ImportResult), - Announce(PollBlockAnnounceValidation), -} - -/// Result of [`ChainSync::poll_block_announce_validation`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum PollBlockAnnounceValidation { - /// The announcement failed at validation. - /// - /// The peer reputation should be decreased. - Failure { - /// Who sent the processed block announcement? - who: PeerId, - /// Should the peer be disconnected? - disconnect: bool, - }, - /// The announcement does not require further handling. - Nothing { - /// Who sent the processed block announcement? - who: PeerId, - /// Was this their new best block? - is_best: bool, - /// The announcement. - announce: BlockAnnounce, - }, - /// The block announcement should be skipped. - Skip, -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum SyncMode { @@ -408,29 +376,14 @@ pub trait ChainSync: Send { /// Notify about finalization of the given block. fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor); - /// Push a block announce validation. - /// - /// It is required that [`ChainSync::poll_block_announce_validation`] is called - /// to check for finished block announce validations. - fn push_block_announce_validation( + /// Notify about pre-validated block announcement. + fn on_validated_block_announce( &mut self, - who: PeerId, - hash: Block::Hash, - announce: BlockAnnounce, is_best: bool, + who: PeerId, + announce: &BlockAnnounce, ); - /// Poll block announce validation. - /// - /// Block announce validations can be pushed by using - /// [`ChainSync::push_block_announce_validation`]. - /// - /// This should be polled until it returns [`Poll::Pending`]. - fn poll_block_announce_validation( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>; - /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. @@ -447,14 +400,7 @@ pub trait ChainSync: Send { ) -> Result>, String>; /// Advance the state of `ChainSync` - /// - /// Internally calls [`ChainSync::poll_block_announce_validation()`] and - /// this function should be polled until it returns [`Poll::Pending`] to - /// consume all pending events. - fn poll( - &mut self, - cx: &mut std::task::Context, - ) -> Poll>; + fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>; /// Send block request to peer fn send_block_request(&mut self, who: PeerId, request: BlockRequest); diff --git a/substrate/client/network/sync/src/block_announce_validator.rs b/substrate/client/network/sync/src/block_announce_validator.rs new file mode 100644 index 000000000000..f083f9e29e44 --- /dev/null +++ b/substrate/client/network/sync/src/block_announce_validator.rs @@ -0,0 +1,405 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! `BlockAnnounceValidator` is responsible for async validation of block announcements. + +use crate::futures_stream::FuturesStream; +use futures::{Future, FutureExt, Stream, StreamExt}; +use libp2p::PeerId; +use log::{debug, error, trace, warn}; +use sc_network_common::sync::message::BlockAnnounce; +use sp_consensus::block_validation::Validation; +use sp_runtime::traits::{Block as BlockT, Header, Zero}; +use std::{ + collections::{hash_map::Entry, HashMap}, + default::Default, + pin::Pin, + task::{Context, Poll}, +}; + +/// Log target for this file. +const LOG_TARGET: &str = "sync"; + +/// Maximum number of concurrent block announce validations. +/// +/// If the queue reaches the maximum, we drop any new block +/// announcements. +const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256; + +/// Maximum number of concurrent block announce validations per peer. +/// +/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information. +const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4; + +/// Item that yields [`Stream`] implementation of [`BlockAnnounceValidator`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum BlockAnnounceValidationResult { + /// The announcement failed at validation. + /// + /// The peer reputation should be decreased. + Failure { + /// The id of the peer that send us the announcement. + peer_id: PeerId, + /// Should the peer be disconnected? + disconnect: bool, + }, + /// The announcement was validated successfully and should be passed to [`crate::ChainSync`]. + Process { + /// The id of the peer that send us the announcement. + peer_id: PeerId, + /// Was this their new best block? + is_new_best: bool, + /// The announcement. + announce: BlockAnnounce, + }, + /// The block announcement should be skipped. + Skip { + /// The id of the peer that send us the announcement. + peer_id: PeerId, + }, +} + +impl BlockAnnounceValidationResult { + fn peer_id(&self) -> &PeerId { + match self { + BlockAnnounceValidationResult::Failure { peer_id, .. } | + BlockAnnounceValidationResult::Process { peer_id, .. } | + BlockAnnounceValidationResult::Skip { peer_id } => peer_id, + } + } +} + +/// Result of [`BlockAnnounceValidator::allocate_slot_for_block_announce_validation`]. +enum AllocateSlotForBlockAnnounceValidation { + /// Success, there is a slot for the block announce validation. + Allocated, + /// We reached the total maximum number of validation slots. + TotalMaximumSlotsReached, + /// We reached the maximum number of validation slots for the given peer. + MaximumPeerSlotsReached, +} + +pub(crate) struct BlockAnnounceValidator { + /// A type to check incoming block announcements. + validator: Box + Send>, + /// All block announcements that are currently being validated. + validations: FuturesStream< + Pin> + Send>>, + >, + /// Number of concurrent block announce validations per peer. + validations_per_peer: HashMap, +} + +impl BlockAnnounceValidator { + pub(crate) fn new( + validator: Box + Send>, + ) -> Self { + Self { + validator, + validations: Default::default(), + validations_per_peer: Default::default(), + } + } + + /// Push a block announce validation. + pub(crate) fn push_block_announce_validation( + &mut self, + peer_id: PeerId, + hash: B::Hash, + announce: BlockAnnounce, + is_best: bool, + ) { + let header = &announce.header; + let number = *header.number(); + debug!( + target: LOG_TARGET, + "Pre-validating received block announcement {:?} with number {:?} from {}", + hash, + number, + peer_id, + ); + + if number.is_zero() { + warn!( + target: LOG_TARGET, + "💔 Ignored genesis block (#0) announcement from {}: {}", + peer_id, + hash, + ); + return + } + + // Try to allocate a slot for this block announce validation. + match self.allocate_slot_for_block_announce_validation(&peer_id) { + AllocateSlotForBlockAnnounceValidation::Allocated => {}, + AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => { + warn!( + target: LOG_TARGET, + "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.", + number, + hash, + peer_id, + ); + return + }, + AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => { + warn!( + target: LOG_TARGET, + "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.", + number, + hash, + peer_id, + ); + return + }, + } + + // Let external validator check the block announcement. + let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); + let future = self.validator.validate(header, assoc_data); + + self.validations.push( + async move { + match future.await { + Ok(Validation::Success { is_new_best }) => { + let is_new_best = is_new_best || is_best; + + trace!( + target: LOG_TARGET, + "Block announcement validated successfully: from {}: {:?}. Local best: {}.", + peer_id, + announce.summary(), + is_new_best, + ); + + BlockAnnounceValidationResult::Process { is_new_best, announce, peer_id } + }, + Ok(Validation::Failure { disconnect }) => { + debug!( + target: LOG_TARGET, + "Block announcement validation failed: from {}, block {:?}. Disconnect: {}.", + peer_id, + hash, + disconnect, + ); + + BlockAnnounceValidationResult::Failure { peer_id, disconnect } + }, + Err(e) => { + debug!( + target: LOG_TARGET, + "💔 Ignoring block announcement validation from {} of block {:?} due to internal error: {}.", + peer_id, + hash, + e, + ); + + BlockAnnounceValidationResult::Skip { peer_id } + }, + } + } + .boxed(), + ); + } + + /// Checks if there is a slot for a block announce validation. + /// + /// The total number and the number per peer of concurrent block announce validations + /// is capped. + /// + /// Returns [`AllocateSlotForBlockAnnounceValidation`] to inform about the result. + /// + /// # Note + /// + /// It is *required* to call [`Self::deallocate_slot_for_block_announce_validation`] when the + /// validation is finished to clear the slot. + fn allocate_slot_for_block_announce_validation( + &mut self, + peer_id: &PeerId, + ) -> AllocateSlotForBlockAnnounceValidation { + if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS { + return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached + } + + match self.validations_per_peer.entry(*peer_id) { + Entry::Vacant(entry) => { + entry.insert(1); + AllocateSlotForBlockAnnounceValidation::Allocated + }, + Entry::Occupied(mut entry) => { + if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER { + *entry.get_mut() += 1; + AllocateSlotForBlockAnnounceValidation::Allocated + } else { + AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached + } + }, + } + } + + /// Should be called when a block announce validation is finished, to update the slots + /// of the peer that send the block announce. + fn deallocate_slot_for_block_announce_validation(&mut self, peer_id: &PeerId) { + match self.validations_per_peer.entry(*peer_id) { + Entry::Vacant(_) => { + error!( + target: LOG_TARGET, + "💔 Block announcement validation from peer {} finished for a slot that was not allocated!", + peer_id, + ); + }, + Entry::Occupied(mut entry) => match entry.get().checked_sub(1) { + Some(value) => + if value == 0 { + entry.remove(); + } else { + *entry.get_mut() = value; + }, + None => { + entry.remove(); + + error!( + target: LOG_TARGET, + "Invalid (zero) block announce validation slot counter for peer {peer_id}.", + ); + debug_assert!( + false, + "Invalid (zero) block announce validation slot counter for peer {peer_id}.", + ); + }, + }, + } + } +} + +impl Stream for BlockAnnounceValidator { + type Item = BlockAnnounceValidationResult; + + /// Poll for finished block announce validations. The stream never terminates. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let validation = futures::ready!(self.validations.poll_next_unpin(cx)) + .expect("`FuturesStream` never terminates; qed"); + self.deallocate_slot_for_block_announce_validation(validation.peer_id()); + + Poll::Ready(Some(validation)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation; + use libp2p::PeerId; + use sp_consensus::block_validation::DefaultBlockAnnounceValidator; + use substrate_test_runtime_client::runtime::Block; + + #[test] + fn allocate_one_validation_slot() { + let mut validator = + BlockAnnounceValidator::::new(Box::new(DefaultBlockAnnounceValidator {})); + let peer_id = PeerId::random(); + + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + } + + #[test] + fn allocate_validation_slots_for_two_peers() { + let mut validator = + BlockAnnounceValidator::::new(Box::new(DefaultBlockAnnounceValidator {})); + let peer_id_1 = PeerId::random(); + let peer_id_2 = PeerId::random(); + + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id_1), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id_2), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + } + + #[test] + fn maximum_validation_slots_per_peer() { + let mut validator = + BlockAnnounceValidator::::new(Box::new(DefaultBlockAnnounceValidator {})); + let peer_id = PeerId::random(); + + for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER { + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + } + + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached, + )); + } + + #[test] + fn validation_slots_per_peer_deallocated() { + let mut validator = + BlockAnnounceValidator::::new(Box::new(DefaultBlockAnnounceValidator {})); + let peer_id = PeerId::random(); + + for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER { + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + } + + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached, + )); + + validator.deallocate_slot_for_block_announce_validation(&peer_id); + + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::Allocated, + )); + } + + #[test] + fn maximum_validation_slots_for_all_peers() { + let mut validator = + BlockAnnounceValidator::::new(Box::new(DefaultBlockAnnounceValidator {})); + + for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS { + validator.validations.push( + futures::future::ready(BlockAnnounceValidationResult::Skip { + peer_id: PeerId::random(), + }) + .boxed(), + ); + } + + let peer_id = PeerId::random(); + assert!(matches!( + validator.allocate_slot_for_block_announce_validation(&peer_id), + AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached, + )); + } +} diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 65bd56a28958..9b97bf2b7c34 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -20,6 +20,9 @@ //! to tip and keep the blockchain up to date with network updates. use crate::{ + block_announce_validator::{ + BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, + }, service::{self, chain_sync::ToServiceCommand}, ChainSync, ClientError, SyncingService, }; @@ -45,7 +48,7 @@ use sc_network_common::{ sync::{ message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState}, warp::WarpSyncParams, - BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent, + BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent, }, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -239,6 +242,9 @@ pub struct SyncingEngine { /// Number of inbound peers accepted so far. num_in_peers: usize, + /// Async processor of block announce validations. + block_announce_validator: BlockAnnounceValidatorStream, + /// A cache for the data that was associated to a block announcement. block_announce_data_cache: LruMap>, @@ -352,7 +358,6 @@ where protocol_id, fork_id, roles, - block_announce_validator, max_parallel_downloads, max_blocks_per_request, warp_sync_params, @@ -389,6 +394,9 @@ where peers: HashMap::new(), block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), block_announce_protocol_name, + block_announce_validator: BlockAnnounceValidatorStream::new( + block_announce_validator, + ), num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, @@ -453,9 +461,9 @@ where } } - fn update_peer_info(&mut self, who: &PeerId) { - if let Some(info) = self.chain_sync.peer_info(who) { - if let Some(ref mut peer) = self.peers.get_mut(who) { + fn update_peer_info(&mut self, peer_id: &PeerId) { + if let Some(info) = self.chain_sync.peer_info(peer_id) { + if let Some(ref mut peer) = self.peers.get_mut(peer_id) { peer.info.best_hash = info.best_hash; peer.info.best_number = info.best_number; } @@ -463,14 +471,16 @@ where } /// Process the result of the block announce validation. - pub fn process_block_announce_validation_result( + fn process_block_announce_validation_result( &mut self, - validation_result: PollBlockAnnounceValidation, + validation_result: BlockAnnounceValidationResult, ) { match validation_result { - PollBlockAnnounceValidation::Skip => {}, - PollBlockAnnounceValidation::Nothing { is_best: _, who, announce } => { - self.update_peer_info(&who); + BlockAnnounceValidationResult::Skip { peer_id: _ } => {}, + BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => { + self.chain_sync.on_validated_block_announce(is_new_best, peer_id, &announce); + + self.update_peer_info(&peer_id); if let Some(data) = announce.data { if !data.is_empty() { @@ -478,41 +488,29 @@ where } } }, - PollBlockAnnounceValidation::Failure { who, disconnect } => { + BlockAnnounceValidationResult::Failure { peer_id, disconnect } => { if disconnect { self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); } - self.network_service.report_peer(who, rep::BAD_BLOCK_ANNOUNCEMENT); + self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT); }, } } /// Push a block announce validation. - /// - /// It is required that [`ChainSync::poll_block_announce_validation`] is - /// called later to check for finished validations. The result of the validation - /// needs to be passed to [`SyncingEngine::process_block_announce_validation_result`] - /// to finish the processing. - /// - /// # Note - /// - /// This will internally create a future, but this future will not be registered - /// in the task before being polled once. So, it is required to call - /// [`ChainSync::poll_block_announce_validation`] to ensure that the future is - /// registered properly and will wake up the task when being ready. pub fn push_block_announce_validation( &mut self, - who: PeerId, + peer_id: PeerId, announce: BlockAnnounce, ) { let hash = announce.header.hash(); - let peer = match self.peers.get_mut(&who) { + let peer = match self.peers.get_mut(&peer_id) { Some(p) => p, None => { - log::error!(target: "sync", "Received block announce from disconnected peer {}", who); + log::error!(target: "sync", "Received block announce from disconnected peer {}", peer_id); debug_assert!(false); return }, @@ -525,7 +523,8 @@ where BlockState::Normal => false, }; - self.chain_sync.push_block_announce_validation(who, hash, announce, is_best); + self.block_announce_validator + .push_block_announce_validation(peer_id, hash, announce, is_best); } } @@ -558,10 +557,10 @@ where .or_else(|| self.block_announce_data_cache.get(&hash).cloned()) .unwrap_or_default(); - for (who, ref mut peer) in self.peers.iter_mut() { + for (peer_id, ref mut peer) in self.peers.iter_mut() { let inserted = peer.known_blocks.insert(hash); if inserted { - log::trace!(target: "sync", "Announcing block {:?} to {}", hash, who); + log::trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id); let message = BlockAnnounce { header: header.clone(), state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) }, @@ -656,14 +655,14 @@ where } } }, - ToServiceCommand::JustificationImported(peer, hash, number, success) => { + ToServiceCommand::JustificationImported(peer_id, hash, number, success) => { self.chain_sync.on_justification_import(hash, number, success); if !success { - log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer, hash); + log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer_id, hash); self.network_service - .disconnect_peer(peer, self.block_announce_protocol_name.clone()); + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); self.network_service.report_peer( - peer, + peer_id, ReputationChange::new_fatal("Invalid justification"), ); } @@ -698,8 +697,11 @@ where let _ = tx.send(self.chain_sync.num_sync_requests()); }, ToServiceCommand::PeersInfo(tx) => { - let peers_info = - self.peers.iter().map(|(id, peer)| (*id, peer.info.clone())).collect(); + let peers_info = self + .peers + .iter() + .map(|(peer_id, peer)| (*peer_id, peer.info.clone())) + .collect(); let _ = tx.send(peers_info); }, ToServiceCommand::OnBlockFinalized(hash, header) => @@ -742,14 +744,6 @@ where if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { self.last_notification_io = Instant::now(); self.push_block_announce_validation(remote, announce); - - // Make sure that the newly added block announce validation future - // was polled once to be registered in the task. - if let Poll::Ready(res) = - self.chain_sync.poll_block_announce_validation(cx) - { - self.process_block_announce_validation_result(res) - } } else { log::warn!(target: "sub-libp2p", "Failed to decode block announce"); } @@ -770,10 +764,14 @@ where } } - // poll `ChainSync` last because of a block announcement was received through the - // event stream between `SyncingEngine` and `Protocol` and the validation finished - // right after it as queued, the resulting block request (if any) can be sent right away. - while let Poll::Ready(result) = self.chain_sync.poll(cx) { + // Drive `ChainSync`. + while let Poll::Ready(()) = self.chain_sync.poll(cx) {} + + // Poll block announce validations last, because if a block announcement was received + // through the event stream between `SyncingEngine` and `Protocol` and the validation + // finished right after it is queued, the resulting block request (if any) can be sent + // right away. + while let Poll::Ready(Some(result)) = self.block_announce_validator.poll_next_unpin(cx) { self.process_block_announce_validation_result(result); } @@ -783,15 +781,15 @@ where /// Called by peer when it is disconnecting. /// /// Returns a result if the handshake of this peer was indeed accepted. - pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if let Some(info) = self.peers.remove(&peer) { - if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> { + if let Some(info) = self.peers.remove(&peer_id) { + if self.important_peers.contains(&peer_id) { + log::warn!(target: "sync", "Reserved peer {} disconnected", peer_id); } else { - log::debug!(target: "sync", "{} disconnected", peer); + log::debug!(target: "sync", "{} disconnected", peer_id); } - if !self.default_peers_set_no_slot_connected_peers.remove(&peer) && + if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) && info.inbound && info.info.roles.is_full() { match self.num_in_peers.checked_sub(1) { @@ -808,9 +806,10 @@ where } } - self.chain_sync.peer_disconnected(&peer); - self.event_streams - .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer)).is_ok()); + self.chain_sync.peer_disconnected(&peer_id); + self.event_streams.retain(|stream| { + stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok() + }); Ok(()) } else { Err(()) @@ -824,35 +823,35 @@ where /// from. pub fn on_sync_peer_connected( &mut self, - who: PeerId, + peer_id: PeerId, status: &BlockAnnouncesHandshake, sink: NotificationsSink, inbound: bool, ) -> Result<(), ()> { - log::trace!(target: "sync", "New peer {} {:?}", who, status); + log::trace!(target: "sync", "New peer {} {:?}", peer_id, status); - if self.peers.contains_key(&who) { - log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who); + if self.peers.contains_key(&peer_id) { + log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", peer_id); debug_assert!(false); return Err(()) } if status.genesis_hash != self.genesis_hash { - self.network_service.report_peer(who, rep::GENESIS_MISMATCH); + self.network_service.report_peer(peer_id, rep::GENESIS_MISMATCH); - if self.important_peers.contains(&who) { + if self.important_peers.contains(&peer_id) { log::error!( target: "sync", "Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})", - who, + peer_id, self.genesis_hash, status.genesis_hash, ); - } else if self.boot_node_ids.contains(&who) { + } else if self.boot_node_ids.contains(&peer_id) { log::error!( target: "sync", "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", - who, + peer_id, self.genesis_hash, status.genesis_hash, ); @@ -867,7 +866,7 @@ where return Err(()) } - let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who); + let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id); let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 }; // make sure to accept no more than `--in-peers` many full nodes @@ -875,7 +874,7 @@ where status.roles.is_full() && inbound && self.num_in_peers == self.max_in_peers { - log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}"); + log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {peer_id}"); return Err(()) } @@ -885,7 +884,7 @@ where self.default_peers_set_no_slot_connected_peers.len() + this_peer_reserved_slot { - log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); + log::debug!(target: "sync", "Too many full nodes, rejecting {}", peer_id); return Err(()) } @@ -893,7 +892,7 @@ where (self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light { // Make sure that not all slots are occupied by light clients. - log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); + log::debug!(target: "sync", "Too many light nodes, rejecting {}", peer_id); return Err(()) } @@ -911,7 +910,7 @@ where }; let req = if peer.info.roles.is_full() { - match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { + match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { self.network_service.report_peer(id, repu); @@ -922,22 +921,22 @@ where None }; - log::debug!(target: "sync", "Connected {}", who); + log::debug!(target: "sync", "Connected {}", peer_id); - self.peers.insert(who, peer); + self.peers.insert(peer_id, peer); if no_slot_peer { - self.default_peers_set_no_slot_connected_peers.insert(who); + self.default_peers_set_no_slot_connected_peers.insert(peer_id); } else if inbound && status.roles.is_full() { self.num_in_peers += 1; } if let Some(req) = req { - self.chain_sync.send_block_request(who, req); + self.chain_sync.send_block_request(peer_id, req); } self.event_streams - .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(who)).is_ok()); + .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok()); Ok(()) } diff --git a/substrate/client/network/sync/src/futures_stream.rs b/substrate/client/network/sync/src/futures_stream.rs new file mode 100644 index 000000000000..c33d582345b6 --- /dev/null +++ b/substrate/client/network/sync/src/futures_stream.rs @@ -0,0 +1,134 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! A wrapper for [`FuturesUnordered`] that wakes the task up once a new future is pushed +//! for it to be polled automatically. It's [`Stream`] never terminates. + +use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use std::{ + pin::Pin, + task::{Context, Poll, Waker}, +}; + +/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +pub struct FuturesStream { + futures: FuturesUnordered, + waker: Option, +} + +/// Surprizingly, `#[derive(Default)]` doesn't work on [`FuturesStream`]. +impl Default for FuturesStream { + fn default() -> FuturesStream { + FuturesStream { futures: Default::default(), waker: None } + } +} + +impl FuturesStream { + /// Push a future for processing. + pub fn push(&mut self, future: F) { + self.futures.push(future); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + /// The number of futures in the stream. + pub fn len(&self) -> usize { + self.futures.len() + } +} + +impl Stream for FuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { + self.waker = Some(cx.waker().clone()); + + return Poll::Pending + }; + + Poll::Ready(Some(result)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::{BoxFuture, FutureExt}; + + /// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented + /// feature that [`FuturesUnordered`] can be polled and repeatedly yield + /// `Poll::Ready(None)` before any futures are added into it. + #[tokio::test] + async fn empty_futures_unordered_can_be_polled() { + let mut unordered = FuturesUnordered::>::default(); + + futures::future::poll_fn(|cx| { + assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None)); + assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None)); + + Poll::Ready(()) + }) + .await; + } + + /// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented + /// feature that [`FuturesUnordered`] can be polled and repeatedly yield + /// `Poll::Ready(None)` after all the futures in it have resolved. + #[tokio::test] + async fn deplenished_futures_unordered_can_be_polled() { + let mut unordered = FuturesUnordered::>::default(); + + unordered.push(futures::future::ready(()).boxed()); + assert_eq!(unordered.next().await, Some(())); + + futures::future::poll_fn(|cx| { + assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None)); + assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None)); + + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn empty_futures_stream_yields_pending() { + let mut stream = FuturesStream::>::default(); + + futures::future::poll_fn(|cx| { + assert_eq!(stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn futures_stream_resolves_futures_and_yields_pending() { + let mut stream = FuturesStream::default(); + stream.push(futures::future::ready(17)); + + futures::future::poll_fn(|cx| { + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(17))); + assert_eq!(stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + }) + .await; + } +} diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 175c1c43f46f..0c2013b14977 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -37,9 +37,7 @@ use crate::{ use codec::{Decode, DecodeAll, Encode}; use extra_requests::ExtraRequests; -use futures::{ - channel::oneshot, stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt, -}; +use futures::{channel::oneshot, task::Poll, Future, FutureExt}; use libp2p::{request_response::OutboundFailure, PeerId}; use log::{debug, error, info, trace, warn}; use prost::Message; @@ -66,16 +64,12 @@ use sc_network_common::{ warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, - OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode, - SyncState, SyncStatus, + OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, SyncState, SyncStatus, }, }; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; -use sp_consensus::{ - block_validation::{BlockAnnounceValidator, Validation}, - BlockOrigin, BlockStatus, -}; +use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::{ traits::{ Block as BlockT, CheckedSub, Hash, HashingFor, Header as HeaderT, NumberFor, One, @@ -85,7 +79,7 @@ use sp_runtime::{ }; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, iter, ops::Range, pin::Pin, @@ -94,7 +88,9 @@ use std::{ pub use service::chain_sync::SyncingService; +mod block_announce_validator; mod extra_requests; +mod futures_stream; mod schema; pub mod block_request_handler; @@ -117,17 +113,6 @@ const MAX_DOWNLOAD_AHEAD: u32 = 2048; /// common block of a node. const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2; -/// Maximum number of concurrent block announce validations. -/// -/// If the queue reaches the maximum, we drop any new block -/// announcements. -const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256; - -/// Maximum number of concurrent block announce validations per peer. -/// -/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information. -const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4; - /// Pick the state to sync as the latest finalized number minus this. const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8; @@ -307,19 +292,12 @@ pub struct ChainSync { fork_targets: HashMap>, /// A set of peers for which there might be potential block requests allowed_requests: AllowedRequests, - /// A type to check incoming block announcements. - block_announce_validator: Box + Send>, /// Maximum number of peers to ask the same blocks in parallel. max_parallel_downloads: u32, /// Maximum blocks per request. max_blocks_per_request: u32, /// Total number of downloaded blocks. downloaded_blocks: usize, - /// All block announcement that are currently being validated. - block_announce_validation: - FuturesUnordered> + Send>>>, - /// Stats per peer about the number of concurrent block announce validations. - block_announce_validation_per_peer_stats: HashMap, /// State sync in progress, if any. state_sync: Option>, /// Warp sync in progress, if any. @@ -424,51 +402,6 @@ impl PeerSyncState { } } -/// Result of [`ChainSync::block_announce_validation`]. -#[derive(Debug, Clone, PartialEq, Eq)] -enum PreValidateBlockAnnounce { - /// The announcement failed at validation. - /// - /// The peer reputation should be decreased. - Failure { - /// Who sent the processed block announcement? - who: PeerId, - /// Should the peer be disconnected? - disconnect: bool, - }, - /// The pre-validation was sucessful and the announcement should be - /// further processed. - Process { - /// Is this the new best block of the peer? - is_new_best: bool, - /// The id of the peer that send us the announcement. - who: PeerId, - /// The announcement. - announce: BlockAnnounce, - }, - /// The announcement validation returned an error. - /// - /// An error means that *this* node failed to validate it because some internal error happened. - /// If the block announcement was invalid, [`Self::Failure`] is the correct variant to express - /// this. - Error { who: PeerId }, - /// The block announcement should be skipped. - /// - /// This should *only* be returned when there wasn't a slot registered - /// for this block announcement validation. - Skip, -} - -/// Result of [`ChainSync::has_slot_for_block_announce_validation`]. -enum HasSlotForBlockAnnounceValidation { - /// Yes, there is a slot for the block announce validation. - Yes, - /// We reached the total maximum number of validation slots. - TotalMaximumSlotsReached, - /// We reached the maximum number of validation slots for the given peer. - MaximumPeerSlotsReached, -} - impl ChainSyncT for ChainSync where B: BlockT, @@ -692,7 +625,7 @@ where self.extra_justifications.reset(); } - // The implementation is similar to on_block_announce with unknown parent hash. + // The implementation is similar to `on_validated_block_announce` with unknown parent hash. fn set_sync_fork_request( &mut self, mut peers: Vec, @@ -1107,119 +1040,88 @@ where } } - fn push_block_announce_validation( + fn on_validated_block_announce( &mut self, - who: PeerId, - hash: B::Hash, - announce: BlockAnnounce, is_best: bool, + who: PeerId, + announce: &BlockAnnounce, ) { - let header = &announce.header; - let number = *header.number(); - debug!( - target: "sync", - "Pre-validating received block announcement {:?} with number {:?} from {}", - hash, - number, - who, - ); + let number = *announce.header.number(); + let hash = announce.header.hash(); + let parent_status = + self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); + let known_parent = parent_status != BlockStatus::Unknown; + let ancient_parent = parent_status == BlockStatus::InChainPruned; - if number.is_zero() { - self.block_announce_validation.push( - async move { - warn!( - target: "sync", - "💔 Ignored genesis block (#0) announcement from {}: {}", - who, - hash, - ); - PreValidateBlockAnnounce::Skip - } - .boxed(), - ); + let known = self.is_known(&hash); + let peer = if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "💔 Called `on_validated_block_announce` with a bad peer ID"); + return + }; + + if let PeerSyncState::AncestorSearch { .. } = peer.state { + trace!(target: "sync", "Peer {} is in the ancestor search state.", who); return } - // Check if there is a slot for this block announce validation. - match self.has_slot_for_block_announce_validation(&who) { - HasSlotForBlockAnnounceValidation::Yes => {}, - HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => { - self.block_announce_validation.push( - async move { - warn!( - target: "sync", - "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.", - number, - hash, - who, - ); - PreValidateBlockAnnounce::Skip - } - .boxed(), - ); - return - }, - HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => { - self.block_announce_validation.push(async move { - warn!( - target: "sync", - "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.", - number, - hash, - who, - ); - PreValidateBlockAnnounce::Skip - }.boxed()); - return - }, + if is_best { + // update their best block + peer.best_number = number; + peer.best_hash = hash; } - // Let external validator check the block announcement. - let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); - let future = self.block_announce_validator.validate(header, assoc_data); + // If the announced block is the best they have and is not ahead of us, our common number + // is either one further ahead or it's the one they just announced, if we know about it. + if is_best { + if known && self.best_queued_number >= number { + self.update_peer_common_number(&who, number); + } else if announce.header.parent_hash() == &self.best_queued_hash || + known_parent && self.best_queued_number >= number + { + self.update_peer_common_number(&who, number.saturating_sub(One::one())); + } + } + self.allowed_requests.add(&who); - self.block_announce_validation.push( - async move { - match future.await { - Ok(Validation::Success { is_new_best }) => PreValidateBlockAnnounce::Process { - is_new_best: is_new_best || is_best, - announce, - who, - }, - Ok(Validation::Failure { disconnect }) => { - debug!( - target: "sync", - "Block announcement validation of block {:?} from {} failed", - hash, - who, - ); - PreValidateBlockAnnounce::Failure { who, disconnect } - }, - Err(e) => { - debug!( - target: "sync", - "💔 Block announcement validation of block {:?} errored: {}", - hash, - e, - ); - PreValidateBlockAnnounce::Error { who } - }, - } + // known block case + if known || self.is_already_downloading(&hash) { + trace!(target: "sync", "Known block announce from {}: {}", who, hash); + if let Some(target) = self.fork_targets.get_mut(&hash) { + target.peers.insert(who); } - .boxed(), - ); - } + return + } - fn poll_block_announce_validation( - &mut self, - cx: &mut std::task::Context, - ) -> Poll> { - match self.block_announce_validation.poll_next_unpin(cx) { - Poll::Ready(Some(res)) => { - self.peer_block_announce_validation_finished(&res); - Poll::Ready(self.finish_block_announce_validation(res)) - }, - _ => Poll::Pending, + if ancient_parent { + trace!( + target: "sync", + "Ignored ancient block announced from {}: {} {:?}", + who, + hash, + announce.header, + ); + return + } + + if self.status().state == SyncState::Idle { + trace!( + target: "sync", + "Added sync target for block announced from {}: {} {:?}", + who, + hash, + announce.summary(), + ); + self.fork_targets + .entry(hash) + .or_insert_with(|| ForkTarget { + number, + parent_hash: Some(*announce.header.parent_hash()), + peers: Default::default(), + }) + .peers + .insert(who); } } @@ -1319,10 +1221,7 @@ where .map_err(|error: codec::Error| error.to_string()) } - fn poll( - &mut self, - cx: &mut std::task::Context, - ) -> Poll> { + fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { // Should be called before `process_outbound_requests` to ensure // that a potential target block is directly leading to requests. if let Some(warp_sync) = &mut self.warp_sync { @@ -1339,10 +1238,6 @@ where } } - if let Poll::Ready(announce) = self.poll_block_announce_validation(cx) { - return Poll::Ready(announce) - } - Poll::Pending } @@ -1395,7 +1290,6 @@ where protocol_id: ProtocolId, fork_id: &Option, roles: Roles, - block_announce_validator: Box + Send>, max_parallel_downloads: u32, max_blocks_per_request: u32, warp_sync_params: Option>, @@ -1430,12 +1324,9 @@ where queue_blocks: Default::default(), fork_targets: Default::default(), allowed_requests: Default::default(), - block_announce_validator, max_parallel_downloads, max_blocks_per_request, downloaded_blocks: 0, - block_announce_validation: Default::default(), - block_announce_validation_per_peer_stats: Default::default(), state_sync: None, warp_sync: None, import_existing: false, @@ -1586,186 +1477,6 @@ where self.allowed_requests.set_all(); } - /// Checks if there is a slot for a block announce validation. - /// - /// The total number and the number per peer of concurrent block announce validations - /// is capped. - /// - /// Returns [`HasSlotForBlockAnnounceValidation`] to inform about the result. - /// - /// # Note - /// - /// It is *required* to call [`Self::peer_block_announce_validation_finished`] when the - /// validation is finished to clear the slot. - fn has_slot_for_block_announce_validation( - &mut self, - peer: &PeerId, - ) -> HasSlotForBlockAnnounceValidation { - if self.block_announce_validation.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS { - return HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached - } - - match self.block_announce_validation_per_peer_stats.entry(*peer) { - Entry::Vacant(entry) => { - entry.insert(1); - HasSlotForBlockAnnounceValidation::Yes - }, - Entry::Occupied(mut entry) => { - if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER { - *entry.get_mut() += 1; - HasSlotForBlockAnnounceValidation::Yes - } else { - HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached - } - }, - } - } - - /// Should be called when a block announce validation is finished, to update the slots - /// of the peer that send the block announce. - fn peer_block_announce_validation_finished( - &mut self, - res: &PreValidateBlockAnnounce, - ) { - let peer = match res { - PreValidateBlockAnnounce::Failure { who, .. } | - PreValidateBlockAnnounce::Process { who, .. } | - PreValidateBlockAnnounce::Error { who } => who, - PreValidateBlockAnnounce::Skip => return, - }; - - match self.block_announce_validation_per_peer_stats.entry(*peer) { - Entry::Vacant(_) => { - error!( - target: "sync", - "💔 Block announcement validation from peer {} finished for that no slot was allocated!", - peer, - ); - }, - Entry::Occupied(mut entry) => { - *entry.get_mut() = entry.get().saturating_sub(1); - if *entry.get() == 0 { - entry.remove(); - } - }, - } - } - - /// This will finish processing of the block announcement. - fn finish_block_announce_validation( - &mut self, - pre_validation_result: PreValidateBlockAnnounce, - ) -> PollBlockAnnounceValidation { - let (announce, is_best, who) = match pre_validation_result { - PreValidateBlockAnnounce::Failure { who, disconnect } => { - debug!( - target: "sync", - "Failed announce validation: {:?}, disconnect: {}", - who, - disconnect, - ); - return PollBlockAnnounceValidation::Failure { who, disconnect } - }, - PreValidateBlockAnnounce::Process { announce, is_new_best, who } => - (announce, is_new_best, who), - PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => { - debug!( - target: "sync", - "Ignored announce validation", - ); - return PollBlockAnnounceValidation::Skip - }, - }; - - trace!( - target: "sync", - "Finished block announce validation: from {:?}: {:?}. local_best={}", - who, - announce.summary(), - is_best, - ); - - let number = *announce.header.number(); - let hash = announce.header.hash(); - let parent_status = - self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); - let known_parent = parent_status != BlockStatus::Unknown; - let ancient_parent = parent_status == BlockStatus::InChainPruned; - - let known = self.is_known(&hash); - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "💔 Called on_block_announce with a bad peer ID"); - return PollBlockAnnounceValidation::Nothing { is_best, who, announce } - }; - - if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: "sync", "Peer state is ancestor search."); - return PollBlockAnnounceValidation::Nothing { is_best, who, announce } - } - - if is_best { - // update their best block - peer.best_number = number; - peer.best_hash = hash; - } - - // If the announced block is the best they have and is not ahead of us, our common number - // is either one further ahead or it's the one they just announced, if we know about it. - if is_best { - if known && self.best_queued_number >= number { - self.update_peer_common_number(&who, number); - } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number - { - self.update_peer_common_number(&who, number - One::one()); - } - } - self.allowed_requests.add(&who); - - // known block case - if known || self.is_already_downloading(&hash) { - trace!(target: "sync", "Known block announce from {}: {}", who, hash); - if let Some(target) = self.fork_targets.get_mut(&hash) { - target.peers.insert(who); - } - return PollBlockAnnounceValidation::Nothing { is_best, who, announce } - } - - if ancient_parent { - trace!( - target: "sync", - "Ignored ancient block announced from {}: {} {:?}", - who, - hash, - announce.header, - ); - return PollBlockAnnounceValidation::Nothing { is_best, who, announce } - } - - if self.status().state == SyncState::Idle { - trace!( - target: "sync", - "Added sync target for block announced from {}: {} {:?}", - who, - hash, - announce.summary(), - ); - self.fork_targets - .entry(hash) - .or_insert_with(|| ForkTarget { - number, - parent_hash: Some(*announce.header.parent_hash()), - peers: Default::default(), - }) - .peers - .insert(who); - } - - PollBlockAnnounceValidation::Nothing { is_best, who, announce } - } - /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. @@ -3162,14 +2873,13 @@ fn validate_blocks( mod test { use super::*; use crate::service::network::NetworkServiceProvider; - use futures::{executor::block_on, future::poll_fn}; + use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_network_common::{ role::Role, - sync::message::{BlockData, BlockState, FromBlock}, + sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock}, }; use sp_blockchain::HeaderBackend; - use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use substrate_test_runtime_client::{ runtime::{Block, Hash, Header}, BlockBuilderExt, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClient, @@ -3183,7 +2893,6 @@ mod test { // internally we should process the response as the justification not being available. let client = Arc::new(TestClientBuilder::new().build()); - let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); @@ -3195,7 +2904,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - block_announce_validator, 1, 64, None, @@ -3262,7 +2970,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 1, 64, None, @@ -3344,23 +3051,16 @@ mod test { /// Send a block annoucnement for the given `header`. fn send_block_announce( header: Header, - peer_id: &PeerId, + peer_id: PeerId, sync: &mut ChainSync, ) { - let block_annnounce = BlockAnnounce { + let announce = BlockAnnounce { header: header.clone(), state: Some(BlockState::Best), data: Some(Vec::new()), }; - sync.push_block_announce_validation(*peer_id, header.hash(), block_annnounce, true); - - // Poll until we have procssed the block announcement - block_on(poll_fn(|cx| loop { - if sync.poll_block_announce_validation(cx).is_pending() { - break Poll::Ready(()) - } - })) + sync.on_validated_block_announce(true, peer_id, &announce); } /// Create a block response from the given `blocks`. @@ -3444,7 +3144,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 5, 64, None, @@ -3491,7 +3190,7 @@ mod test { assert!(sync.block_requests().is_empty()); // Let peer2 announce a fork of block 3 - send_block_announce(block3_fork.header().clone(), &peer_id2, &mut sync); + send_block_announce(block3_fork.header().clone(), peer_id2, &mut sync); // Import and tell sync that we now have the fork. block_on(client.import(BlockOrigin::Own, block3_fork.clone())).unwrap(); @@ -3500,13 +3199,13 @@ mod test { let block4 = build_block_at(block3_fork.hash(), false); // Let peer2 announce block 4 and check that sync wants to get the block. - send_block_announce(block4.header().clone(), &peer_id2, &mut sync); + send_block_announce(block4.header().clone(), peer_id2, &mut sync); let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2); // Peer1 announces the same block, but as the common block is still `1`, sync will request // block 2 again. - send_block_announce(block4.header().clone(), &peer_id1, &mut sync); + send_block_announce(block4.header().clone(), peer_id1, &mut sync); let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1); @@ -3571,7 +3270,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 5, 64, None, @@ -3647,7 +3345,7 @@ mod test { sync.queue_blocks.clear(); // Let peer2 announce that it finished syncing - send_block_announce(best_block.header().clone(), &peer_id2, &mut sync); + send_block_announce(best_block.header().clone(), peer_id2, &mut sync); let (peer1_req, peer2_req) = sync.block_requests().into_iter().fold((None, None), |res, req| { @@ -3729,7 +3427,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 5, 64, None, @@ -3754,7 +3451,7 @@ mod test { sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) .unwrap(); - send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync); + send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); @@ -3872,7 +3569,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 5, 64, None, @@ -3897,7 +3593,7 @@ mod test { sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) .unwrap(); - send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync); + send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); @@ -4017,7 +3713,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 1, 64, None, @@ -4039,7 +3734,7 @@ mod test { // Create a "new" header and announce it let mut header = blocks[0].header().clone(); header.number = 4; - send_block_announce(header, &peer_id1, &mut sync); + send_block_announce(header, peer_id1, &mut sync); assert!(sync.fork_targets.len() == 1); sync.peer_disconnected(&peer_id1); @@ -4063,7 +3758,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - Box::new(DefaultBlockAnnounceValidator), 1, 64, None, @@ -4107,7 +3801,6 @@ mod test { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let mut client = Arc::new(TestClientBuilder::new().build()); - let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); @@ -4117,7 +3810,6 @@ mod test { ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), Roles::from(&Role::Full), - block_announce_validator, 1, 64, None, diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index 838c6cf7667a..d37095c17d2c 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -24,7 +24,7 @@ use libp2p::PeerId; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, - OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, SyncStatus, + OpaqueBlockResponse, PeerInfo, SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -71,17 +71,12 @@ mockall::mock! { success: bool, ); fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor); - fn push_block_announce_validation( + fn on_validated_block_announce( &mut self, - who: PeerId, - hash: Block::Hash, - announce: BlockAnnounce, is_best: bool, + who: PeerId, + announce: &BlockAnnounce, ); - fn poll_block_announce_validation<'a>( - &mut self, - cx: &mut std::task::Context<'a>, - ) -> Poll>; fn peer_disconnected(&mut self, who: &PeerId); fn metrics(&self) -> Metrics; fn block_response_into_blocks( @@ -92,7 +87,7 @@ mockall::mock! { fn poll<'a>( &mut self, cx: &mut std::task::Context<'a>, - ) -> Poll>; + ) -> Poll<()>; fn send_block_request( &mut self, who: PeerId, diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs index 36e44be5e295..039e03f9e618 100644 --- a/substrate/client/utils/src/mpsc.rs +++ b/substrate/client/utils/src/mpsc.rs @@ -123,6 +123,11 @@ impl TracingUnboundedSender { s }) } + + /// The number of elements in the channel (proxy function to [`async_channel::Sender`]). + pub fn len(&self) -> usize { + self.inner.len() + } } impl TracingUnboundedReceiver { @@ -139,6 +144,11 @@ impl TracingUnboundedReceiver { s }) } + + /// The number of elements in the channel (proxy function to [`async_channel::Receiver`]). + pub fn len(&self) -> usize { + self.inner.len() + } } impl Drop for TracingUnboundedReceiver {