diff --git a/core/src/light_protocol/common/ledger_info.rs b/core/src/light_protocol/common/ledger_info.rs index 86df8a034b..80daf16439 100644 --- a/core/src/light_protocol/common/ledger_info.rs +++ b/core/src/light_protocol/common/ledger_info.rs @@ -42,8 +42,10 @@ impl LedgerInfo { .get_data_manager() .block_by_hash(&hash, false /* update_cache */) .map(|b| (*b).clone()) - // FIXME: what's this internal error? - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!("Block {:?} not found", hash)) + .into() + }) } /// Get header `hash`, if it exists. @@ -53,12 +55,15 @@ impl LedgerInfo { .get_data_manager() .block_header_by_hash(&hash) .map(|h| (*h).clone()) - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!("Header {:?} not found", hash)) + .into() + }) } /// Get hash of block at `height` on the pivot chain, if it exists. #[inline] - pub fn pivot_hash_of(&self, height: u64) -> Result { + fn pivot_hash_of(&self, height: u64) -> Result { let epoch = EpochNumber::Number(height); Ok(self.consensus.get_hash_from_epoch_number(epoch)?) } @@ -81,7 +86,7 @@ impl LedgerInfo { /// Get the correct deferred state root of the block at `height` on the /// pivot chain based on local execution information. #[inline] - pub fn correct_deferred_state_root_hash_of( + fn correct_deferred_state_root_hash_of( &self, height: u64, ) -> Result { let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT); @@ -92,7 +97,7 @@ impl LedgerInfo { /// Get the correct deferred receipts root of the block at `height` on the /// pivot chain based on local execution information. #[inline] - pub fn correct_deferred_receipts_root_hash_of( + fn correct_deferred_receipts_root_hash_of( &self, height: u64, ) -> Result { let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT); @@ -102,13 +107,19 @@ impl LedgerInfo { .get_data_manager() .get_epoch_execution_commitment(&pivot) .map(|c| c.receipts_root) - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!( + "Execution commitments for {:?} not found", + pivot + )) + .into() + }) } /// Get the correct deferred logs bloom root of the block at `height` on the /// pivot chain based on local execution information. #[inline] - pub fn correct_deferred_logs_root_hash_of( + fn correct_deferred_logs_root_hash_of( &self, height: u64, ) -> Result { let epoch = height.saturating_sub(DEFERRED_STATE_EPOCH_COUNT); @@ -118,7 +129,13 @@ impl LedgerInfo { .get_data_manager() .get_epoch_execution_commitment(&pivot) .map(|c| c.logs_bloom_hash) - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!( + "Execution commitments for {:?} not found", + pivot + )) + .into() + }) } /// Get the number of epochs per snapshot period. @@ -129,7 +146,7 @@ impl LedgerInfo { /// Get the state trie corresponding to the execution of `epoch`. #[inline] - pub fn state_of(&self, epoch: u64) -> Result { + fn state_of(&self, epoch: u64) -> Result { let pivot = self.pivot_hash_of(epoch)?; let maybe_state_index = self @@ -145,7 +162,12 @@ impl LedgerInfo { match state { Some(Ok(Some(state))) => Ok(state), - _ => Err(ErrorKind::InternalError.into()), + _ => { + bail!(ErrorKind::InternalError(format!( + "State of epoch {} not found", + epoch + ))); + } } } @@ -156,7 +178,12 @@ impl LedgerInfo { ) -> Result { match self.state_of(epoch)?.get_state_root() { Ok(root) => Ok(root), - _ => Err(ErrorKind::InternalError.into()), + Err(e) => { + bail!(ErrorKind::InternalError(format!( + "State root of epoch {} not found: {:?}", + epoch, e + ))); + } } } @@ -207,7 +234,13 @@ impl LedgerInfo { false, /* update_cache */ ) .map(|res| (*res.block_receipts).clone()) - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!( + "Receipts of epoch {} not found", + epoch + )) + .into() + }) }) .collect() } @@ -232,7 +265,13 @@ impl LedgerInfo { false, /* update_cache */ ) .map(|res| res.bloom) - .ok_or(ErrorKind::InternalError.into()) + .ok_or_else(|| { + ErrorKind::InternalError(format!( + "Logs bloom of epoch {} not found", + epoch + )) + .into() + }) }) .collect::, Error>>()?; diff --git a/core/src/light_protocol/error.rs b/core/src/light_protocol/error.rs index 53ae7ded87..913c22cf39 100644 --- a/core/src/light_protocol/error.rs +++ b/core/src/light_protocol/error.rs @@ -3,15 +3,18 @@ // See http://www.gnu.org/licenses/ use crate::{ - message::{Message, MsgId}, + message::{Message, MsgId, RequestId}, network::{self, NetworkContext, UpdateNodeOperation}, statedb, - sync::message::Throttled, + sync::{message::Throttled, node_type::NodeType}, }; +use cfx_types::{H160, H256}; use error_chain::ChainedError; use network::node_table::NodeId; -use primitives::{filter::FilterError, ChainIdParams}; +use parking_lot::Mutex; +use primitives::{filter::FilterError, ChainIdParams, StateRoot}; use rlp::DecoderError; +use std::sync::Arc; error_chain! { links { @@ -30,29 +33,34 @@ error_chain! { display("packet already throttled: {:?}", msg_name), } - ChainIdMismatch{ours: ChainIdParams, theirs: ChainIdParams} { + ChainIdMismatch{ ours: ChainIdParams, theirs: ChainIdParams } { description("ChainId mismatch"), - display("ChainId mismatch, ours {:?}, theirs {:?}.", ours, theirs), + display("ChainId mismatch, ours={:?}, theirs={:?}.", ours, theirs), } - GenesisMismatch { + ClonableErrorWrapper(error: ClonableError) { + description("Clonable error"), + display("{:?}", error.0.lock().to_string()), + } + + GenesisMismatch{ ours: H256, theirs: H256 } { description("Genesis mismatch"), - display("Genesis mismatch"), + display("Genesis mismatch, ours={:?}, theirs={:?}.", ours, theirs), } - InternalError { + InternalError(details: String) { description("Internal error"), - display("Internal error"), + display("Internal error: {:?}", details), } - InvalidBloom { - description("Invalid bloom"), - display("Invalid bloom"), + InvalidBloom{ epoch: u64, expected: H256, received: H256 } { + description("Logs bloom hash validation failed"), + display("Logs bloom hash validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received), } - InvalidLedgerProof { - description("Invalid ledger proof"), - display("Invalid ledger proof"), + InvalidLedgerProofSize{ hash: H256, expected: u64, received: u64 } { + description("Invalid ledger proof size"), + display("Invalid ledger proof size for header {:?}: expected={}, received={}", hash, expected, received), } InvalidMessageFormat { @@ -60,44 +68,54 @@ error_chain! { display("Invalid message format"), } - InvalidReceipts { - description("Invalid receipts"), - display("Invalid receipts"), + InvalidPreviousStateRoot{ current_epoch: u64, snapshot_epoch_count: u64, root: Option } { + description("Invalid previous state root"), + display("Invalid previous state root for epoch {} with snapshot epoch count {}: {:?}", current_epoch, snapshot_epoch_count, root), } - InvalidStateProof(reason: &'static str) { + InvalidReceipts{ epoch: u64, expected: H256, received: H256 } { + description("Receipts root validation failed"), + display("Receipts root validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received), + } + + InvalidStateProof{ epoch: u64, key: Vec, value: Option>, reason: &'static str } { description("Invalid state proof"), - display("Invalid state proof: {}", reason), + display("Invalid state proof for key {:?} and value {:?} in epoch {}: {:?}", value, key, epoch, reason), } - InvalidStateRoot { - description("Invalid state root"), - display("Invalid state root"), + InvalidStateRoot{ epoch: u64, expected: H256, received: H256 } { + description("State root validation failed"), + display("State root validation for epoch {} failed, expected={:?}, received={:?}", epoch, expected, received), } - InvalidStorageRootProof(reason: &'static str) { + InvalidStorageRootProof{ epoch: u64, address: H160, reason: &'static str } { description("Invalid storage root proof"), - display("Invalid storage root proof: {}", reason), + display("Invalid storage root proof for address {:?} in epoch {}: {}", address, epoch, reason), } - InvalidTxInfo { + InvalidTxInfo{ reason: String } { description("Invalid tx info"), - display("Invalid tx info"), + display("Invalid tx info: {:?}", reason), } - InvalidTxRoot { - description("Invalid tx root"), - display("Invalid tx root"), + InvalidTxRoot{ hash: H256, expected: H256, received: H256 } { + description("Transaction root validation failed"), + display("Transaction root validation for block {:?} failed, expected={:?}, received={:?}", hash, expected, received), } - InvalidTxSignature { + InvalidTxSignature{ hash: H256 } { description("Invalid tx signature"), - display("Invalid tx signature"), + display("Invalid signature for transaction {:?}", hash), + } + + InvalidWitnessRoot{ hash: H256, expected: H256, received: H256 } { + description("Witness root validation failed"), + display("Witness root validation for header {:?} failed, expected={:?}, received={:?}", hash, expected, received), } - SendStatusFailed { + SendStatusFailed{ peer: NodeId } { description("Send status failed"), - display("Send status failed"), + display("Failed to send status to peer {:?}", peer), } Timeout(details: String) { @@ -110,49 +128,41 @@ error_chain! { display("packet {:?} throttled: {:?}", msg_name, response), } - UnableToProduceProof { - description("Unable to produce proof"), - display("Unable to produce proof"), - } - - UnableToProduceTxInfo(details: String) { + UnableToProduceTxInfo{ reason: String } { description("Unable to produce tx info"), - display("Unable to produce tx info: {:?}", details), + display("Unable to produce tx info: {:?}", reason), } - UnexpectedMessage { + UnexpectedMessage{ expected: Vec, received: MsgId } { description("Unexpected message"), - display("Unexpected message"), + display("Unexpected message id={:?}, expected one of {:?}", received, expected), } - UnexpectedPeerType { + UnexpectedPeerType{ node_type: NodeType } { description("Unexpected peer type"), - display("Unexpected peer type"), - } - - UnexpectedRequestId { - description("Unexpected request id"), - display("Unexpected request id"), + display("Unexpected peer type: {:?}", node_type), } - UnexpectedResponse { + UnexpectedResponse{ expected: Option, received: RequestId } { description("Unexpected response"), - display("Unexpected response"), + display("Unexpected response id; expected = {:?}, received = {:?}", expected, received), } - UnknownMessage { + UnknownMessage{ id: MsgId } { description("Unknown message"), - display("Unknown message"), + display("Unknown message: {:?}", id), } - UnknownPeer { - description("Unknown peer"), - display("Unknown peer"), + WitnessUnavailable{ epoch: u64 } { + description("Witness unavailable"), + display("Witness for epoch {} is not available", epoch), } } } -pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) { +pub fn handle( + io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: &Error, +) { warn!( "Error while handling message, peer={}, msg_id={:?}, error={}", peer, @@ -166,9 +176,14 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) { // NOTE: do not use wildcard; this way, the compiler // will help covering all the cases. - match e.0 { + match &e.0 { + // for wrapped errors, handle based on the inner error + ErrorKind::ClonableErrorWrapper(e) => { + handle(io, peer, msg_id, &*e.0.lock()) + } + ErrorKind::Filter(_) - | ErrorKind::InternalError + | ErrorKind::InternalError(_) // NOTE: we should be tolerant of non-critical errors, // e.g. do not disconnect on requesting non-existing epoch @@ -176,44 +191,44 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) { // NOTE: in order to let other protocols run, // we should not disconnect on protocol failure - | ErrorKind::SendStatusFailed + | ErrorKind::SendStatusFailed{..} | ErrorKind::Timeout(_) - // NOTE: if we do not have a confirmed (non-blamed) block - // with the info needed to produce a state root proof, we - // should not disconnect the peer - | ErrorKind::UnableToProduceProof - // if the tx requested has been removed locally, // we should not disconnect the peer - | ErrorKind::UnableToProduceTxInfo(_) + | ErrorKind::UnableToProduceTxInfo{..} + + // if the witness is not available, it is probably + // due to the local witness sync process + | ErrorKind::WitnessUnavailable{..} // NOTE: to help with backward-compatibility, we // should not disconnect on `UnknownMessage` - | ErrorKind::UnknownMessage => disconnect = false, + | ErrorKind::UnknownMessage{..} => disconnect = false, - ErrorKind::GenesisMismatch + ErrorKind::GenesisMismatch{..} | ErrorKind::ChainIdMismatch{..} - | ErrorKind::UnexpectedMessage - | ErrorKind::UnexpectedPeerType - | ErrorKind::UnknownPeer => op = Some(UpdateNodeOperation::Failure), + | ErrorKind::UnexpectedMessage{..} + | ErrorKind::UnexpectedPeerType{..} => op = Some(UpdateNodeOperation::Failure), - ErrorKind::UnexpectedRequestId | ErrorKind::UnexpectedResponse => { + ErrorKind::UnexpectedResponse{..} => { op = Some(UpdateNodeOperation::Demotion) } - ErrorKind::InvalidBloom - | ErrorKind::InvalidLedgerProof + ErrorKind::InvalidBloom{..} + | ErrorKind::InvalidLedgerProofSize{..} | ErrorKind::InvalidMessageFormat - | ErrorKind::InvalidReceipts - | ErrorKind::InvalidStateProof(_) - | ErrorKind::InvalidStateRoot - | ErrorKind::InvalidStorageRootProof(_) - | ErrorKind::InvalidTxInfo - | ErrorKind::InvalidTxRoot - | ErrorKind::InvalidTxSignature + | ErrorKind::InvalidPreviousStateRoot{..} + | ErrorKind::InvalidReceipts{..} + | ErrorKind::InvalidStateProof{..} + | ErrorKind::InvalidStateRoot{..} + | ErrorKind::InvalidStorageRootProof{..} + | ErrorKind::InvalidTxInfo{..} + | ErrorKind::InvalidTxRoot{..} + | ErrorKind::InvalidTxSignature{..} + | ErrorKind::InvalidWitnessRoot{..} | ErrorKind::AlreadyThrottled(_) | ErrorKind::Decoder(_) => op = Some(UpdateNodeOperation::Remove), @@ -274,3 +289,16 @@ pub fn handle(io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, e: Error) { io.disconnect_peer(peer, op, reason.as_str()); } } + +#[derive(Clone, Debug)] +pub struct ClonableError(Arc>); + +impl Into for ClonableError { + fn into(self) -> Error { ErrorKind::ClonableErrorWrapper(self).into() } +} + +impl From for ClonableError { + fn from(e: Error) -> ClonableError { + ClonableError(Arc::new(Mutex::new(e))) + } +} diff --git a/core/src/light_protocol/handler/mod.rs b/core/src/light_protocol/handler/mod.rs index be248b50d9..d641b7f61d 100644 --- a/core/src/light_protocol/handler/mod.rs +++ b/core/src/light_protocol/handler/mod.rs @@ -8,6 +8,7 @@ use crate::{ consensus::SharedConsensusGraph, light_protocol::{ common::{validate_chain_id, FullPeerState, Peers}, + error::*, handle_error, message::{ msgid, BlockHashes as GetBlockHashesResponse, @@ -21,8 +22,8 @@ use crate::{ TxInfos as GetTxInfosResponse, Txs as GetTxsResponse, WitnessInfo as GetWitnessInfoResponse, }, - Error, ErrorKind, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, - LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1, + LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, LIGHT_PROTOCOL_VERSION, + LIGHT_PROTO_V1, }, message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId}, network::{NetworkContext, NetworkProtocolHandler}, @@ -211,28 +212,28 @@ impl Handler { #[inline] fn get_existing_peer_state( &self, peer: &NodeId, - ) -> Result>, Error> { + ) -> Result>> { match self.peers.get(&peer) { Some(state) => Ok(state), None => { // NOTE: this should not happen as we register // all peers in `on_peer_connected` - error!("Received message from unknown peer={:?}", peer); - Err(ErrorKind::InternalError.into()) + bail!(ErrorKind::InternalError(format!( + "Received message from unknown peer={:?}", + peer + ))); } } } #[allow(unused)] #[inline] - fn peer_version(&self, peer: &NodeId) -> Result { + fn peer_version(&self, peer: &NodeId) -> Result { Ok(self.get_existing_peer_state(peer)?.read().protocol_version) } #[inline] - fn validate_peer_state( - &self, peer: &NodeId, msg_id: MsgId, - ) -> Result<(), Error> { + fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> { let state = self.get_existing_peer_state(&peer)?; if msg_id != msgid::STATUS_PONG_DEPRECATED @@ -240,39 +241,43 @@ impl Handler { && !state.read().handshake_completed { warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer); - return Err(ErrorKind::UnexpectedMessage.into()); + bail!(ErrorKind::UnexpectedMessage { + expected: vec![ + msgid::STATUS_PING_DEPRECATED, + msgid::STATUS_PING_V2 + ], + received: msg_id, + }); } Ok(()) } #[inline] - fn validate_peer_type(&self, node_type: &NodeType) -> Result<(), Error> { + fn validate_peer_type(&self, node_type: NodeType) -> Result<()> { match node_type { NodeType::Archive => Ok(()), NodeType::Full => Ok(()), - _ => Err(ErrorKind::UnexpectedPeerType.into()), + _ => bail!(ErrorKind::UnexpectedPeerType { node_type }), } } #[inline] - fn validate_genesis_hash(&self, genesis: H256) -> Result<(), Error> { - match self.consensus.get_data_manager().true_genesis.hash() { - h if h == genesis => Ok(()), - h => { - debug!( - "Genesis mismatch (ours: {:?}, theirs: {:?})", - h, genesis - ); - Err(ErrorKind::GenesisMismatch.into()) - } + fn validate_genesis_hash(&self, genesis: H256) -> Result<()> { + let ours = self.consensus.get_data_manager().true_genesis.hash(); + let theirs = genesis; + + if ours != theirs { + bail!(ErrorKind::GenesisMismatch { ours, theirs }); } + + Ok(()) } #[rustfmt::skip] fn dispatch_message( &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp, - ) -> Result<(), Error> { + ) -> Result<()> { trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id); self.validate_peer_state(peer, msg_id)?; let min_supported_ver = self.minimum_supported_version(); @@ -300,7 +305,7 @@ impl Handler { // request was throttled by service provider msgid::THROTTLED => self.on_throttled(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?), - _ => Err(ErrorKind::UnknownMessage.into()), + _ => bail!(ErrorKind::UnknownMessage{id: msg_id}), } } @@ -356,7 +361,7 @@ impl Handler { fn send_status( &self, io: &dyn NetworkContext, peer: &NodeId, peer_protocol_version: ProtocolVersion, - ) -> Result<(), Error> + ) -> Result<()> { let msg: Box; @@ -389,7 +394,7 @@ impl Handler { #[inline] pub fn send_raw_tx( &self, io: &dyn NetworkContext, peer: &NodeId, raw: Vec, - ) -> Result<(), Error> { + ) -> Result<()> { let msg: Box = Box::new(SendRawTx { raw }); msg.send(io, peer)?; Ok(()) @@ -397,10 +402,10 @@ impl Handler { fn on_status_v2( &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPongV2, - ) -> Result<(), Error> { + ) -> Result<()> { info!("on_status peer={:?} status={:?}", peer, status); - self.validate_peer_type(&status.node_type)?; + self.validate_peer_type(status.node_type)?; self.validate_genesis_hash(status.genesis_hash)?; validate_chain_id( &self.consensus.get_config().chain_id, @@ -424,7 +429,7 @@ impl Handler { fn on_status_deprecated( &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPongDeprecatedV1, - ) -> Result<(), Error> + ) -> Result<()> { info!("on_status peer={:?} status={:?}", peer, status); @@ -444,7 +449,7 @@ impl Handler { fn on_block_hashes( &self, io: &dyn NetworkContext, _peer: &NodeId, resp: GetBlockHashesResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_block_hashes resp={:?}", resp); @@ -460,7 +465,7 @@ impl Handler { fn on_block_headers( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBlockHeadersResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_block_headers resp={:?}", resp); @@ -477,7 +482,7 @@ impl Handler { fn on_block_txs( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBlockTxsResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_block_txs resp={:?}", resp); @@ -493,7 +498,7 @@ impl Handler { fn on_blooms( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetBloomsResponse, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_blooms resp={:?}", resp); self.blooms @@ -505,7 +510,7 @@ impl Handler { fn on_new_block_hashes( &self, io: &dyn NetworkContext, peer: &NodeId, msg: NewBlockHashes, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_new_block_hashes msg={:?}", msg); if self.catch_up_mode() { @@ -530,7 +535,7 @@ impl Handler { fn on_receipts( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetReceiptsResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_receipts resp={:?}", resp); @@ -547,7 +552,7 @@ impl Handler { fn on_state_entries( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetStateEntriesResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_state_entries resp={:?}", resp); @@ -564,7 +569,7 @@ impl Handler { fn on_state_roots( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetStateRootsResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_state_roots resp={:?}", resp); @@ -581,7 +586,7 @@ impl Handler { fn on_storage_roots( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetStorageRootsResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_storage_roots resp={:?}", resp); @@ -597,7 +602,7 @@ impl Handler { fn on_txs( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxsResponse, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_txs resp={:?}", resp); self.txs @@ -609,7 +614,7 @@ impl Handler { fn on_tx_infos( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetTxInfosResponse, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_tx_infos resp={:?}", resp); self.tx_infos @@ -622,7 +627,7 @@ impl Handler { fn on_witness_info( &self, io: &dyn NetworkContext, peer: &NodeId, resp: GetWitnessInfoResponse, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_witness_info resp={:?}", resp); @@ -678,7 +683,7 @@ impl Handler { fn on_throttled( &self, _io: &dyn NetworkContext, peer: &NodeId, resp: Throttled, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_throttled resp={:?}", resp); let peer = self.get_existing_peer_state(peer)?; @@ -729,7 +734,7 @@ impl NetworkProtocolHandler for Handler { io, peer, msgid::INVALID, - ErrorKind::InvalidMessageFormat.into(), + &ErrorKind::InvalidMessageFormat.into(), ) } }; @@ -737,7 +742,7 @@ impl NetworkProtocolHandler for Handler { debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id); if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) { - handle_error(io, peer, msg_id.into(), e); + handle_error(io, peer, msg_id.into(), &e); } } @@ -770,7 +775,7 @@ impl NetworkProtocolHandler for Handler { io, peer, msgid::INVALID, - ErrorKind::SendStatusFailed.into(), + &ErrorKind::SendStatusFailed { peer: *peer }.into(), ); } } diff --git a/core/src/light_protocol/handler/sync/block_txs.rs b/core/src/light_protocol/handler/sync/block_txs.rs index c746b878b2..b0b7b2ef8b 100644 --- a/core/src/light_protocol/handler/sync/block_txs.rs +++ b/core/src/light_protocol/handler/sync/block_txs.rs @@ -12,8 +12,8 @@ use crate::{ consensus::SharedConsensusGraph, light_protocol::{ common::{FullPeerState, LedgerInfo, Peers}, + error::*, message::{msgid, BlockTxsWithHash, GetBlockTxs}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -25,6 +25,7 @@ use crate::{ UniqueId, }; use cfx_types::H256; +use futures::future::FutureExt; use lru_time_cache::LruCache; use network::node_table::NodeId; use parking_lot::RwLock; @@ -41,6 +42,8 @@ struct Statistics { // prioritize earlier requests type MissingBlockTxs = TimeOrdered; +type PendingBlockTxs = PendingItem, ClonableError>; + pub struct BlockTxs { // helper API for retrieving ledger information ledger: LedgerInfo, @@ -55,7 +58,7 @@ pub struct BlockTxs { txs: Arc, // block txs received from full node - verified: Arc>>>>, + verified: Arc>>, } impl BlockTxs { @@ -92,20 +95,28 @@ impl BlockTxs { #[inline] pub fn request( &self, hash: H256, - ) -> impl Future> { - if !self.verified.read().contains_key(&hash) { + ) -> impl Future>> { + let mut verified = self.verified.write(); + + if !verified.contains_key(&hash) { let missing = MissingBlockTxs::new(hash); self.sync_manager.insert_waiting(std::iter::once(missing)); } + verified + .entry(hash) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(hash, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, block_txs: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for BlockTxsWithHash { hash, block_txs } in block_txs { debug!("Validating block_txs {:?} with hash {}", block_txs, hash); @@ -122,14 +133,21 @@ impl BlockTxs { #[inline] pub fn validate_and_store( &self, hash: H256, block_txs: Vec, - ) -> Result<(), Error> { - // validate and store each transaction - for tx in &block_txs { - self.txs.validate_and_store(tx.clone())?; - } - + ) -> Result<()> { // validate block txs - self.validate_block_txs(hash, &block_txs)?; + if let Err(e) = self.validate_block_txs(hash, &block_txs) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(hash) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store block bodies by block hash self.verified @@ -156,7 +174,7 @@ impl BlockTxs { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} hashes={:?}", peer, hashes); if hashes.is_empty() { @@ -185,20 +203,22 @@ impl BlockTxs { #[inline] pub fn validate_block_txs( &self, hash: H256, txs: &Vec, - ) -> Result<(), Error> { - // NOTE: tx signatures have been validated previously - - let local = *self.ledger.header(hash)?.transactions_root(); + ) -> Result<()> { + // validate each transaction first + for tx in txs { + self.txs.validate_tx(&tx)?; + } + let expected = *self.ledger.header(hash)?.transactions_root(); let txs: Vec<_> = txs.iter().map(|tx| Arc::new(tx.clone())).collect(); let received = compute_transaction_root(&txs); - if received != local { - warn!( - "Tx root validation failed, received={:?}, local={:?}", - received, local - ); - return Err(ErrorKind::InvalidTxRoot.into()); + if received != expected { + bail!(ErrorKind::InvalidTxRoot { + hash, + expected, + received, + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/blooms.rs b/core/src/light_protocol/handler/sync/blooms.rs index eb57cf1ac3..43a03b931c 100644 --- a/core/src/light_protocol/handler/sync/blooms.rs +++ b/core/src/light_protocol/handler/sync/blooms.rs @@ -13,8 +13,8 @@ use crate::{ hash::keccak, light_protocol::{ common::{FullPeerState, Peers}, + error::*, message::{msgid, BloomWithEpoch, GetBlooms}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -29,6 +29,7 @@ use super::{ common::{FutureItem, KeyOrdered, PendingItem, SyncManager}, witnesses::Witnesses, }; +use futures::future::FutureExt; use network::node_table::NodeId; #[derive(Debug)] @@ -41,6 +42,8 @@ struct Statistics { // prioritize higher epochs type MissingBloom = KeyOrdered; +type PendingBloom = PendingItem; + pub struct Blooms { // series of unique request ids request_id_allocator: Arc, @@ -49,7 +52,7 @@ pub struct Blooms { sync_manager: SyncManager, // bloom filters received from full node - verified: Arc>>>, + verified: Arc>>, // witness sync manager witnesses: Arc, @@ -84,26 +87,32 @@ impl Blooms { } #[inline] - pub fn request(&self, epoch: u64) -> impl Future { + pub fn request(&self, epoch: u64) -> impl Future> { + let mut verified = self.verified.write(); + if epoch == 0 { - self.verified - .write() - .insert(0, PendingItem::ready(Bloom::zero())); + verified.insert(0, PendingItem::ready(Bloom::zero())); } - if !self.verified.read().contains_key(&epoch) { + if !verified.contains_key(&epoch) { let missing = MissingBloom::new(epoch); self.sync_manager.insert_waiting(std::iter::once(missing)); } + verified + .entry(epoch) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(epoch, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, blooms: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for BloomWithEpoch { epoch, bloom } in blooms { debug!("Validating bloom {:?} with epoch {}", bloom, epoch); @@ -118,11 +127,21 @@ impl Blooms { } #[inline] - pub fn validate_and_store( - &self, epoch: u64, bloom: Bloom, - ) -> Result<(), Error> { + pub fn validate_and_store(&self, epoch: u64, bloom: Bloom) -> Result<()> { // validate bloom - self.validate_bloom(epoch, bloom)?; + if let Err(e) = self.validate_bloom(epoch, bloom) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(epoch) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store bloom by epoch self.verified @@ -149,7 +168,7 @@ impl Blooms { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} epochs={:?}", peer, epochs); if epochs.is_empty() { @@ -175,29 +194,23 @@ impl Blooms { } #[inline] - fn validate_bloom(&self, epoch: u64, bloom: Bloom) -> Result<(), Error> { + fn validate_bloom(&self, epoch: u64, bloom: Bloom) -> Result<()> { // calculate received bloom hash let received = keccak(bloom); // retrieve local bloom hash - let local = match self.witnesses.root_hashes_of(epoch) { + let expected = match self.witnesses.root_hashes_of(epoch) { Some((_, _, bloom_hash)) => bloom_hash, - None => { - warn!( - "Bloom hash not found, epoch={}, bloom={:?}", - epoch, bloom - ); - return Err(ErrorKind::InternalError.into()); - } + None => bail!(ErrorKind::WitnessUnavailable { epoch }), }; // check - if received != local { - warn!( - "Bloom validation failed, received={:?}, local={:?}", - received, local - ); - return Err(ErrorKind::InvalidBloom.into()); + if received != expected { + bail!(ErrorKind::InvalidBloom { + epoch, + expected, + received, + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/common/future_item.rs b/core/src/light_protocol/handler/sync/common/future_item.rs index 97e4a32321..a1daef824f 100644 --- a/core/src/light_protocol/handler/sync/common/future_item.rs +++ b/core/src/light_protocol/handler/sync/common/future_item.rs @@ -14,20 +14,25 @@ use std::{ task::{Context, Poll, Waker}, }; -pub enum PendingItem { - Ready(T), +pub enum PendingItem { + Ready(Item), Pending(Vec), + Error(Err), } -impl PendingItem { - pub fn pending() -> PendingItem { Self::Pending(vec![]) } +impl PendingItem { + pub fn pending() -> Self { Self::Pending(vec![]) } - pub fn ready(item: T) -> PendingItem { Self::Ready(item) } -} + pub fn ready(item: Item) -> Self { Self::Ready(item) } + + pub fn clear_error(&mut self) { + if let Self::Error(_) = self { + *self = Self::pending(); + } + } -impl PendingItem { // NOTE: `set` has to be called in a thread-safe environment - pub fn set(&mut self, item: T) { + pub fn set(&mut self, item: Item) { match self { Self::Ready(_old) => { // FIXME: we might want to check if old == item and raise an @@ -46,43 +51,77 @@ impl PendingItem { w.wake(); } } + Self::Error(_) => { + // if we managed to verify the item, we do not care about the + // error anymore. wakers must have been notified when `self` was + // set to `Error`, so they either received an error or haven't + // polled yet. + *self = Self::Ready(item); + } + } + } + + // NOTE: `set_error` has to be called in a thread-safe environment + pub fn set_error(&mut self, err: Err) { + match self { + Self::Ready(_) => { + // if we already have a verified value, we do not care about + // errors anymore + } + Self::Pending(ws) => { + // move `ws` out + let ws = std::mem::replace(ws, Vec::::new()); + + // transform `self` + *self = Self::Error(err); + + // notify waiting futures + for w in ws { + w.wake(); + } + } + Self::Error(_) => { + *self = Self::Error(err); + } } } } -impl PendingItem { +impl PendingItem { // NOTE: `poll` has to be called in a thread-safe environment - fn poll(&mut self, ctx: &mut Context) -> Poll { + fn poll(&mut self, ctx: &mut Context) -> Poll> { match self { - Self::Ready(item) => Poll::Ready(item.clone()), + Self::Ready(item) => Poll::Ready(Ok(item.clone())), Self::Pending(ws) => { // FIXME: is it safe to keep old wakers? ws.push(ctx.waker().clone()); Poll::Pending } + Self::Error(e) => Poll::Ready(Err(e.clone())), } } } -pub struct FutureItem { +pub struct FutureItem { key: K, - verified: Arc>>>, + verified: Arc>>>, } -impl FutureItem { +impl FutureItem { pub fn new( - key: K, verified: Arc>>>, - ) -> FutureItem { + key: K, verified: Arc>>>, + ) -> FutureItem { FutureItem { key, verified } } } -impl Future for FutureItem +impl Future for FutureItem where K: Clone + Eq + Hash + Ord, V: Clone, + E: Clone, { - type Output = V; + type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { self.verified @@ -102,13 +141,58 @@ mod tests { use std::{sync::Arc, time::Duration}; use tokio::{runtime::Runtime, time::delay_for}; + #[test] + fn test_set() { + const KEY: u64 = 1; + const VALUE: u64 = 2; + const ERROR: u64 = 3; + + let cache = LruCache::>::with_capacity(1); + let verified = Arc::new(RwLock::new(cache)); + + let mut runtime = Runtime::new().expect("Unable to create a runtime"); + + // set error + verified + .write() + .entry(KEY) + .or_insert(PendingItem::pending()) + .set_error(ERROR); + + // caller should get the error + let res = runtime.block_on(FutureItem::new(KEY, verified.clone())); + assert_eq!(res, Err(ERROR)); + + // set value + verified + .write() + .entry(KEY) + .or_insert(PendingItem::pending()) + .set(VALUE); + + // caller should get the value + let res = runtime.block_on(FutureItem::new(KEY, verified.clone())); + assert_eq!(res, Ok(VALUE)); + + // set error again + verified + .write() + .entry(KEY) + .or_insert(PendingItem::pending()) + .set_error(ERROR); + + // result is not overwritten by error + let res = runtime.block_on(FutureItem::new(KEY, verified.clone())); + assert_eq!(res, Ok(VALUE)); + } + #[test] fn test_concurrent_access() { const KEY: u64 = 1; const VALUE: u64 = 2; const DELAY: u64 = 10; - let cache = LruCache::>::with_capacity(1); + let cache = LruCache::>::with_capacity(1); let verified = Arc::new(RwLock::new(cache)); // we will simulate 3 concurrent accesses to the same item @@ -141,8 +225,8 @@ mod tests { let mut runtime = Runtime::new().expect("Unable to create a runtime"); let (res1, (res2, res3), _) = runtime.block_on(join3(fut1, fut2, fut3)); - assert_eq!(res1, VALUE); - assert_eq!(res2, VALUE); - assert_eq!(res3, VALUE); + assert_eq!(res1, Ok(VALUE)); + assert_eq!(res2, Ok(VALUE)); + assert_eq!(res3, Ok(VALUE)); } } diff --git a/core/src/light_protocol/handler/sync/common/ledger_proof.rs b/core/src/light_protocol/handler/sync/common/ledger_proof.rs index c3ba3b2184..2181653e0e 100644 --- a/core/src/light_protocol/handler/sync/common/ledger_proof.rs +++ b/core/src/light_protocol/handler/sync/common/ledger_proof.rs @@ -31,7 +31,7 @@ impl Index for LedgerProof { impl LedgerProof { pub fn validate(&self, witness: &BlockHeader) -> Result<(), Error> { // extract proof hashes and corresponding local root hash - let (hashes, local_root_hash) = match self { + let (hashes, expected) = match self { LedgerProof::StateRoot(hashes) => { (hashes, *witness.deferred_state_root()) } @@ -44,19 +44,19 @@ impl LedgerProof { }; // validate the number of hashes provided against local witness blame + let hash = witness.hash(); let blame = witness.blame() as u64; if hashes.len() as u64 != blame + 1 { - info!( - "Invalid number of hashes provided: expected={}, received={}", - blame + 1, - hashes.len() - ); - return Err(ErrorKind::InvalidLedgerProof.into()); + bail!(ErrorKind::InvalidLedgerProofSize { + hash, + expected: blame + 1, + received: hashes.len() as u64 + }); } // compute witness deferred root hash from the hashes provided - let received_root_hash = match blame { + let received = match blame { 0 => hashes[0], _ => { let hashes = hashes.clone(); @@ -65,12 +65,12 @@ impl LedgerProof { }; // validate against local witness deferred state root hash - if received_root_hash != local_root_hash { - info!( - "Witness root hash mismatch: local={:?}, received={:?}", - local_root_hash, received_root_hash - ); - return Err(ErrorKind::InvalidLedgerProof.into()); + if received != expected { + bail!(ErrorKind::InvalidWitnessRoot { + hash, + expected, + received, + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/common/sync_manager.rs b/core/src/light_protocol/handler/sync/common/sync_manager.rs index e545835b07..f8eb9a4ca9 100644 --- a/core/src/light_protocol/handler/sync/common/sync_manager.rs +++ b/core/src/light_protocol/handler/sync/common/sync_manager.rs @@ -93,8 +93,10 @@ where match self.peers.get(peer) { Some(state) => Ok(state), None => { - error!("Received message from unknown peer={:?}", peer); - Err(ErrorKind::InternalError.into()) + bail!(ErrorKind::InternalError(format!( + "Received message from unknown peer={:?}", + peer + ))); } } } @@ -123,7 +125,10 @@ where ThrottleResult::Success => Ok(id), ThrottleResult::Throttled(_) => Ok(id), ThrottleResult::AlreadyThrottled => { - Err(ErrorKind::UnexpectedResponse.into()) + bail!(ErrorKind::UnexpectedResponse { + expected: id, + received: request_id, + }); } } } diff --git a/core/src/light_protocol/handler/sync/receipts.rs b/core/src/light_protocol/handler/sync/receipts.rs index 2631ac10b8..0d1ccf0968 100644 --- a/core/src/light_protocol/handler/sync/receipts.rs +++ b/core/src/light_protocol/handler/sync/receipts.rs @@ -11,8 +11,8 @@ use std::{future::Future, sync::Arc}; use crate::{ light_protocol::{ common::{FullPeerState, Peers}, + error::*, message::{msgid, GetReceipts, ReceiptsWithEpoch}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -29,6 +29,7 @@ use super::{ witnesses::Witnesses, }; use crate::verification::compute_receipts_root; +use futures::future::FutureExt; use network::node_table::NodeId; #[derive(Debug)] @@ -41,6 +42,8 @@ struct Statistics { // prioritize higher epochs type MissingReceipts = KeyOrdered; +type PendingReceipts = PendingItem, ClonableError>; + pub struct Receipts { // series of unique request ids request_id_allocator: Arc, @@ -49,7 +52,7 @@ pub struct Receipts { sync_manager: SyncManager, // epoch receipts received from full node - verified: Arc>>>>, + verified: Arc>>, // witness sync manager witnesses: Arc, @@ -86,24 +89,32 @@ impl Receipts { #[inline] pub fn request( &self, epoch: u64, - ) -> impl Future> { + ) -> impl Future>> { + let mut verified = self.verified.write(); + if epoch == 0 { - self.verified.write().insert(0, PendingItem::ready(vec![])); + verified.insert(0, PendingItem::ready(vec![])); } - if !self.verified.read().contains_key(&epoch) { + if !verified.contains_key(&epoch) { let missing = MissingReceipts::new(epoch); self.sync_manager.insert_waiting(std::iter::once(missing)); } + verified + .entry(epoch) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(epoch, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, receipts: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for ReceiptsWithEpoch { epoch, @@ -127,9 +138,21 @@ impl Receipts { #[inline] pub fn validate_and_store( &self, epoch: u64, receipts: Vec, - ) -> Result<(), Error> { + ) -> Result<()> { // validate receipts - self.validate_receipts(epoch, &receipts)?; + if let Err(e) = self.validate_receipts(epoch, &receipts) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(epoch) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store receipts by epoch self.verified @@ -156,7 +179,7 @@ impl Receipts { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} epochs={:?}", peer, epochs); if epochs.is_empty() { @@ -185,7 +208,7 @@ impl Receipts { #[inline] fn validate_receipts( &self, epoch: u64, receipts: &Vec, - ) -> Result<(), Error> { + ) -> Result<()> { // calculate received receipts root // convert Vec> -> Vec>> // for API compatibility @@ -198,24 +221,18 @@ impl Receipts { let received = compute_receipts_root(&rs); // retrieve local receipts root - let local = match self.witnesses.root_hashes_of(epoch) { + let expected = match self.witnesses.root_hashes_of(epoch) { Some((_, receipts_root, _)) => receipts_root, - None => { - warn!( - "Receipt root not found, epoch={}, receipts={:?}", - epoch, receipts - ); - return Err(ErrorKind::InternalError.into()); - } + None => bail!(ErrorKind::WitnessUnavailable { epoch }), }; // check - if received != local { - warn!( - "Receipt validation failed, received={:?}, local={:?}", - received, local - ); - return Err(ErrorKind::InvalidBloom.into()); + if received != expected { + bail!(ErrorKind::InvalidReceipts { + epoch, + expected, + received, + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/state_entries.rs b/core/src/light_protocol/handler/sync/state_entries.rs index 9951c95470..ac00f55dca 100644 --- a/core/src/light_protocol/handler/sync/state_entries.rs +++ b/core/src/light_protocol/handler/sync/state_entries.rs @@ -30,6 +30,7 @@ use super::{ common::{FutureItem, PendingItem, SyncManager, TimeOrdered}, state_roots::StateRoots, }; +use futures::future::FutureExt; use network::node_table::NodeId; use primitives::StorageKey; @@ -44,6 +45,8 @@ struct Statistics { type MissingStateEntry = TimeOrdered; +type PendingStateEntry = PendingItem; + pub struct StateEntries { // series of unique request ids request_id_allocator: Arc, @@ -55,7 +58,7 @@ pub struct StateEntries { sync_manager: SyncManager, // state entries received from full node - verified: Arc>>>, + verified: Arc>>, } impl StateEntries { @@ -90,10 +93,11 @@ impl StateEntries { #[inline] pub fn request_now( &self, io: &dyn NetworkContext, epoch: u64, key: Vec, - ) -> impl Future { + ) -> impl Future> { + let mut verified = self.verified.write(); let key = StateKey { epoch, key }; - if !self.verified.read().contains_key(&key) { + if !verified.contains_key(&key) { let missing = std::iter::once(MissingStateEntry::new(key.clone())); self.sync_manager.request_now(missing, |peer, keys| { @@ -101,7 +105,13 @@ impl StateEntries { }); } + verified + .entry(key.clone()) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(key, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] @@ -111,7 +121,10 @@ impl StateEntries { ) -> Result<()> { for StateEntryWithKey { key, entry, proof } in entries { - debug!("Validating state entry {:?} with key {:?}", entry, key); + debug!( + "Validating state entry {:?} with key {:?} and proof {:?}", + entry, key, proof + ); match self.sync_manager.check_if_requested(peer, id, &key)? { None => continue, @@ -127,7 +140,21 @@ impl StateEntries { &self, key: StateKey, entry: Option>, proof: StateEntryProof, ) -> Result<()> { // validate state entry - self.validate_state_entry(key.epoch, &key.key, &entry, proof)?; + if let Err(e) = + self.validate_state_entry(key.epoch, &key.key, &entry, proof) + { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(key.clone()) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store state entry by state key self.verified @@ -192,10 +219,11 @@ impl StateEntries { self.state_roots .validate_state_root(epoch, &state_root) - .chain_err(|| { - ErrorKind::InvalidStateProof( - "Validation of current state root failed", - ) + .chain_err(|| ErrorKind::InvalidStateProof { + epoch, + key: key.clone(), + value: value.clone(), + reason: "Validation of current state root failed", })?; // validate previous state root @@ -203,10 +231,11 @@ impl StateEntries { self.state_roots .validate_prev_snapshot_state_root(epoch, &maybe_prev_root) - .chain_err(|| { - ErrorKind::InvalidStateProof( - "Validation of previous state root failed", - ) + .chain_err(|| ErrorKind::InvalidStateProof { + epoch, + key: key.clone(), + value: value.clone(), + reason: "Validation of previous state root failed", })?; // construct padding @@ -224,11 +253,12 @@ impl StateEntries { state_root, maybe_intermediate_padding, ) { - warn!("Invalid state proof for {:?} under key {:?}", value, key); - return Err(ErrorKind::InvalidStateProof( - "State proof validation failed", - ) - .into()); + bail!(ErrorKind::InvalidStateProof { + epoch, + key: key.clone(), + value: value.clone(), + reason: "Validation of merkle proof failed", + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/state_roots.rs b/core/src/light_protocol/handler/sync/state_roots.rs index a46d1fdc7c..0373a5ed5f 100644 --- a/core/src/light_protocol/handler/sync/state_roots.rs +++ b/core/src/light_protocol/handler/sync/state_roots.rs @@ -12,8 +12,8 @@ use std::{future::Future, sync::Arc}; use crate::{ light_protocol::{ common::{FullPeerState, Peers}, + error::*, message::{msgid, GetStateRoots, StateRootWithEpoch}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -28,6 +28,7 @@ use super::{ common::{FutureItem, PendingItem, SyncManager, TimeOrdered}, witnesses::Witnesses, }; +use futures::future::FutureExt; use network::node_table::NodeId; #[derive(Debug)] @@ -39,6 +40,8 @@ struct Statistics { type MissingStateRoot = TimeOrdered; +type PendingStateRoot = PendingItem; + pub struct StateRoots { // series of unique request ids request_id_allocator: Arc, @@ -50,7 +53,7 @@ pub struct StateRoots { sync_manager: SyncManager, // bloom filters received from full node - verified: Arc>>>, + verified: Arc>>, // witness sync manager witnesses: Arc, @@ -98,8 +101,10 @@ impl StateRoots { #[inline] pub fn request_now( &self, io: &dyn NetworkContext, epoch: u64, - ) -> impl Future { - if !self.verified.read().contains_key(&epoch) { + ) -> impl Future> { + let mut verified = self.verified.write(); + + if !verified.contains_key(&epoch) { let missing = std::iter::once(MissingStateRoot::new(epoch)); self.sync_manager.request_now(missing, |peer, epochs| { @@ -107,14 +112,20 @@ impl StateRoots { }); } + verified + .entry(epoch) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(epoch, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, state_roots: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for StateRootWithEpoch { epoch, state_root } in state_roots { debug!( @@ -134,9 +145,21 @@ impl StateRoots { #[inline] pub fn validate_and_store( &self, epoch: u64, state_root: StateRoot, - ) -> Result<(), Error> { + ) -> Result<()> { // validate state root - self.validate_state_root(epoch, &state_root)?; + if let Err(e) = self.validate_state_root(epoch, &state_root) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(epoch) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store state root by epoch self.verified @@ -163,7 +186,7 @@ impl StateRoots { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, epochs: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} epochs={:?}", peer, epochs); if epochs.is_empty() { @@ -192,29 +215,23 @@ impl StateRoots { #[inline] pub fn validate_state_root( &self, epoch: u64, state_root: &StateRoot, - ) -> Result<(), Error> { + ) -> Result<()> { // calculate received state root hash let received = state_root.compute_state_root_hash(); // retrieve local state root hash - let local = match self.witnesses.root_hashes_of(epoch) { + let expected = match self.witnesses.root_hashes_of(epoch) { Some((state_root, _, _)) => state_root, - None => { - warn!( - "State root hash not found, epoch={}, state_root={:?}", - epoch, state_root - ); - return Err(ErrorKind::InternalError.into()); - } + None => bail!(ErrorKind::WitnessUnavailable { epoch }), }; // check - if received != local { - warn!( - "State root validation failed, received={:?}, local={:?}", - received, local - ); - return Err(ErrorKind::InvalidStateRoot.into()); + if received != expected { + bail!(ErrorKind::InvalidStateRoot { + epoch, + expected, + received, + }); } Ok(()) @@ -224,25 +241,38 @@ impl StateRoots { pub fn validate_prev_snapshot_state_root( &self, current_epoch: u64, maybe_prev_snapshot_state_root: &Option, - ) -> Result<(), Error> + ) -> Result<()> { + let snapshot_epoch_count = self.snapshot_epoch_count; + match maybe_prev_snapshot_state_root { Some(ref root) => { // root provided for non-existent epoch - if current_epoch <= self.snapshot_epoch_count { - return Err(ErrorKind::InvalidStateRoot.into()); + if current_epoch <= snapshot_epoch_count { + // previous root should not have been provided + // for the first snapshot period + bail!(ErrorKind::InvalidPreviousStateRoot { + current_epoch, + snapshot_epoch_count, + root: maybe_prev_snapshot_state_root.clone() + }); } // root provided for previous snapshot self.validate_state_root( - current_epoch - self.snapshot_epoch_count, + current_epoch - snapshot_epoch_count, &root, )?; } None => { - // root not provided even though previous snapshot exists - if current_epoch > self.snapshot_epoch_count { - return Err(ErrorKind::InvalidStateRoot.into()); + if current_epoch > snapshot_epoch_count { + // previous root should have been provided + // for subsequent snapshot periods + bail!(ErrorKind::InvalidPreviousStateRoot { + current_epoch, + snapshot_epoch_count, + root: maybe_prev_snapshot_state_root.clone() + }); } } } diff --git a/core/src/light_protocol/handler/sync/storage_roots.rs b/core/src/light_protocol/handler/sync/storage_roots.rs index 7ca84c7828..145e06fb6b 100644 --- a/core/src/light_protocol/handler/sync/storage_roots.rs +++ b/core/src/light_protocol/handler/sync/storage_roots.rs @@ -31,9 +31,9 @@ use super::{ state_roots::StateRoots, }; use cfx_types::H160; +use futures::future::FutureExt; use network::node_table::NodeId; use primitives::{StorageKey, StorageRoot}; - #[derive(Debug)] struct Statistics { cached: usize, @@ -43,6 +43,8 @@ struct Statistics { type MissingStorageRoot = TimeOrdered; +type PendingStorageRoot = PendingItem, ClonableError>; + pub struct StorageRoots { // series of unique request ids request_id_allocator: Arc, @@ -54,8 +56,7 @@ pub struct StorageRoots { sync_manager: SyncManager, // state entries received from full node - verified: - Arc>>>>, + verified: Arc>>, } impl StorageRoots { @@ -90,10 +91,11 @@ impl StorageRoots { #[inline] pub fn request_now( &self, io: &dyn NetworkContext, epoch: u64, address: H160, - ) -> impl Future> { + ) -> impl Future>> { + let mut verified = self.verified.write(); let key = StorageRootKey { epoch, address }; - if !self.verified.read().contains_key(&key) { + if !verified.contains_key(&key) { let missing = std::iter::once(MissingStorageRoot::new(key.clone())); self.sync_manager.request_now(missing, |peer, keys| { @@ -101,7 +103,13 @@ impl StorageRoots { }); } + verified + .entry(key.clone()) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(key, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] @@ -129,7 +137,21 @@ impl StorageRoots { ) -> Result<()> { // validate storage root - self.validate_storage_root(key.epoch, &key.address, &root, proof)?; + if let Err(e) = + self.validate_storage_root(key.epoch, key.address, &root, proof) + { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(key.clone()) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } // store storage root by storage root key self.verified @@ -187,7 +209,7 @@ impl StorageRoots { #[inline] fn validate_storage_root( - &self, epoch: u64, address: &H160, storage_root: &Option, + &self, epoch: u64, address: H160, storage_root: &Option, proof: StorageRootProof, ) -> Result<()> { @@ -196,10 +218,10 @@ impl StorageRoots { self.state_roots .validate_state_root(epoch, &state_root) - .chain_err(|| { - ErrorKind::InvalidStorageRootProof( - "Validation of current state root failed", - ) + .chain_err(|| ErrorKind::InvalidStorageRootProof { + epoch, + address, + reason: "Validation of current state root failed", })?; // validate previous state root @@ -207,10 +229,10 @@ impl StorageRoots { self.state_roots .validate_prev_snapshot_state_root(epoch, &maybe_prev_root) - .chain_err(|| { - ErrorKind::InvalidStorageRootProof( - "Validation of previous state root failed", - ) + .chain_err(|| ErrorKind::InvalidStorageRootProof { + epoch, + address, + reason: "Validation of previous state root failed", })?; // construct padding @@ -230,14 +252,11 @@ impl StorageRoots { state_root, maybe_intermediate_padding, ) { - warn!( - "Invalid storage root proof for {:?} under key {:?}", - storage_root, key - ); - return Err(ErrorKind::InvalidStorageRootProof( - "Validation of merkle proof failed", - ) - .into()); + bail!(ErrorKind::InvalidStorageRootProof { + epoch, + address, + reason: "Validation of merkle proof failed", + }); } Ok(()) diff --git a/core/src/light_protocol/handler/sync/tx_infos.rs b/core/src/light_protocol/handler/sync/tx_infos.rs index 5b92480801..82e6d2f8cf 100644 --- a/core/src/light_protocol/handler/sync/tx_infos.rs +++ b/core/src/light_protocol/handler/sync/tx_infos.rs @@ -18,8 +18,8 @@ use crate::{ consensus::SharedConsensusGraph, light_protocol::{ common::{FullPeerState, LedgerInfo, Peers}, + error::*, message::{msgid, GetTxInfos, TxInfo}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -32,6 +32,7 @@ use crate::{ }, UniqueId, }; +use futures::future::FutureExt; use network::node_table::NodeId; #[derive(Debug)] @@ -47,6 +48,8 @@ type MissingTxInfo = TimeOrdered; // FIXME: struct pub type TxInfoValidated = (SignedTransaction, Receipt, TransactionIndex, U256); +type PendingTxInfo = PendingItem; + pub struct TxInfos { // helper API for retrieving ledger information ledger: LedgerInfo, @@ -58,7 +61,7 @@ pub struct TxInfos { sync_manager: SyncManager, // block txs received from full node - verified: Arc>>>, + verified: Arc>>, // witness sync manager pub witnesses: Arc, @@ -97,8 +100,10 @@ impl TxInfos { #[inline] pub fn request_now( &self, io: &dyn NetworkContext, hash: H256, - ) -> impl Future { - if !self.verified.read().contains_key(&hash) { + ) -> impl Future> { + let mut verified = self.verified.write(); + + if !verified.contains_key(&hash) { let missing = std::iter::once(MissingTxInfo::new(hash)); self.sync_manager.request_now(missing, |peer, hashes| { @@ -106,14 +111,20 @@ impl TxInfos { }); } + verified + .entry(hash) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(hash, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, infos: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for info in infos { debug!("Validating tx_info {:?}", info); @@ -132,7 +143,29 @@ impl TxInfos { } #[inline] - pub fn validate_and_store(&self, info: TxInfo) -> Result<(), Error> { + fn validate_and_store(&self, info: TxInfo) -> Result<()> { + let tx_hash = info.tx.hash(); + + // validate bloom + if let Err(e) = self.validate_and_store_tx_info(info) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(tx_hash) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } + + Ok(()) + } + + #[inline] + fn validate_and_store_tx_info(&self, info: TxInfo) -> Result<()> { let TxInfo { epoch, @@ -156,35 +189,41 @@ impl TxInfos { // quick check for well-formedness if block_index_in_epoch >= num_blocks_in_epoch { - debug!( - "Inconsisent block index: {} >= {}", - block_index_in_epoch, num_blocks_in_epoch - ); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: format!( + "Inconsisent block index: {} >= {}", + block_index_in_epoch, num_blocks_in_epoch + ) + }); } if tx_index_in_block >= num_txs_in_block { - debug!( - "Inconsisent tx index: {} >= {}", - tx_index_in_block, num_txs_in_block - ); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: format!( + "Inconsisent tx index: {} >= {}", + tx_index_in_block, num_txs_in_block + ) + }); } // only executed instances of the transaction are acceptable; // receipts belonging to non-executed instances should not be sent if receipt.outcome_status != 0 && receipt.outcome_status != 1 { - debug!( - "Unexpected outcome status in tx info: {}", - receipt.outcome_status - ); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: format!( + "Unexpected outcome status in tx info: {}", + receipt.outcome_status + ) + }); } let block_hash = match self.ledger.block_hashes_in(epoch)? { hs if hs.len() != num_blocks_in_epoch => { - debug!("Number of blocks in epoch mismatch: local = {}, received = {}", hs.len(), num_blocks_in_epoch); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: format!( + "Number of blocks in epoch mismatch: local = {}, received = {}", + hs.len(), num_blocks_in_epoch), + }); } hs => hs[block_index_in_epoch], }; @@ -215,20 +254,16 @@ impl TxInfos { tx_hash, &tx_proof, ) { - debug!("Transaction proof verification failed"); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: "Transaction proof verification failed".to_owned() + }); } // verify receipt proof let verified_epoch_receipts_root = match self.witnesses.root_hashes_of(epoch) { Some((_, receipts_root, _)) => receipts_root, - None => { - // TODO(thegaram): signal to RPC layer that the - // corresponding roots are not available yet - warn!("Receipt root not found, epoch={}", epoch,); - return Err(ErrorKind::InternalError.into()); - } + None => bail!(ErrorKind::WitnessUnavailable { epoch }), }; trace!( @@ -261,8 +296,9 @@ impl TxInfos { &receipt, &receipt_proof, ) { - debug!("Receipt proof verification failed"); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: "Receipt proof verification failed".to_owned() + }); } // find prior gas used @@ -288,8 +324,10 @@ impl TxInfos { &prev_receipt, &prev_receipt_proof, ) { - debug!("Receipt proof verification failed"); - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: "Previous receipt proof verification failed" + .to_owned() + }); } prev_receipt.accumulated_gas_used @@ -297,17 +335,17 @@ impl TxInfos { // not the first receipt but no previous receipt was provided (_, maybe_prev_receipt, maybe_prev_receipt_proof) => { - debug!( - "Expected two receipts; received one. - tx_index_in_block = {:?}, - maybe_prev_receipt = {:?}, - maybe_prev_receipt_proof = {:?}", - tx_index_in_block, - maybe_prev_receipt, - maybe_prev_receipt_proof - ); - - return Err(ErrorKind::InvalidTxInfo.into()); + bail!(ErrorKind::InvalidTxInfo { + reason: format!( + "Expected two receipts; received one. + tx_index_in_block = {:?}, + maybe_prev_receipt = {:?}, + maybe_prev_receipt_proof = {:?}", + tx_index_in_block, + maybe_prev_receipt, + maybe_prev_receipt_proof + ) + }); } }; @@ -336,7 +374,7 @@ impl TxInfos { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} hashes={:?}", peer, hashes); if hashes.is_empty() { diff --git a/core/src/light_protocol/handler/sync/txs.rs b/core/src/light_protocol/handler/sync/txs.rs index 33d655492f..606f4ab918 100644 --- a/core/src/light_protocol/handler/sync/txs.rs +++ b/core/src/light_protocol/handler/sync/txs.rs @@ -13,8 +13,8 @@ use std::{future::Future, sync::Arc}; use crate::{ light_protocol::{ common::{FullPeerState, Peers}, + error::*, message::{msgid, GetTxs}, - Error, ErrorKind, }, message::{Message, RequestId}, network::NetworkContext, @@ -26,6 +26,7 @@ use crate::{ }; use super::common::{FutureItem, PendingItem, SyncManager, TimeOrdered}; +use futures::future::FutureExt; use network::node_table::NodeId; #[derive(Debug)] @@ -38,6 +39,8 @@ struct Statistics { // prioritize earlier requests type MissingTx = TimeOrdered; +type PendingTx = PendingItem; + pub struct Txs { // series of unique request ids request_id_allocator: Arc, @@ -46,7 +49,7 @@ pub struct Txs { sync_manager: SyncManager, // txs received from full node - verified: Arc>>>, + verified: Arc>>, } impl Txs { @@ -77,8 +80,10 @@ impl Txs { #[inline] pub fn request_now( &self, io: &dyn NetworkContext, hash: H256, - ) -> impl Future { - if !self.verified.read().contains_key(&hash) { + ) -> impl Future> { + let mut verified = self.verified.write(); + + if !verified.contains_key(&hash) { let missing = std::iter::once(MissingTx::new(hash)); self.sync_manager.request_now(missing, |peer, hashes| { @@ -86,14 +91,20 @@ impl Txs { }); } + verified + .entry(hash) + .or_insert(PendingItem::pending()) + .clear_error(); + FutureItem::new(hash, self.verified.clone()) + .map(|res| res.map_err(|e| e.into())) } #[inline] pub fn receive( &self, peer: &NodeId, id: RequestId, txs: impl Iterator, - ) -> Result<(), Error> + ) -> Result<()> { for tx in txs { let hash = tx.hash(); @@ -109,11 +120,23 @@ impl Txs { } #[inline] - pub fn validate_and_store( - &self, tx: SignedTransaction, - ) -> Result<(), Error> { + fn validate_and_store(&self, tx: SignedTransaction) -> Result<()> { let hash = tx.hash(); - self.validate_tx(&tx)?; + + // validate tx + if let Err(e) = self.validate_tx(&tx) { + // forward error to both rpc caller(s) and sync handler + // so we need to make it clonable + let e = ClonableError::from(e); + + self.verified + .write() + .entry(hash) + .or_insert(PendingItem::pending()) + .set_error(e.clone()); + + bail!(e); + } self.verified .write() @@ -139,7 +162,7 @@ impl Txs { #[inline] fn send_request( &self, io: &dyn NetworkContext, peer: &NodeId, hashes: Vec, - ) -> Result, Error> { + ) -> Result> { debug!("send_request peer={:?} hashes={:?}", peer, hashes); if hashes.is_empty() { @@ -165,12 +188,12 @@ impl Txs { } #[inline] - fn validate_tx(&self, tx: &SignedTransaction) -> Result<(), Error> { + pub fn validate_tx(&self, tx: &SignedTransaction) -> Result<()> { match tx.verify_public(false /* skip */) { Ok(true) => {} _ => { warn!("Tx signature verification failed for {:?}", tx); - return Err(ErrorKind::InvalidTxSignature.into()); + bail!(ErrorKind::InvalidTxSignature { hash: tx.hash() }); } } diff --git a/core/src/light_protocol/handler/sync/witnesses.rs b/core/src/light_protocol/handler/sync/witnesses.rs index c518860952..b97709ee12 100644 --- a/core/src/light_protocol/handler/sync/witnesses.rs +++ b/core/src/light_protocol/handler/sync/witnesses.rs @@ -303,7 +303,10 @@ impl Witnesses { Some(w) => w, None => { warn!("Unable to get witness!"); - return Err(ErrorKind::InternalError.into()); + bail!(ErrorKind::InternalError(format!( + "Witness at height {} is not available", + height + ))); } }; diff --git a/core/src/light_protocol/provider.rs b/core/src/light_protocol/provider.rs index 47b2a2353f..70ee80c3ac 100644 --- a/core/src/light_protocol/provider.rs +++ b/core/src/light_protocol/provider.rs @@ -17,6 +17,7 @@ use crate::{ partition_results, validate_chain_id, LedgerInfo, LightPeerState, Peers, }, + error::*, handle_error, message::{ msgid, BlockHashes as GetBlockHashesResponse, @@ -36,9 +37,8 @@ use crate::{ TxInfos as GetTxInfosResponse, Txs as GetTxsResponse, WitnessInfo as GetWitnessInfoResponse, }, - Error, ErrorKind, LIGHT_PROTOCOL_ID, - LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, LIGHT_PROTOCOL_VERSION, - LIGHT_PROTO_V1, + LIGHT_PROTOCOL_ID, LIGHT_PROTOCOL_OLD_VERSIONS_TO_SUPPORT, + LIGHT_PROTOCOL_VERSION, LIGHT_PROTO_V1, }, message::{decode_msg, decode_rlp_and_check_deprecation, Message, MsgId}, network::{ @@ -121,7 +121,7 @@ impl Provider { pub fn register( self: &Arc, network: Arc, - ) -> Result<(), String> { + ) -> std::result::Result<(), String> { network .register_protocol( self.clone(), @@ -136,27 +136,27 @@ impl Provider { #[inline] fn get_existing_peer_state( &self, peer: &NodeId, - ) -> Result>, Error> { + ) -> Result>> { match self.peers.get(peer) { Some(state) => Ok(state), None => { // NOTE: this should not happen as we register // all peers in `on_peer_connected` - error!("Received message from unknown peer={:?}", peer); - bail!(ErrorKind::InternalError) + bail!(ErrorKind::InternalError(format!( + "Received message from unknown peer={:?}", + peer + ))) } } } #[inline] - fn peer_version(&self, peer: &NodeId) -> Result { + fn peer_version(&self, peer: &NodeId) -> Result { Ok(self.get_existing_peer_state(peer)?.read().protocol_version) } #[inline] - fn validate_peer_state( - &self, peer: &NodeId, msg_id: MsgId, - ) -> Result<(), Error> { + fn validate_peer_state(&self, peer: &NodeId, msg_id: MsgId) -> Result<()> { let state = self.get_existing_peer_state(&peer)?; if msg_id != msgid::STATUS_PING_DEPRECATED @@ -164,7 +164,13 @@ impl Provider { && !state.read().handshake_completed { warn!("Received msg={:?} from handshaking peer={:?}", msg_id, peer); - bail!(ErrorKind::UnexpectedMessage); + bail!(ErrorKind::UnexpectedMessage { + expected: vec![ + msgid::STATUS_PING_DEPRECATED, + msgid::STATUS_PING_V2 + ], + received: msg_id, + }); } Ok(()) @@ -173,7 +179,7 @@ impl Provider { #[rustfmt::skip] fn dispatch_message( &self, io: &dyn NetworkContext, peer: &NodeId, msg_id: MsgId, rlp: Rlp, - ) -> Result<(), Error> { + ) -> Result<()> { trace!("Dispatching message: peer={:?}, msg_id={:?}", peer, msg_id); self.validate_peer_state(peer, msg_id)?; let min_supported_ver = self.minimum_supported_version(); @@ -194,7 +200,7 @@ impl Provider { msgid::GET_BLOCK_TXS => self.on_get_block_txs(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?), msgid::GET_TX_INFOS => self.on_get_tx_infos(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?), msgid::GET_STORAGE_ROOTS => self.on_get_storage_roots(io, peer, decode_rlp_and_check_deprecation(&rlp, min_supported_ver, protocol)?), - _ => bail!(ErrorKind::UnknownMessage), + _ => bail!(ErrorKind::UnknownMessage{id: msg_id}), } } @@ -218,20 +224,18 @@ impl Provider { } #[inline] - fn tx_info_by_hash(&self, hash: H256) -> Result { + fn tx_info_by_hash(&self, hash: H256) -> Result { let (tx, tx_index, receipt) = match self.consensus.get_transaction_info_by_hash(&hash) { None => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to get tx info for {:?}", - hash - ))); + bail!(ErrorKind::UnableToProduceTxInfo { + reason: format!("Unable to get tx info for {:?}", hash) + }); } Some((_, _, None)) => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to get receipt for {:?}", - hash - ))); + bail!(ErrorKind::UnableToProduceTxInfo { + reason: format!("Unable to get receipt for {:?}", hash) + }); } Some((tx, tx_index, Some((receipt, _)))) => { assert_eq!(tx.hash(), hash); // sanity check @@ -240,17 +244,7 @@ impl Provider { }; let block_hash = tx_index.block_hash; - - let block = match self.ledger.block(block_hash) { - Ok(b) => b, - Err(e) => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to retrieve block {:?}: {}", - block_hash, e - ))); - } - }; - + let block = self.ledger.block(block_hash)?; let tx_index_in_block = tx_index.index; let num_txs_in_block = block.transactions.len(); @@ -260,20 +254,24 @@ impl Provider { let epoch = match self.consensus.get_block_epoch_number(&block_hash) { Some(epoch) => epoch, None => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to get epoch number for block {:?}", - block_hash - ))); + bail!(ErrorKind::UnableToProduceTxInfo { + reason: format!( + "Unable to get epoch number for block {:?}", + block_hash + ) + }); } }; let epoch_hashes = match self.ledger.block_hashes_in(epoch) { Ok(hs) => hs, Err(e) => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to find epoch hashes for {}: {}", - epoch, e - ))); + bail!(ErrorKind::UnableToProduceTxInfo { + reason: format!( + "Unable to find epoch hashes for {}: {}", + epoch, e + ) + }); } }; @@ -283,22 +281,22 @@ impl Provider { match epoch_hashes.iter().position(|h| *h == block_hash) { Some(id) => id, None => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to find {:?} in epoch {}", - block_hash, epoch - ))); + bail!(ErrorKind::UnableToProduceTxInfo { + reason: format!( + "Unable to find {:?} in epoch {}", + block_hash, epoch + ) + }); } }; - let epoch_receipts = match self.ledger.receipts_of(epoch) { - Ok(rs) => rs.iter().cloned().map(Arc::new).collect::>(), - Err(e) => { - bail!(ErrorKind::UnableToProduceTxInfo(format!( - "Unable to retrieve receipts for {}: {}", - epoch, e - ))); - } - }; + let epoch_receipts = self + .ledger + .receipts_of(epoch)? + .iter() + .cloned() + .map(Arc::new) + .collect::>(); let epoch_receipt_proof = compute_epoch_receipt_proof( &epoch_receipts, @@ -348,7 +346,7 @@ impl Provider { fn send_status( &self, io: &dyn NetworkContext, peer: &NodeId, - ) -> Result<(), Error> { + ) -> Result<()> { let best_info = self.consensus.best_info(); let genesis_hash = self.graph.data_man.true_genesis.hash(); @@ -378,44 +376,40 @@ impl Provider { } #[inline] - fn validate_peer_type(&self, node_type: &NodeType) -> Result<(), Error> { + fn validate_peer_type(&self, node_type: NodeType) -> Result<()> { match node_type { NodeType::Light => Ok(()), - _ => bail!(ErrorKind::UnexpectedPeerType), + _ => bail!(ErrorKind::UnexpectedPeerType { node_type }), } } #[inline] - fn validate_genesis_hash(&self, genesis: H256) -> Result<(), Error> { - match self.graph.data_man.true_genesis.hash() { - h if h == genesis => Ok(()), - h => { - debug!( - "Genesis mismatch (ours: {:?}, theirs: {:?})", - h, genesis - ); - bail!(ErrorKind::GenesisMismatch) - } + fn validate_genesis_hash(&self, genesis: H256) -> Result<()> { + let ours = self.graph.data_man.true_genesis.hash(); + let theirs = genesis; + + if ours != theirs { + bail!(ErrorKind::GenesisMismatch { ours, theirs }); } + + Ok(()) } fn on_status_v2( &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPingV2, - ) -> Result<(), Error> { + ) -> Result<()> { info!("on_status peer={:?} status={:?}", peer, status); self.throttle(peer, &status)?; - self.validate_peer_type(&status.node_type)?; + self.validate_peer_type(status.node_type)?; self.validate_genesis_hash(status.genesis_hash)?; validate_chain_id( &self.consensus.get_config().chain_id, &status.chain_id, )?; - if let Err(e) = self.send_status(io, peer) { - warn!("Failed to send status to peer={:?}: {:?}", peer, e); - bail!(ErrorKind::SendStatusFailed); - }; + self.send_status(io, peer) + .chain_err(|| ErrorKind::SendStatusFailed { peer: *peer })?; let state = self.get_existing_peer_state(peer)?; let mut state = state.write(); @@ -426,7 +420,7 @@ impl Provider { fn on_status_deprecated( &self, io: &dyn NetworkContext, peer: &NodeId, status: StatusPingDeprecatedV1, - ) -> Result<(), Error> + ) -> Result<()> { self.on_status_v2( io, @@ -441,7 +435,7 @@ impl Provider { fn on_get_state_roots( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateRoots, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_state_roots req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -450,7 +444,7 @@ impl Provider { .epochs .into_iter() .take(MAX_ITEMS_TO_SEND) - .map::, _>(|epoch| { + .map::, _>(|epoch| { let state_root = self.ledger.state_root_of(epoch)?.state_root; Ok(StateRootWithEpoch { epoch, state_root }) }); @@ -470,7 +464,7 @@ impl Provider { Ok(()) } - fn state_entry(&self, key: StateKey) -> Result { + fn state_entry(&self, key: StateKey) -> Result { let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64; // state root in current snapshot period @@ -501,7 +495,7 @@ impl Provider { fn on_get_state_entries( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStateEntries, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_state_entries req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -533,7 +527,7 @@ impl Provider { fn on_get_block_hashes_by_epoch( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockHashesByEpoch, - ) -> Result<(), Error> + ) -> Result<()> { debug!("on_get_block_hashes_by_epoch req={:?}", req); self.throttle(peer, &req)?; @@ -565,7 +559,7 @@ impl Provider { fn on_get_block_headers( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockHeaders, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_block_headers req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -574,7 +568,7 @@ impl Provider { .hashes .iter() .take(MAX_HEADERS_TO_SEND) - .map::, _>(|h| { + .map::, _>(|h| { self.graph .data_man .block_header_by_hash(&h) @@ -605,7 +599,7 @@ impl Provider { fn on_send_raw_tx( &self, _io: &dyn NetworkContext, peer: &NodeId, req: SendRawTx, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_send_raw_tx req={:?}", req); self.throttle(peer, &req)?; let tx: TransactionWithSignature = rlp::decode(&req.raw)?; @@ -629,18 +623,17 @@ impl Provider { } _ => { // NOTE: this should not happen - error!( + bail!(ErrorKind::InternalError(format!( "insert_new_transactions failed: {:?}, {:?}", passed, failed - ); - bail!(ErrorKind::InternalError) + ))) } } } fn on_get_receipts( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetReceipts, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_receipts req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -671,7 +664,7 @@ impl Provider { fn on_get_txs( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxs, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_txs req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -680,7 +673,7 @@ impl Provider { .hashes .into_iter() .take(MAX_TXS_TO_SEND) - .map::, _>(|h| { + .map::, _>(|h| { self.tx_by_hash(h).ok_or_else(|| { ErrorKind::Msg(format!("Tx {:?} not found", h)).into() }) @@ -701,7 +694,7 @@ impl Provider { fn on_get_witness_info( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetWitnessInfo, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_witness_info req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -727,7 +720,7 @@ impl Provider { fn on_get_blooms( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlooms, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_blooms req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -753,7 +746,7 @@ impl Provider { fn on_get_block_txs( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetBlockTxs, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_block_txs req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -762,7 +755,7 @@ impl Provider { .hashes .into_iter() .take(MAX_ITEMS_TO_SEND) - .map::, _>(|h| { + .map::, _>(|h| { let block = self.ledger.block(h)?; let block_txs = block @@ -795,7 +788,7 @@ impl Provider { fn on_get_tx_infos( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetTxInfos, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_tx_infos req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -819,9 +812,7 @@ impl Provider { Ok(()) } - fn storage_root( - &self, key: StorageRootKey, - ) -> Result { + fn storage_root(&self, key: StorageRootKey) -> Result { let snapshot_epoch_count = self.ledger.snapshot_epoch_count() as u64; // state root in current snapshot period @@ -852,7 +843,7 @@ impl Provider { fn on_get_storage_roots( &self, io: &dyn NetworkContext, peer: &NodeId, req: GetStorageRoots, - ) -> Result<(), Error> { + ) -> Result<()> { debug!("on_get_storage_roots req={:?}", req); self.throttle(peer, &req)?; let request_id = req.request_id; @@ -882,7 +873,7 @@ impl Provider { fn broadcast( &self, io: &dyn NetworkContext, mut peers: Vec, msg: &dyn Message, - ) -> Result<(), Error> + ) -> Result<()> { debug!("broadcast peers={:?}", peers); @@ -906,9 +897,7 @@ impl Provider { Ok(()) } - pub fn relay_hashes( - self: &Arc, hashes: Vec, - ) -> Result<(), Error> { + pub fn relay_hashes(self: &Arc, hashes: Vec) -> Result<()> { debug!("relay_hashes hashes={:?}", hashes); if hashes.is_empty() { @@ -919,8 +908,9 @@ impl Provider { let network = match self.network.upgrade() { Some(network) => network, None => { - error!("Network unavailable, not relaying hashes"); - bail!(ErrorKind::InternalError); + bail!(ErrorKind::InternalError( + "Network unavailable, not relaying hashes".to_owned() + )); } }; @@ -937,9 +927,7 @@ impl Provider { Ok(()) } - fn throttle( - &self, peer: &NodeId, msg: &T, - ) -> Result<(), Error> { + fn throttle(&self, peer: &NodeId, msg: &T) -> Result<()> { let peer = self.get_existing_peer_state(peer)?; let bucket_name = msg.msg_name().to_string(); @@ -990,7 +978,7 @@ impl NetworkProtocolHandler for Provider { io, peer, msgid::INVALID, - ErrorKind::InvalidMessageFormat.into(), + &ErrorKind::InvalidMessageFormat.into(), ) } }; @@ -998,7 +986,7 @@ impl NetworkProtocolHandler for Provider { debug!("on_message: peer={:?}, msgid={:?}", peer, msg_id); if let Err(e) = self.dispatch_message(io, peer, msg_id.into(), rlp) { - handle_error(io, peer, msg_id.into(), e); + handle_error(io, peer, msg_id.into(), &e); } } diff --git a/core/src/light_protocol/query_service.rs b/core/src/light_protocol/query_service.rs index 7b95385f17..db30e9b189 100644 --- a/core/src/light_protocol/query_service.rs +++ b/core/src/light_protocol/query_service.rs @@ -48,8 +48,10 @@ type TxInfo = ( // Because of this, our RPC runtime cannot handle tokio@0.2 timing primitives. // As a temporary workaround, we use the old `tokio_timer::Timeout` instead. async fn with_timeout( - d: Duration, msg: String, fut: impl Future + Send + Sync, -) -> Result { + d: Duration, msg: String, + fut: impl Future> + Send + Sync, +) -> Result +{ // convert `fut` into futures@0.1 let fut = fut.unit_error().boxed().compat(); @@ -63,7 +65,7 @@ async fn with_timeout( // set error message with_timeout .await - .map_err(|_| ErrorKind::Timeout(msg).into()) + .map_err(|_| Error::from(ErrorKind::Timeout(msg)))? } pub struct QueryService { diff --git a/core/src/sync/node_type.rs b/core/src/sync/node_type.rs index 61fa213b32..871f007d44 100644 --- a/core/src/sync/node_type.rs +++ b/core/src/sync/node_type.rs @@ -5,7 +5,7 @@ use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf; use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; -#[derive(Clone, Debug, PartialEq, DeriveMallocSizeOf)] +#[derive(Clone, Copy, Debug, PartialEq, DeriveMallocSizeOf)] #[repr(u8)] pub enum NodeType { Archive,