diff --git a/.gitignore b/.gitignore index c17e43995f8..3aa0264ee2f 100644 --- a/.gitignore +++ b/.gitignore @@ -49,3 +49,4 @@ result /test/ /iroha-java/ /lcov.info +**/test-smartcontracts/ diff --git a/README.md b/README.md index 457197f1d55..d89ee642b3e 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,13 @@ bash ./scripts/test_env.sh setup bash ./scripts/tests/register_mint_quantity.sh bash ./scripts/test_env.sh cleanup ``` +To generate WASM files for smart contracts, use the provided script `generate_wasm.sh`. If you are in the root directory of iroha run the following command: + +```bash +bash ./scripts/generate_wasm.sh [path/to/smartcontracts] +``` + +The generated WASM files will be saved in a generated directory `test-smartcontracts`, relative to your current working directory. The default path for smart contracts in this project is `client/tests/integration/smartcontracts`. diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 18ceae7caac..6de12024e6a 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -128,8 +128,11 @@ impl NetworkRelay { } match msg { - SumeragiPacket(data) => { - self.sumeragi.incoming_message(*data); + SumeragiBlock(data) => { + self.sumeragi.incoming_block_message(*data); + } + SumeragiControlFlow(data) => { + self.sumeragi.incoming_control_flow_message(*data); } BlockSync(data) => self.block_sync.message(*data).await, TransactionGossiper(data) => self.gossiper.gossip(*data).await, diff --git a/client/tests/integration/connected_peers.rs b/client/tests/integration/connected_peers.rs index 20221dd0d2a..6cc1df7cf26 100644 --- a/client/tests/integration/connected_peers.rs +++ b/client/tests/integration/connected_peers.rs @@ -95,6 +95,8 @@ fn connected_peers_with_f(faults: u64, start_port: Option) -> Result<()> { thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect let (removed_peer, removed_peer_client) = peer_clients.remove(removed_peer_idx); + thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to disconnect + check_status(&peer_clients, 2); let status = removed_peer_client.get_status()?; // Peer might have been disconnected before getting the block diff --git a/client_cli/pytests/src/client_cli/client_cli.py b/client_cli/pytests/src/client_cli/client_cli.py index f715f31b246..cdd33b58c59 100644 --- a/client_cli/pytests/src/client_cli/client_cli.py +++ b/client_cli/pytests/src/client_cli/client_cli.py @@ -166,8 +166,9 @@ def asset(self, asset_definition=None, account=None, value_of_value_type=None): """ self.command.insert(3, 'asset') if asset_definition and account and value_of_value_type: - self.command.append('--account=' + account.name + '@' + asset_definition.domain) - self.command.append('--asset=' + repr(asset_definition)) + self.command.append( + '--asset-id=' + + asset_definition.name + '#' + account.domain + '#' + account.name + '@' + asset_definition.domain) self.command.append( '--' + asset_definition.value_type.lower() + '=' + value_of_value_type) self.execute() @@ -190,9 +191,9 @@ def transfer(self, asset, source_account, target_account, quantity: str): """ self.command.append('asset') self.command.append('transfer') - self.command.append('--from=' + repr(source_account)) self.command.append('--to=' + repr(target_account)) - self.command.append('--asset-id=' + repr(asset)) + self.command.append( + '--asset-id=' + asset.name + '#' + source_account.domain + '#' + source_account.name + '@' + asset.domain) self.command.append('--quantity=' + quantity) self.execute() return self @@ -211,8 +212,8 @@ def burn(self, account, asset, quantity: str): """ self.command.append('asset') self.command.append('burn') - self.command.append('--account=' + repr(account)) - self.command.append('--asset=' + repr(asset)) + self.command.append( + '--asset-id=' + asset.name + '#' + account.domain + '#' + account.name + '@' + asset.domain) self.command.append('--quantity=' + quantity) self.execute() return self @@ -230,7 +231,7 @@ def definition(self, asset: str, domain: str, value_type: str): :return: The current ClientCli object. :rtype: ClientCli """ - self.command.append('--id=' + asset + '#' + domain) + self.command.append('--definition-id=' + asset + '#' + domain) self.command.append('--value-type=' + value_type) self.execute() return self diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs index 2084d35d562..0238fdc32c6 100644 --- a/client_cli/src/main.rs +++ b/client_cli/src/main.rs @@ -795,9 +795,9 @@ mod asset { /// Register subcommand of asset #[derive(clap::Args, Debug)] pub struct Register { - /// Asset id for registering (in form of `name#domain_name') - #[arg(short, long)] - pub id: AssetDefinitionId, + /// Asset definition id for registering (in form of `asset#domain_name`) + #[arg(long)] + pub definition_id: AssetDefinitionId, /// Mintability of asset #[arg(short, long)] pub unmintable: bool, @@ -811,16 +811,16 @@ mod asset { impl RunArgs for Register { fn run(self, context: &mut dyn RunContext) -> Result<()> { let Self { - id, + definition_id, value_type, unmintable, metadata, } = self; let mut asset_definition = match value_type { - AssetValueType::Quantity => AssetDefinition::quantity(id), - AssetValueType::BigQuantity => AssetDefinition::big_quantity(id), - AssetValueType::Fixed => AssetDefinition::fixed(id), - AssetValueType::Store => AssetDefinition::store(id), + AssetValueType::Quantity => AssetDefinition::quantity(definition_id), + AssetValueType::BigQuantity => AssetDefinition::big_quantity(definition_id), + AssetValueType::Fixed => AssetDefinition::fixed(definition_id), + AssetValueType::Store => AssetDefinition::store(definition_id), }; if unmintable { asset_definition = asset_definition.mintable_once(); @@ -835,12 +835,9 @@ mod asset { /// Command for minting asset in existing Iroha account #[derive(clap::Args, Debug)] pub struct Mint { - /// Account id where asset is stored (in form of `name@domain_name') - #[arg(long)] - pub account: AccountId, - /// Asset id from which to mint (in form of `name#domain_name') + /// Asset id for the asset (in form of `asset##account@domain_name`) #[arg(long)] - pub asset: AssetDefinitionId, + pub asset_id: AssetId, /// Quantity to mint #[arg(short, long)] pub quantity: u32, @@ -851,15 +848,12 @@ mod asset { impl RunArgs for Mint { fn run(self, context: &mut dyn RunContext) -> Result<()> { let Self { - account, - asset, + asset_id, quantity, metadata, } = self; - let mint_asset = iroha_client::data_model::isi::Mint::asset_quantity( - quantity, - AssetId::new(asset, account), - ); + let mint_asset = + iroha_client::data_model::isi::Mint::asset_quantity(quantity, asset_id); submit([mint_asset], metadata.load()?, context) .wrap_err("Failed to mint asset of type `NumericValue::U32`") } @@ -868,12 +862,9 @@ mod asset { /// Command for minting asset in existing Iroha account #[derive(clap::Args, Debug)] pub struct Burn { - /// Account id where asset is stored (in form of `name@domain_name') + /// Asset id for the asset (in form of `asset##account@domain_name`) #[arg(long)] - pub account: AccountId, - /// Asset id from which to mint (in form of `name#domain_name') - #[arg(long)] - pub asset: AssetDefinitionId, + pub asset_id: AssetId, /// Quantity to mint #[arg(short, long)] pub quantity: u32, @@ -884,15 +875,12 @@ mod asset { impl RunArgs for Burn { fn run(self, context: &mut dyn RunContext) -> Result<()> { let Self { - account, - asset, + asset_id, quantity, metadata, } = self; - let burn_asset = iroha_client::data_model::isi::Burn::asset_quantity( - quantity, - AssetId::new(asset, account), - ); + let burn_asset = + iroha_client::data_model::isi::Burn::asset_quantity(quantity, asset_id); submit([burn_asset], metadata.load()?, context) .wrap_err("Failed to burn asset of type `NumericValue::U32`") } @@ -901,15 +889,12 @@ mod asset { /// Transfer asset between accounts #[derive(clap::Args, Debug)] pub struct Transfer { - /// Account from which to transfer (in form `name@domain_name') - #[arg(short, long)] - pub from: AccountId, - /// Account to which to transfer (in form `name@domain_name') - #[arg(short, long)] + /// Account to which to transfer (in form `name@domain_name`) + #[arg(long)] pub to: AccountId, - /// Asset id to transfer (in form like `name#domain_name') - #[arg(short, long)] - pub asset_id: AssetDefinitionId, + /// Asset id to transfer (in form like `asset##account@domain_name`) + #[arg(long)] + pub asset_id: AssetId, /// Quantity of asset as number #[arg(short, long)] pub quantity: u32, @@ -920,17 +905,13 @@ mod asset { impl RunArgs for Transfer { fn run(self, context: &mut dyn RunContext) -> Result<()> { let Self { - from, to, asset_id, quantity, metadata, } = self; - let transfer_asset = iroha_client::data_model::isi::Transfer::asset_quantity( - AssetId::new(asset_id, from), - quantity, - to, - ); + let transfer_asset = + iroha_client::data_model::isi::Transfer::asset_quantity(asset_id, quantity, to); submit([transfer_asset], metadata.load()?, context).wrap_err("Failed to transfer asset") } } @@ -938,19 +919,15 @@ mod asset { /// Get info of asset #[derive(clap::Args, Debug)] pub struct Get { - /// Account where asset is stored (in form of `name@domain_name') - #[arg(long)] - pub account: AccountId, - /// Asset name to lookup (in form of `name#domain_name') + /// Asset id for the asset (in form of `asset##account@domain_name`) #[arg(long)] - pub asset: AssetDefinitionId, + pub asset_id: AssetId, } impl RunArgs for Get { fn run(self, context: &mut dyn RunContext) -> Result<()> { - let Self { account, asset } = self; + let Self { asset_id } = self; let iroha_client = Client::new(context.configuration())?; - let asset_id = AssetId::new(asset, account); let asset = iroha_client .request(asset::by_id(asset_id)) .wrap_err("Failed to get asset.")?; @@ -989,7 +966,7 @@ mod asset { #[derive(clap::Args, Debug)] pub struct SetKeyValue { - /// AssetId for the Store asset (in form of `asset##account@domain_name') + /// Asset id for the Store asset (in form of `asset##account@domain_name`) #[clap(long)] pub asset_id: AssetId, /// The key for the store value @@ -1021,7 +998,7 @@ mod asset { } #[derive(clap::Args, Debug)] pub struct RemoveKeyValue { - /// AssetId for the Store asset (in form of `asset##account@domain_name') + /// Asset id for the Store asset (in form of `asset##account@domain_name`) #[clap(long)] pub asset_id: AssetId, /// The key for the store value @@ -1040,7 +1017,7 @@ mod asset { #[derive(clap::Args, Debug)] pub struct GetKeyValue { - /// AssetId for the Store asset (in form of `asset##account@domain_name') + /// Asset id for the Store asset (in form of `asset##account@domain_name`) #[clap(long)] pub asset_id: AssetId, /// The key for the store value diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs index d2d150d102e..2ff4ffe05ca 100644 --- a/core/src/block_sync.rs +++ b/core/src/block_sync.rs @@ -129,7 +129,6 @@ impl BlockSynchronizer { pub mod message { //! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer). use super::*; - use crate::sumeragi::view_change::ProofChain; /// Get blocks after some block #[derive(Debug, Clone, Decode, Encode)] @@ -234,12 +233,11 @@ pub mod message { } } Message::ShareBlocks(ShareBlocks { blocks, .. }) => { - use crate::sumeragi::message::{Message, MessagePacket}; + use crate::sumeragi::message::BlockMessage; for block in blocks.clone() { - block_sync.sumeragi.incoming_message(MessagePacket::new( - ProofChain::default(), - Some(Message::BlockSyncUpdate(block.into())), - )); + block_sync + .sumeragi + .incoming_block_message(BlockMessage::BlockSyncUpdate(block.into())); } } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 4b2850ea514..50b47920da2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -25,8 +25,9 @@ use parity_scale_codec::{Decode, Encode}; use tokio::sync::broadcast; use crate::{ - block_sync::message::Message as BlockSyncMessage, prelude::*, - sumeragi::message::MessagePacket as SumeragiPacket, + block_sync::message::Message as BlockSyncMessage, + prelude::*, + sumeragi::message::{BlockMessage, ControlFlowMessage}, }; /// The interval at which sumeragi checks if there are tx in the `queue`. @@ -59,8 +60,10 @@ pub type EventsSender = broadcast::Sender; /// The network message #[derive(Clone, Debug, Encode, Decode)] pub enum NetworkMessage { - /// Blockchain message - SumeragiPacket(Box), + /// Blockchain concensus data message + SumeragiBlock(Box), + /// Blockchain concensus control flow message + SumeragiControlFlow(Box), /// Block sync message BlockSync(Box), /// Transaction gossiper message diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 97046f1d9ab..1fb04aa9e7b 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -43,7 +43,7 @@ pub struct Sumeragi { /// Receiver channel, for control flow messages. pub control_message_receiver: mpsc::Receiver, /// Receiver channel. - pub message_receiver: mpsc::Receiver, + pub message_receiver: mpsc::Receiver, /// Only used in testing. Causes the genesis peer to withhold blocks when it /// is the proxy tail. pub debug_force_soft_fork: bool, @@ -83,12 +83,13 @@ impl Sumeragi { /// # Errors /// Fails if network sending fails #[instrument(skip(self, packet))] - fn post_packet_to(&self, packet: MessagePacket, peer: &PeerId) { + fn post_packet_to(&self, packet: BlockMessage, peer: &PeerId) { if peer == &self.peer_id { return; } + let post = iroha_p2p::Post { - data: NetworkMessage::SumeragiPacket(Box::new(packet)), + data: NetworkMessage::SumeragiBlock(Box::new(packet)), peer_id: peer.clone(), }; self.network.post(post); @@ -97,7 +98,7 @@ impl Sumeragi { #[allow(clippy::needless_pass_by_value, single_use_lifetimes)] // TODO: uncomment when anonymous lifetimes are stable fn broadcast_packet_to<'peer_id>( &self, - msg: MessagePacket, + msg: BlockMessage, ids: impl IntoIterator + Send, ) { for peer_id in ids { @@ -105,9 +106,16 @@ impl Sumeragi { } } - fn broadcast_packet(&self, msg: MessagePacket) { + fn broadcast_packet(&self, msg: BlockMessage) { + let broadcast = iroha_p2p::Broadcast { + data: NetworkMessage::SumeragiBlock(Box::new(msg)), + }; + self.network.broadcast(broadcast); + } + + fn broadcast_control_flow_packet(&self, msg: ControlFlowMessage) { let broadcast = iroha_p2p::Broadcast { - data: NetworkMessage::SumeragiPacket(Box::new(msg)), + data: NetworkMessage::SumeragiControlFlow(Box::new(msg)), }; self.network.broadcast(broadcast); } @@ -140,48 +148,78 @@ impl Sumeragi { fn receive_network_packet( &self, view_change_proof_chain: &mut ProofChain, - control_message_in_a_row_counter: &mut usize, - ) -> Option { + ) -> (Option, bool) { const MAX_CONTROL_MSG_IN_A_ROW: usize = 25; - if *control_message_in_a_row_counter < MAX_CONTROL_MSG_IN_A_ROW { - *control_message_in_a_row_counter += 1; - self.control_message_receiver + let mut block_msg = None; + + let mut should_sleep = true; + for _ in 0..MAX_CONTROL_MSG_IN_A_ROW { + let maybe_block = block_msg.take().or_else(|| { + self.message_receiver + .try_recv() + .map_err(|recv_error| { + assert!( + recv_error != mpsc::TryRecvError::Disconnected, + "Sumeragi message pump disconnected. This is not a recoverable error." + ) + }) + .ok() + }); + + if let Ok(msg) = self.control_message_receiver .try_recv() .map_err(|recv_error| { assert!( recv_error != mpsc::TryRecvError::Disconnected, "Sumeragi control message pump disconnected. This is not a recoverable error." ) - }) - .ok() - .map(std::convert::Into::into) - } else { - None - }.or_else(|| { - *control_message_in_a_row_counter = 0; - self - .message_receiver - .try_recv() - .map_err(|recv_error| { - assert!( - recv_error != mpsc::TryRecvError::Disconnected, - "Sumeragi message pump disconnected. This is not a recoverable error." - ) - }) - .ok() - }) - .and_then(|packet : MessagePacket| { - if let Err(error) = view_change_proof_chain.merge( - packet.view_change_proofs, - &self.current_topology.ordered_peers, - self.current_topology.max_faults(), - self.wsv.latest_block_hash(), - ) { - trace!(%error, "Failed to add proofs into view change proof chain") + }) { + should_sleep = false; + if let Err(error) = view_change_proof_chain.merge( + msg.view_change_proofs, + &self.current_topology.ordered_peers, + self.current_topology.max_faults(), + self.wsv.latest_block_hash(), + ) { + trace!(%error, "Failed to add proofs into view change proof chain") + } + + let current_view_change_index = view_change_proof_chain.verify_with_state( + &self.current_topology.ordered_peers, + self.current_topology.max_faults(), + self.wsv.latest_block_hash(), + ) as u64; + + let mut should_prune = false; + + if let Some(msg) = block_msg.as_ref() { + let vc_index : Option = match msg { + BlockMessage::BlockCreated(bc) => Some(bc.block.payload().header.view_change_index), + BlockMessage::BlockSigned(_) => None, // Signed and Committed contain no block. + BlockMessage::BlockCommitted(_) => None, + BlockMessage::BlockSyncUpdate(_) => None, // Block sync updates are exempt from early pruning + }; + if let Some(vc_index) = vc_index { + if vc_index < current_view_change_index { + should_prune = true; + } + } + } + + block_msg = if should_prune { + None + } else { + maybe_block + }; + } else { + block_msg = maybe_block; + break; } - packet.message - }) + } + + should_sleep &= block_msg.is_none(); + (block_msg, should_sleep) } fn init_listen_for_genesis( @@ -198,42 +236,39 @@ impl Sumeragi { })?; match self.message_receiver.try_recv() { - Ok(packet) => { - if let Some(message) = packet.message { - let mut new_wsv = self.wsv.clone(); - - let block = match message { - Message::BlockCreated(BlockCreated { block }) - | Message::BlockSyncUpdate(BlockSyncUpdate { block }) => block, - msg => { - trace!(?msg, "Not handling the message, waiting for genesis..."); - continue; - } - }; + Ok(message) => { + let mut new_wsv = self.wsv.clone(); + + let block = match message { + BlockMessage::BlockCreated(BlockCreated { block }) + | BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => block, + msg => { + trace!(?msg, "Not handling the message, waiting for genesis..."); + continue; + } + }; - let block = match ValidBlock::validate( - block, - &self.current_topology, - &self.chain_id, - &mut new_wsv, - ) - .and_then(|block| { - block - .commit(&self.current_topology) - .map_err(|(block, error)| (block.into(), error)) - }) { - Ok(block) => block, - Err((_, error)) => { - error!(?error, "Received invalid genesis block"); - continue; - } - }; + let block = match ValidBlock::validate( + block, + &self.current_topology, + &self.chain_id, + &mut new_wsv, + ) + .and_then(|block| { + block + .commit(&self.current_topology) + .map_err(|(block, error)| (block.into(), error)) + }) { + Ok(block) => block, + Err((_, error)) => { + error!(?error, "Received invalid genesis block"); + continue; + } + }; - new_wsv.world_mut().trusted_peers_ids = - block.payload().commit_topology.clone(); - self.commit_block(block, new_wsv); - return Err(EarlyReturn::GenesisBlockReceivedAndCommitted); - } + new_wsv.world_mut().trusted_peers_ids = block.payload().commit_topology.clone(); + self.commit_block(block, new_wsv); + return Err(EarlyReturn::GenesisBlockReceivedAndCommitted); } Err(mpsc::TryRecvError::Disconnected) => return Err(EarlyReturn::Disconnected), _ => (), @@ -260,10 +295,7 @@ impl Sumeragi { .sign(self.key_pair.clone()) .expect("Genesis signing failed"); - let genesis_msg = MessagePacket::new( - ProofChain::default(), - Some(BlockCreated::from(genesis.clone()).into()), - ); + let genesis_msg = BlockCreated::from(genesis.clone()).into(); let genesis = genesis .commit(&self.current_topology) @@ -398,29 +430,6 @@ impl Sumeragi { Some(VotingBlock::new(signed_block, new_wsv)) } - fn suggest_view_change( - &self, - view_change_proof_chain: &mut ProofChain, - current_view_change_index: u64, - ) { - let suspect_proof = - ProofBuilder::new(self.wsv.latest_block_hash(), current_view_change_index) - .sign(self.key_pair.clone()) - .expect("Proof signing failed"); - - view_change_proof_chain - .insert_proof( - &self.current_topology.ordered_peers, - self.current_topology.max_faults(), - self.wsv.latest_block_hash(), - suspect_proof, - ) - .unwrap_or_else(|err| error!("{err}")); - - let msg = MessagePacket::new(view_change_proof_chain.clone(), None); - self.broadcast_packet(msg); - } - fn prune_view_change_proofs_and_calculate_current_index( &self, view_change_proof_chain: &mut ProofChain, @@ -436,10 +445,9 @@ impl Sumeragi { #[allow(clippy::too_many_lines)] fn handle_message( &mut self, - message: Message, + message: BlockMessage, voting_block: &mut Option, current_view_change_index: u64, - view_change_proof_chain: &mut ProofChain, voting_signatures: &mut Vec>, ) { let current_topology = &self.current_topology; @@ -448,7 +456,7 @@ impl Sumeragi { #[allow(clippy::suspicious_operation_groupings)] match (message, role) { - (Message::BlockSyncUpdate(BlockSyncUpdate { block }), _) => { + (BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }), _) => { let block_hash = block.hash(); info!(%addr, %role, hash=%block_hash, "Block sync update received"); @@ -501,7 +509,7 @@ impl Sumeragi { } } ( - Message::BlockCommitted(BlockCommitted { hash, signatures }), + BlockMessage::BlockCommitted(BlockCommitted { hash, signatures }), Role::Leader | Role::ValidatingPeer | Role::ProxyTail | Role::ObservingPeer, ) => { let is_consensus_required = current_topology.is_consensus_required().is_some(); @@ -536,7 +544,7 @@ impl Sumeragi { error!(%addr, %role, %hash, "Peer missing voting block") } } - (Message::BlockCreated(block_created), Role::ValidatingPeer) => { + (BlockMessage::BlockCreated(block_created), Role::ValidatingPeer) => { let current_topology = current_topology .is_consensus_required() .expect("Peer has `ValidatingPeer` role, which mean that current topology require consensus"); @@ -544,10 +552,7 @@ impl Sumeragi { if let Some(v_block) = self.vote_for_block(¤t_topology, block_created) { let block_hash = v_block.block.payload().hash(); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockSigned::from(v_block.block.clone()).into()), - ); + let msg = BlockSigned::from(v_block.block.clone()).into(); self.broadcast_packet_to(msg, [current_topology.proxy_tail()]); info!(%addr, %block_hash, "Block validated, signed and forwarded"); @@ -555,7 +560,7 @@ impl Sumeragi { *voting_block = Some(v_block); } } - (Message::BlockCreated(block_created), Role::ObservingPeer) => { + (BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => { let current_topology = current_topology.is_consensus_required().expect( "Peer has `ObservingPeer` role, which mean that current topology require consensus", ); @@ -564,12 +569,10 @@ impl Sumeragi { if current_view_change_index >= 1 { let block_hash = v_block.block.payload().hash(); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockSigned::from(v_block.block.clone()).into()), + self.broadcast_packet_to( + BlockSigned::from(v_block.block.clone()).into(), + [current_topology.proxy_tail()], ); - - self.broadcast_packet_to(msg, [current_topology.proxy_tail()]); info!(%addr, %block_hash, "Block validated, signed and forwarded"); *voting_block = Some(v_block); } else { @@ -577,7 +580,7 @@ impl Sumeragi { } } } - (Message::BlockCreated(block_created), Role::ProxyTail) => { + (BlockMessage::BlockCreated(block_created), Role::ProxyTail) => { if let Some(mut new_block) = self.vote_for_block(current_topology, block_created) { // NOTE: Up until this point it was unknown which block is expected to be received, // therefore all the signatures (of any hash) were collected and will now be pruned @@ -585,7 +588,7 @@ impl Sumeragi { *voting_block = Some(new_block); } } - (Message::BlockSigned(BlockSigned { hash, signatures }), Role::ProxyTail) => { + (BlockMessage::BlockSigned(BlockSigned { hash, signatures }), Role::ProxyTail) => { trace!(block_hash=%hash, "Received block signatures"); let roles: &[Role] = if current_view_change_index >= 1 { @@ -621,7 +624,6 @@ impl Sumeragi { &mut self, voting_block: &mut Option, current_view_change_index: u64, - view_change_proof_chain: &mut ProofChain, round_start_time: &Instant, #[cfg_attr(not(debug_assertions), allow(unused_variables))] is_genesis_peer: bool, ) { @@ -662,10 +664,7 @@ impl Sumeragi { info!(%addr, block_payload_hash=%new_block.payload().hash(), "Block created"); *voting_block = Some(VotingBlock::new(new_block.clone(), new_wsv)); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCreated::from(new_block).into()), - ); + let msg = BlockCreated::from(new_block).into(); if current_view_change_index >= 1 { self.broadcast_packet(msg); } else { @@ -674,12 +673,9 @@ impl Sumeragi { } else { match new_block.commit(current_topology) { Ok(committed_block) => { - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCommitted::from(committed_block.clone()).into()), + self.broadcast_packet( + BlockCommitted::from(committed_block.clone()).into(), ); - - self.broadcast_packet(msg); self.commit_block(committed_block, new_wsv); } Err((_, error)) => error!(%addr, role=%Role::Leader, ?error), @@ -697,10 +693,7 @@ impl Sumeragi { Ok(committed_block) => { info!(voting_block_hash = %committed_block.hash(), "Block reached required number of votes"); - let msg = MessagePacket::new( - view_change_proof_chain.clone(), - Some(BlockCommitted::from(committed_block.clone()).into()), - ); + let msg = BlockCommitted::from(committed_block.clone()).into(); let current_topology = current_topology .is_consensus_required() @@ -858,15 +851,11 @@ pub(crate) fn run( // Instant when the previous view change or round happened. let mut last_view_change_time = Instant::now(); - // Internal variable used to pick receiver channel. Initialize to zero. - let mut control_message_in_a_row_counter = 0; - while !should_terminate(&mut shutdown_receiver) { if should_sleep { let span = span!(Level::TRACE, "main_thread_sleep"); let _enter = span.enter(); std::thread::sleep(std::time::Duration::from_millis(5)); - should_sleep = false; } let span_for_sumeragi_cycle = span!(Level::TRACE, "main_thread_cycle"); let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter(); @@ -912,50 +901,63 @@ pub(crate) fn run( &mut view_change_time, ); + if let Some(message) = { + let (msg, sleep) = sumeragi.receive_network_packet(&mut view_change_proof_chain); + should_sleep = sleep; + msg + } { + sumeragi.handle_message( + message, + &mut voting_block, + current_view_change_index, + &mut voting_signatures, + ); + } + + // State could be changed after handling message so it is necessary to reset state before handling message independent step + let current_view_change_index = sumeragi + .prune_view_change_proofs_and_calculate_current_index(&mut view_change_proof_chain); + + // We broadcast our view change suggestion after having processed the latest from others inside `receive_network_packet` let node_expects_block = !sumeragi.transaction_cache.is_empty(); - if node_expects_block && last_view_change_time.elapsed() > view_change_time { + if (node_expects_block || current_view_change_index > 0) + && last_view_change_time.elapsed() > view_change_time + { let role = sumeragi.current_topology.role(&sumeragi.peer_id); - if let Some(VotingBlock { block, .. }) = voting_block.as_ref() { - // NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader - warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.payload().hash(), "Block not committed in due time, requesting view change..."); - } else { - // NOTE: Suspecting the leader node because it hasn't produced a block - // If the current node has a transaction, the leader should have as well - warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, "No block produced in due time, requesting view change..."); + if node_expects_block { + if let Some(VotingBlock { block, .. }) = voting_block.as_ref() { + // NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader + warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, block=%block.payload().hash(), "Block not committed in due time, requesting view change..."); + } else { + // NOTE: Suspecting the leader node because it hasn't produced a block + // If the current node has a transaction, the leader should have as well + warn!(peer_public_key=%sumeragi.peer_id.public_key, %role, "No block produced in due time, requesting view change..."); + } + + let suspect_proof = + ProofBuilder::new(sumeragi.wsv.latest_block_hash(), current_view_change_index) + .sign(sumeragi.key_pair.clone()) + .expect("Proof signing failed"); + + view_change_proof_chain + .insert_proof( + &sumeragi.current_topology.ordered_peers, + sumeragi.current_topology.max_faults(), + sumeragi.wsv.latest_block_hash(), + suspect_proof, + ) + .unwrap_or_else(|err| error!("{err}")); } - sumeragi.suggest_view_change(&mut view_change_proof_chain, current_view_change_index); + let msg = ControlFlowMessage::new(view_change_proof_chain.clone()); + sumeragi.broadcast_control_flow_packet(msg); // NOTE: View change must be periodically suggested until it is accepted. // Must be initialized to pipeline time but can increase by chosen amount view_change_time += sumeragi.pipeline_time(); } - sumeragi - .receive_network_packet( - &mut view_change_proof_chain, - &mut control_message_in_a_row_counter, - ) - .map_or_else( - || { - should_sleep = true; - }, - |message| { - sumeragi.handle_message( - message, - &mut voting_block, - current_view_change_index, - &mut view_change_proof_chain, - &mut voting_signatures, - ); - }, - ); - - // State could be changed after handling message so it is necessary to reset state before handling message independent step - let current_view_change_index = sumeragi - .prune_view_change_proofs_and_calculate_current_index(&mut view_change_proof_chain); - reset_state( &sumeragi.peer_id, sumeragi.pipeline_time(), @@ -977,7 +979,6 @@ pub(crate) fn run( sumeragi.process_message_independent( &mut voting_block, current_view_change_index, - &mut view_change_proof_chain, &round_start_time, is_genesis_peer, ); diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index 5aa47890cdd..329f9ba135e 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -7,30 +7,10 @@ use parity_scale_codec::{Decode, Encode}; use super::view_change; use crate::block::{CommittedBlock, ValidBlock}; -/// Helper structure, wrapping messages and view change proofs. -#[derive(Debug, Clone, Decode, Encode)] -pub struct MessagePacket { - /// Proof of view change. As part of this message handling, all - /// peers which agree with view change should sign it. - pub view_change_proofs: view_change::ProofChain, - /// Actual Sumeragi message in this packet. - pub message: Option, -} - -impl MessagePacket { - /// Construct [`Self`] - pub fn new(view_change_proofs: view_change::ProofChain, message: Option) -> Self { - Self { - view_change_proofs, - message, - } - } -} - #[allow(clippy::enum_variant_names)] /// Message's variants that are used by peers to communicate in the process of consensus. #[derive(Debug, Clone, Decode, Encode, FromVariant)] -pub enum Message { +pub enum BlockMessage { /// This message is sent by leader to all validating peers, when a new block is created. BlockCreated(BlockCreated), /// This message is sent by validating peers to proxy tail and observing peers when they have signed this block. @@ -42,18 +22,17 @@ pub enum Message { } /// Specialization of `MessagePacket` +#[derive(Debug, Clone, Decode, Encode)] pub struct ControlFlowMessage { /// Proof of view change. As part of this message handling, all /// peers which agree with view change should sign it. pub view_change_proofs: view_change::ProofChain, } -impl From for MessagePacket { - fn from(m: ControlFlowMessage) -> MessagePacket { - MessagePacket { - view_change_proofs: m.view_change_proofs, - message: None, - } +impl ControlFlowMessage { + /// Helper function to construct a `ControlFlowMessage` + pub fn new(view_change_proofs: view_change::ProofChain) -> ControlFlowMessage { + ControlFlowMessage { view_change_proofs } } } diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index c939e89bac1..b5d0a64f360 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -26,10 +26,7 @@ pub mod view_change; use parking_lot::Mutex; -use self::{ - message::{Message, *}, - view_change::ProofChain, -}; +use self::{message::*, view_change::ProofChain}; use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage}; /* @@ -55,7 +52,7 @@ pub struct SumeragiHandle { _thread_handle: Arc, // Should be dropped after `_thread_handle` to prevent sumeargi thread from panicking control_message_sender: mpsc::SyncSender, - message_sender: mpsc::SyncSender, + message_sender: mpsc::SyncSender, } impl SumeragiHandle { @@ -203,20 +200,21 @@ impl SumeragiHandle { &self.metrics } + /// Deposit a sumeragi control flow network message. + pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) { + if let Err(error) = self.control_message_sender.try_send(msg) { + self.metrics.dropped_messages.inc(); + error!( + ?error, + "This peer is faulty. \ + Incoming control messages have to be dropped due to low processing speed." + ); + } + } + /// Deposit a sumeragi network message. - pub fn incoming_message(&self, msg: MessagePacket) { - if msg.message.is_none() { - if let Err(error) = self.control_message_sender.try_send(ControlFlowMessage { - view_change_proofs: msg.view_change_proofs, - }) { - self.metrics.dropped_messages.inc(); - error!( - ?error, - "This peer is faulty. \ - Incoming control messages have to be dropped due to low processing speed." - ); - } - } else if let Err(error) = self.message_sender.try_send(msg) { + pub fn incoming_block_message(&self, msg: BlockMessage) { + if let Err(error) = self.message_sender.try_send(msg) { self.metrics.dropped_messages.inc(); error!( ?error, diff --git a/scripts/generate_wasm.sh b/scripts/generate_wasm.sh new file mode 100755 index 00000000000..e693108933f --- /dev/null +++ b/scripts/generate_wasm.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +# Default source directory +DEFAULT_SOURCE_DIR="client/tests/integration/smartcontracts" + +# If no arguments are provided, use the default source directory +if [ "$#" -eq 0 ]; then + SOURCE_DIR="$DEFAULT_SOURCE_DIR" +else + SOURCE_DIR="$1" +fi + +TARGET_DIR="test-smartcontracts" + +mkdir -p "$TARGET_DIR" + +for folder in "$SOURCE_DIR"/*; do + if [ -d "$folder" ] && [ "$(basename "$folder")" != ".cargo" ]; then + + folder_name=$(basename "$folder") + target_wasm_file_path="${TARGET_DIR}/${folder_name}.wasm" + # Build the smart contracts + cargo run --bin iroha_wasm_builder_cli -- build "$folder" --optimize --outfile "$target_wasm_file_path" + + fi +done + +echo "Smart contracts build complete."