diff --git a/stackslib/src/chainstate/nakamoto/mod.rs b/stackslib/src/chainstate/nakamoto/mod.rs index ca37e30121..1e22076b4d 100644 --- a/stackslib/src/chainstate/nakamoto/mod.rs +++ b/stackslib/src/chainstate/nakamoto/mod.rs @@ -2452,7 +2452,7 @@ impl NakamotoChainState { db_handle: &mut SortitionHandleConn, staging_db_tx: &NakamotoStagingBlocksTx, headers_conn: &Connection, - reward_set: RewardSet, + reward_set: &RewardSet, obtain_method: NakamotoBlockObtainMethod, ) -> Result { test_debug!("Consider Nakamoto block {}", &block.block_id()); @@ -2522,7 +2522,7 @@ impl NakamotoChainState { return Ok(false); }; - let signing_weight = match block.header.verify_signer_signatures(&reward_set) { + let signing_weight = match block.header.verify_signer_signatures(reward_set) { Ok(x) => x, Err(e) => { warn!("Received block, but the signer signatures are invalid"; diff --git a/stackslib/src/net/relay.rs b/stackslib/src/net/relay.rs index cb7d310321..b93171916c 100644 --- a/stackslib/src/net/relay.rs +++ b/stackslib/src/net/relay.rs @@ -1077,7 +1077,7 @@ impl Relayer { sort_handle, &staging_db_tx, headers_conn, - reward_set, + &reward_set, obtained_method, )?; staging_db_tx.commit()?; diff --git a/testnet/stacks-node/src/nakamoto_node.rs b/testnet/stacks-node/src/nakamoto_node.rs index 19af89a3bc..09f8c7285f 100644 --- a/testnet/stacks-node/src/nakamoto_node.rs +++ b/testnet/stacks-node/src/nakamoto_node.rs @@ -44,7 +44,8 @@ use crate::run_loop::RegisteredKey; pub mod miner; pub mod peer; pub mod relayer; -pub mod sign_coordinator; +pub mod signer_coordinator; +pub mod stackerdb_listener; use self::peer::PeerThread; use self::relayer::{RelayerDirective, RelayerThread}; diff --git a/testnet/stacks-node/src/nakamoto_node/miner.rs b/testnet/stacks-node/src/nakamoto_node/miner.rs index b15d0f4c7e..f649046096 100644 --- a/testnet/stacks-node/src/nakamoto_node/miner.rs +++ b/testnet/stacks-node/src/nakamoto_node/miner.rs @@ -41,14 +41,13 @@ use stacks::net::p2p::NetworkHandle; use stacks::net::stackerdb::StackerDBs; use stacks::net::{NakamotoBlocksData, StacksMessageType}; use stacks::util::get_epoch_time_secs; -use stacks::util::secp256k1::MessageSignature; use stacks_common::types::chainstate::{StacksAddress, StacksBlockId}; use stacks_common::types::{PrivateKey, StacksEpochId}; use stacks_common::util::vrf::VRFProof; use super::relayer::RelayerThread; -use super::sign_coordinator::SignCoordinator; use super::{Config, Error as NakamotoNodeError, EventDispatcher, Keychain}; +use crate::nakamoto_node::signer_coordinator::SignerCoordinator; use crate::nakamoto_node::VRF_MOCK_MINER_KEY; use crate::neon_node; use crate::run_loop::nakamoto::Globals; @@ -139,6 +138,8 @@ pub struct BlockMinerThread { burnchain: Burnchain, /// Last block mined last_block_mined: Option, + /// Number of blocks mined since a tenure change/extend + mined_blocks: u64, /// Copy of the node's registered VRF key registered_key: RegisteredKey, /// Burnchain block snapshot which elected this miner @@ -172,6 +173,7 @@ impl BlockMinerThread { keychain: rt.keychain.clone(), burnchain: rt.burnchain.clone(), last_block_mined: None, + mined_blocks: 0, registered_key, burn_election_block, burn_block, @@ -288,172 +290,237 @@ impl BlockMinerThread { let mut stackerdbs = StackerDBs::connect(&self.config.get_stacker_db_file_path(), true)?; let mut last_block_rejected = false; + let reward_set = self.load_signer_set()?; + let Some(miner_privkey) = self.config.miner.mining_key else { + return Err(NakamotoNodeError::MinerConfigurationFailed( + "No 
mining key configured, cannot mine", + )); + }; + let sortdb = SortitionDB::open( + &self.config.get_burn_db_file_path(), + true, + self.burnchain.pox_constants.clone(), + ) + .expect("FATAL: could not open sortition DB"); + let burn_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn()) + .expect("FATAL: failed to query sortition DB for canonical burn chain tip"); + + // Start the signer coordinator + let mut coordinator = SignerCoordinator::new( + self.event_dispatcher.stackerdb_channel.clone(), + self.globals.should_keep_running.clone(), + &reward_set, + &burn_tip, + &self.burnchain, + miner_privkey, + &self.config, + ) + .map_err(|e| { + NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failed to initialize the signing coordinator. Cannot mine! {e:?}" + )) + })?; + // now, actually run this tenure loop { - #[cfg(test)] - if *TEST_MINE_STALL.lock().unwrap() == Some(true) { - // Do an extra check just so we don't log EVERY time. - warn!("Mining is stalled due to testing directive"); - while *TEST_MINE_STALL.lock().unwrap() == Some(true) { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - warn!("Mining is no longer stalled due to testing directive. Continuing..."); + if let Err(e) = self.miner_main_loop( + &mut coordinator, + &sortdb, + &mut stackerdbs, + &mut last_block_rejected, + &reward_set, + ) { + // Before stopping this miner, shutdown the coordinator thread. + coordinator.shutdown(); + return Err(e); } - let new_block = loop { - // If we're mock mining, we may not have processed the block that the - // actual tenure winner committed to yet. So, before attempting to - // mock mine, check if the parent is processed. - if self.config.get_node_config(false).mock_mining { - let burn_db_path = self.config.get_burn_db_file_path(); - let mut burn_db = SortitionDB::open( - &burn_db_path, - true, - self.burnchain.pox_constants.clone(), - ) - .expect("FATAL: could not open sortition DB"); - let burn_tip_changed = self.check_burn_tip_changed(&burn_db); - let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) - .expect("FATAL: could not open chainstate DB"); - match burn_tip_changed - .and_then(|_| self.load_block_parent_info(&mut burn_db, &mut chain_state)) - { - Ok(..) => {} - Err(NakamotoNodeError::ParentNotFound) => { - info!("Mock miner has not processed parent block yet, sleeping and trying again"); - thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); - continue; - } - Err(e) => { - warn!("Mock miner failed to load parent info: {e:?}"); - return Err(e); - } - } - } + } + } - match self.mine_block() { - Ok(x) => { - if !self.validate_timestamp(&x)? { - info!("Block mined too quickly. Will try again."; - "block_timestamp" => x.header.timestamp, - ); - continue; - } - break Some(x); - } - Err(NakamotoNodeError::MiningFailure(ChainstateError::MinerAborted)) => { - info!("Miner interrupted while mining, will try again"); - // sleep, and try again. if the miner was interrupted because the burnchain - // view changed, the next `mine_block()` invocation will error + /// The main loop for the miner thread. This is where the miner will mine + /// blocks and then attempt to sign and broadcast them. 
+ fn miner_main_loop( + &mut self, + coordinator: &mut SignerCoordinator, + sortdb: &SortitionDB, + stackerdbs: &mut StackerDBs, + last_block_rejected: &mut bool, + reward_set: &RewardSet, + ) -> Result<(), NakamotoNodeError> { + #[cfg(test)] + if *TEST_MINE_STALL.lock().unwrap() == Some(true) { + // Do an extra check just so we don't log EVERY time. + warn!("Mining is stalled due to testing directive"); + while *TEST_MINE_STALL.lock().unwrap() == Some(true) { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + warn!("Mining is no longer stalled due to testing directive. Continuing..."); + } + let new_block = loop { + // If we're mock mining, we may not have processed the block that the + // actual tenure winner committed to yet. So, before attempting to + // mock mine, check if the parent is processed. + if self.config.get_node_config(false).mock_mining { + let burn_db_path = self.config.get_burn_db_file_path(); + let mut burn_db = + SortitionDB::open(&burn_db_path, true, self.burnchain.pox_constants.clone()) + .expect("FATAL: could not open sortition DB"); + let burn_tip_changed = self.check_burn_tip_changed(&burn_db); + let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) + .expect("FATAL: could not open chainstate DB"); + match burn_tip_changed + .and_then(|_| self.load_block_parent_info(&mut burn_db, &mut chain_state)) + { + Ok(..) => {} + Err(NakamotoNodeError::ParentNotFound) => { + info!("Mock miner has not processed parent block yet, sleeping and trying again"); thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); continue; } - Err(NakamotoNodeError::MiningFailure( - ChainstateError::NoTransactionsToMine, - )) => { - debug!("Miner did not find any transactions to mine"); - break None; - } Err(e) => { - warn!("Failed to mine block: {e:?}"); - - // try again, in case a new sortition is pending - self.globals - .raise_initiative(format!("MiningFailure: {e:?}")); - return Err(ChainstateError::MinerAborted.into()); + warn!("Mock miner failed to load parent info: {e:?}"); + return Err(e); } } - }; - - if let Some(mut new_block) = new_block { - Self::fault_injection_block_broadcast_stall(&new_block); - let (reward_set, signer_signature) = match self - .gather_signatures(&mut new_block, &mut stackerdbs) - { - Ok(x) => x, - Err(e) => match e { - NakamotoNodeError::StacksTipChanged => { - info!("Stacks tip changed while waiting for signatures"; - "signer_sighash" => %new_block.header.signer_signature_hash(), - "block_height" => new_block.header.chain_length, - "consensus_hash" => %new_block.header.consensus_hash, - ); - return Err(e); - } - NakamotoNodeError::BurnchainTipChanged => { - info!("Burnchain tip changed while waiting for signatures"; - "signer_sighash" => %new_block.header.signer_signature_hash(), - "block_height" => new_block.header.chain_length, - "consensus_hash" => %new_block.header.consensus_hash, - ); - return Err(e); - } - _ => { - // Sleep for a bit to allow signers to catch up - let pause_ms = if last_block_rejected { - self.config.miner.subsequent_rejection_pause_ms - } else { - self.config.miner.first_rejection_pause_ms - }; - - error!("Error while gathering signatures: {e:?}. 
Will try mining again in {pause_ms}."; - "signer_sighash" => %new_block.header.signer_signature_hash(), - "block_height" => new_block.header.chain_length, - "consensus_hash" => %new_block.header.consensus_hash, - ); - thread::sleep(Duration::from_millis(pause_ms)); - last_block_rejected = true; - continue; - } - }, - }; - last_block_rejected = false; + } - new_block.header.signer_signature = signer_signature; - if let Err(e) = self.broadcast(new_block.clone(), reward_set, &stackerdbs) { - warn!("Error accepting own block: {e:?}. Will try mining again."); + match self.mine_block(coordinator) { + Ok(x) => { + if !self.validate_timestamp(&x)? { + info!("Block mined too quickly. Will try again."; + "block_timestamp" => x.header.timestamp, + ); + continue; + } + break Some(x); + } + Err(NakamotoNodeError::MiningFailure(ChainstateError::MinerAborted)) => { + info!("Miner interrupted while mining, will try again"); + // sleep, and try again. if the miner was interrupted because the burnchain + // view changed, the next `mine_block()` invocation will error + thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); continue; - } else { - info!( - "Miner: Block signed by signer set and broadcasted"; - "signer_sighash" => %new_block.header.signer_signature_hash(), - "stacks_block_hash" => %new_block.header.block_hash(), - "stacks_block_id" => %new_block.header.block_id(), - "block_height" => new_block.header.chain_length, - "consensus_hash" => %new_block.header.consensus_hash, - ); } + Err(NakamotoNodeError::MiningFailure(ChainstateError::NoTransactionsToMine)) => { + debug!("Miner did not find any transactions to mine"); + break None; + } + Err(e) => { + warn!("Failed to mine block: {e:?}"); - // update mined-block counters and mined-tenure counters - self.globals.counters.bump_naka_mined_blocks(); - if self.last_block_mined.is_some() { - // this is the first block of the tenure, bump tenure counter - self.globals.counters.bump_naka_mined_tenures(); + // try again, in case a new sortition is pending + self.globals + .raise_initiative(format!("MiningFailure: {e:?}")); + return Err(ChainstateError::MinerAborted.into()); } + } + }; + + if let Some(mut new_block) = new_block { + Self::fault_injection_block_broadcast_stall(&new_block); + let mut chain_state = + neon_node::open_chainstate_with_faults(&self.config).map_err(|e| { + NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failed to open chainstate DB. Cannot mine! 
{e:?}" + )) + })?; + let signer_signature = match coordinator.propose_block( + &mut new_block, + &self.burn_block, + &self.burnchain, + &sortdb, + &mut chain_state, + stackerdbs, + &self.globals.counters, + &self.burn_election_block.consensus_hash, + ) { + Ok(x) => x, + Err(e) => match e { + NakamotoNodeError::StacksTipChanged => { + info!("Stacks tip changed while waiting for signatures"; + "signer_sighash" => %new_block.header.signer_signature_hash(), + "block_height" => new_block.header.chain_length, + "consensus_hash" => %new_block.header.consensus_hash, + ); + return Err(e); + } + NakamotoNodeError::BurnchainTipChanged => { + info!("Burnchain tip changed while waiting for signatures"; + "signer_sighash" => %new_block.header.signer_signature_hash(), + "block_height" => new_block.header.chain_length, + "consensus_hash" => %new_block.header.consensus_hash, + ); + return Err(e); + } + _ => { + // Sleep for a bit to allow signers to catch up + let pause_ms = if *last_block_rejected { + self.config.miner.subsequent_rejection_pause_ms + } else { + self.config.miner.first_rejection_pause_ms + }; + + error!("Error while gathering signatures: {e:?}. Will try mining again in {pause_ms}."; + "signer_sighash" => %new_block.header.signer_signature_hash(), + "block_height" => new_block.header.chain_length, + "consensus_hash" => %new_block.header.consensus_hash, + ); + thread::sleep(Duration::from_millis(pause_ms)); + *last_block_rejected = true; + return Ok(()); + } + }, + }; + *last_block_rejected = false; - // wake up chains coordinator - Self::fault_injection_block_announce_stall(&new_block); - self.globals.coord().announce_new_stacks_block(); + new_block.header.signer_signature = signer_signature; + if let Err(e) = self.broadcast(new_block.clone(), reward_set, &stackerdbs) { + warn!("Error accepting own block: {e:?}. Will try mining again."); + return Ok(()); + } else { + info!( + "Miner: Block signed by signer set and broadcasted"; + "signer_sighash" => %new_block.header.signer_signature_hash(), + "stacks_block_hash" => %new_block.header.block_hash(), + "stacks_block_id" => %new_block.header.block_id(), + "block_height" => new_block.header.chain_length, + "consensus_hash" => %new_block.header.consensus_hash, + ); + } - self.last_block_mined = Some(new_block); + // update mined-block counters and mined-tenure counters + self.globals.counters.bump_naka_mined_blocks(); + if self.last_block_mined.is_some() { + // this is the first block of the tenure, bump tenure counter + self.globals.counters.bump_naka_mined_tenures(); } - let Ok(sort_db) = SortitionDB::open( - &self.config.get_burn_db_file_path(), - true, - self.burnchain.pox_constants.clone(), - ) else { - error!("Failed to open sortition DB. Will try mining again."); - continue; - }; + // wake up chains coordinator + Self::fault_injection_block_announce_stall(&new_block); + self.globals.coord().announce_new_stacks_block(); - let wait_start = Instant::now(); - while wait_start.elapsed() < self.config.miner.wait_on_interim_blocks { - thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); - if self.check_burn_tip_changed(&sort_db).is_err() { - return Err(NakamotoNodeError::BurnchainTipChanged); - } + self.last_block_mined = Some(new_block); + self.mined_blocks += 1; + } + + let Ok(sort_db) = SortitionDB::open( + &self.config.get_burn_db_file_path(), + true, + self.burnchain.pox_constants.clone(), + ) else { + error!("Failed to open sortition DB. 
Will try mining again."); + return Ok(()); + }; + + let wait_start = Instant::now(); + while wait_start.elapsed() < self.config.miner.wait_on_interim_blocks { + thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); + if self.check_burn_tip_changed(&sort_db).is_err() { + return Err(NakamotoNodeError::BurnchainTipChanged); } } + + Ok(()) } /// Load the signer set active for this miner's blocks. This is the @@ -521,68 +588,6 @@ impl BlockMinerThread { Ok(reward_set) } - /// Gather a list of signatures from the signers for the block - fn gather_signatures( - &mut self, - new_block: &mut NakamotoBlock, - stackerdbs: &mut StackerDBs, - ) -> Result<(RewardSet, Vec), NakamotoNodeError> { - let Some(miner_privkey) = self.config.miner.mining_key else { - return Err(NakamotoNodeError::MinerConfigurationFailed( - "No mining key configured, cannot mine", - )); - }; - let sort_db = SortitionDB::open( - &self.config.get_burn_db_file_path(), - true, - self.burnchain.pox_constants.clone(), - ) - .map_err(|e| { - NakamotoNodeError::SigningCoordinatorFailure(format!( - "Failed to open sortition DB. Cannot mine! {e:?}" - )) - })?; - - let reward_set = self.load_signer_set()?; - - if self.config.get_node_config(false).mock_mining { - return Ok((reward_set, Vec::new())); - } - - let mut coordinator = SignCoordinator::new( - &reward_set, - miner_privkey, - &self.config, - self.globals.should_keep_running.clone(), - self.event_dispatcher.stackerdb_channel.clone(), - ) - .map_err(|e| { - NakamotoNodeError::SigningCoordinatorFailure(format!( - "Failed to initialize the signing coordinator. Cannot mine! {e:?}" - )) - })?; - - let mut chain_state = - neon_node::open_chainstate_with_faults(&self.config).map_err(|e| { - NakamotoNodeError::SigningCoordinatorFailure(format!( - "Failed to open chainstate DB. Cannot mine! {e:?}" - )) - })?; - - let signature = coordinator.run_sign_v0( - new_block, - &self.burn_block, - &self.burnchain, - &sort_db, - &mut chain_state, - stackerdbs, - &self.globals.counters, - &self.burn_election_block.consensus_hash, - )?; - - Ok((reward_set, signature)) - } - /// Fault injection -- possibly fail to broadcast /// Return true to drop the block fn fault_injection_broadcast_fail(&self) -> bool { @@ -607,7 +612,7 @@ impl BlockMinerThread { sort_db: &SortitionDB, chain_state: &mut StacksChainState, block: &NakamotoBlock, - reward_set: RewardSet, + reward_set: &RewardSet, ) -> Result<(), ChainstateError> { if Self::fault_injection_skip_block_broadcast() { warn!( @@ -664,7 +669,7 @@ impl BlockMinerThread { fn broadcast( &mut self, block: NakamotoBlock, - reward_set: RewardSet, + reward_set: &RewardSet, stackerdbs: &StackerDBs, ) -> Result<(), NakamotoNodeError> { let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) @@ -701,7 +706,7 @@ impl BlockMinerThread { let miners_contract_id = boot_code_id(MINERS_NAME, chain_state.mainnet); let mut miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id); - SignCoordinator::send_miners_message( + SignerCoordinator::send_miners_message( miner_privkey, &sort_db, &self.burn_block, @@ -993,8 +998,12 @@ impl BlockMinerThread { #[cfg_attr(test, mutants::skip)] /// Try to mine a Stacks block by assembling one from mempool transactions and sending a /// burnchain block-commit transaction. If we succeed, then return the assembled block. 
- fn mine_block(&mut self) -> Result { + fn mine_block( + &mut self, + coordinator: &mut SignerCoordinator, + ) -> Result { debug!("block miner thread ID is {:?}", thread::current().id()); + info!("Miner: Mining block"); let burn_db_path = self.config.get_burn_db_file_path(); let reward_set = self.load_signer_set()?; @@ -1037,6 +1046,7 @@ impl BlockMinerThread { &parent_block_info, vrf_proof, target_epoch_id, + coordinator, )?; parent_block_info.stacks_parent_header.microblock_tail = None; @@ -1118,24 +1128,45 @@ impl BlockMinerThread { #[cfg_attr(test, mutants::skip)] /// Create the tenure start info for the block we're going to build fn make_tenure_start_info( - &self, + &mut self, chainstate: &StacksChainState, parent_block_info: &ParentStacksBlockInfo, vrf_proof: VRFProof, target_epoch_id: StacksEpochId, + coordinator: &mut SignerCoordinator, ) -> Result { let current_miner_nonce = parent_block_info.coinbase_nonce; - let Some(parent_tenure_info) = &parent_block_info.parent_tenure else { - return Ok(NakamotoTenureInfo { - coinbase_tx: None, - tenure_change_tx: None, - }); + let parent_tenure_info = match &parent_block_info.parent_tenure { + Some(info) => info.clone(), + None => { + // We may be able to extend the current tenure + if self.last_block_mined.is_none() { + debug!("Miner: No parent tenure and no last block mined"); + return Ok(NakamotoTenureInfo { + coinbase_tx: None, + tenure_change_tx: None, + }); + } + ParentTenureInfo { + parent_tenure_blocks: self.mined_blocks, + parent_tenure_consensus_hash: self.burn_election_block.consensus_hash, + } + } }; if self.last_block_mined.is_some() { - return Ok(NakamotoTenureInfo { - coinbase_tx: None, - tenure_change_tx: None, - }); + // Check if we can extend the current tenure + let tenure_extend_timestamp = coordinator.get_tenure_extend_timestamp(); + if get_epoch_time_secs() < tenure_extend_timestamp { + return Ok(NakamotoTenureInfo { + coinbase_tx: None, + tenure_change_tx: None, + }); + } + info!("Miner: Time-based tenure extend"; + "current_timestamp" => get_epoch_time_secs(), + "tenure_extend_timestamp" => tenure_extend_timestamp, + ); + self.tenure_extend_reset(); } let parent_block_id = parent_block_info.stacks_parent_header.index_block_hash(); @@ -1202,6 +1233,13 @@ impl BlockMinerThread { Ok(()) } } + + fn tenure_extend_reset(&mut self) { + self.reason = MinerReason::Extended { + burn_view_consensus_hash: self.burn_block.consensus_hash, + }; + self.mined_blocks = 0; + } } impl ParentStacksBlockInfo { diff --git a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs deleted file mode 100644 index f255696bea..0000000000 --- a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs +++ /dev/null @@ -1,616 +0,0 @@ -// Copyright (C) 2024 Stacks Open Internet Foundation -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . 
- -use std::collections::BTreeMap; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use hashbrown::{HashMap, HashSet}; -use libsigner::v0::messages::{ - BlockAccepted, BlockResponse, MinerSlotID, SignerMessage as SignerMessageV0, -}; -use libsigner::{BlockProposal, SignerEntries, SignerEvent, SignerSession, StackerDBSession}; -use stacks::burnchains::Burnchain; -use stacks::chainstate::burn::db::sortdb::SortitionDB; -use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash}; -use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader, NakamotoChainState}; -use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, MINERS_NAME, SIGNERS_NAME}; -use stacks::chainstate::stacks::db::StacksChainState; -use stacks::chainstate::stacks::events::StackerDBChunksEvent; -use stacks::chainstate::stacks::Error as ChainstateError; -use stacks::libstackerdb::StackerDBChunkData; -use stacks::net::stackerdb::StackerDBs; -use stacks::types::PublicKey; -use stacks::util::hash::MerkleHashFunc; -use stacks::util::secp256k1::MessageSignature; -use stacks::util_lib::boot::boot_code_id; -use stacks_common::bitvec::BitVec; -use stacks_common::codec::StacksMessageCodec; -use stacks_common::types::chainstate::{StacksPrivateKey, StacksPublicKey}; - -use super::Error as NakamotoNodeError; -use crate::event_dispatcher::StackerDBChannel; -use crate::neon::Counters; -use crate::Config; - -/// Fault injection flag to prevent the miner from seeing enough signer signatures. -/// Used to test that the signers will broadcast a block if it gets enough signatures -#[cfg(test)] -pub static TEST_IGNORE_SIGNERS: std::sync::Mutex> = std::sync::Mutex::new(None); - -/// How long should the coordinator poll on the event receiver before -/// waking up to check timeouts? -static EVENT_RECEIVER_POLL: Duration = Duration::from_millis(500); - -/// The `SignCoordinator` struct sole function is to serve as the coordinator for Nakamoto block signing. -/// This struct is used by Nakamoto miners to act as the coordinator for the blocks they produce. -pub struct SignCoordinator { - receiver: Option>, - message_key: StacksPrivateKey, - is_mainnet: bool, - miners_session: StackerDBSession, - signer_entries: HashMap, - weight_threshold: u32, - total_weight: u32, - keep_running: Arc, - pub next_signer_bitvec: BitVec<4000>, - stackerdb_channel: Arc>, -} - -impl Drop for SignCoordinator { - fn drop(&mut self) { - let stackerdb_channel = self - .stackerdb_channel - .lock() - .expect("FATAL: failed to lock stackerdb channel"); - stackerdb_channel.replace_receiver(self.receiver.take().expect( - "FATAL: lost possession of the StackerDB channel before dropping SignCoordinator", - )); - } -} - -impl SignCoordinator { - /// * `reward_set` - the active reward set data, used to construct the signer - /// set parameters. 
- /// * `aggregate_public_key` - the active aggregate key for this cycle - pub fn new( - reward_set: &RewardSet, - message_key: StacksPrivateKey, - config: &Config, - keep_running: Arc, - stackerdb_channel: Arc>, - ) -> Result { - let is_mainnet = config.is_mainnet(); - let Some(ref reward_set_signers) = reward_set.signers else { - error!("Could not initialize signing coordinator for reward set without signer"); - debug!("reward set: {reward_set:?}"); - return Err(ChainstateError::NoRegisteredSigners(0)); - }; - - let signer_entries = SignerEntries::parse(is_mainnet, reward_set_signers).map_err(|e| { - ChainstateError::InvalidStacksBlock(format!( - "Failed to parse NakamotoSignerEntries: {e:?}" - )) - })?; - let rpc_socket = config - .node - .get_rpc_loopback() - .ok_or_else(|| ChainstateError::MinerAborted)?; - let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet); - let miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id); - - let next_signer_bitvec: BitVec<4000> = BitVec::zeros( - reward_set_signers - .clone() - .len() - .try_into() - .expect("FATAL: signer set length greater than u16"), - ) - .expect("FATAL: unable to construct initial bitvec for signer set"); - - debug!( - "Initializing miner/coordinator"; - "num_signers" => signer_entries.signer_pks.len(), - "signer_public_keys" => ?signer_entries.signer_pks, - ); - - let total_weight = reward_set.total_signing_weight().map_err(|e| { - warn!("Failed to calculate total weight for the reward set: {e:?}"); - ChainstateError::NoRegisteredSigners(0) - })?; - - let threshold = NakamotoBlockHeader::compute_voting_weight_threshold(total_weight)?; - - let signer_public_keys = reward_set_signers - .iter() - .cloned() - .enumerate() - .map(|(idx, signer)| { - let Ok(slot_id) = u32::try_from(idx) else { - return Err(ChainstateError::InvalidStacksBlock( - "Signer index exceeds u32".into(), - )); - }; - Ok((slot_id, signer)) - }) - .collect::, ChainstateError>>()?; - #[cfg(test)] - { - // In test mode, short-circuit spinning up the SignCoordinator if the TEST_SIGNING - // channel has been created. This allows integration tests for the stacks-node - // independent of the stacks-signer. - use crate::tests::nakamoto_integrations::TEST_SIGNING; - if TEST_SIGNING.lock().unwrap().is_some() { - debug!("Short-circuiting spinning up coordinator from signer commitments. Using test signers channel."); - let (receiver, replaced_other) = stackerdb_channel - .lock() - .expect("FATAL: failed to lock StackerDB channel") - .register_miner_coordinator(); - if replaced_other { - warn!("Replaced the miner/coordinator receiver of a prior thread. Prior thread may have crashed."); - } - let sign_coordinator = Self { - message_key, - receiver: Some(receiver), - is_mainnet, - miners_session, - next_signer_bitvec, - signer_entries: signer_public_keys, - weight_threshold: threshold, - total_weight, - keep_running, - stackerdb_channel, - }; - return Ok(sign_coordinator); - } - } - - let (receiver, replaced_other) = stackerdb_channel - .lock() - .expect("FATAL: failed to lock StackerDB channel") - .register_miner_coordinator(); - if replaced_other { - warn!("Replaced the miner/coordinator receiver of a prior thread. 
Prior thread may have crashed."); - } - - Ok(Self { - receiver: Some(receiver), - message_key, - is_mainnet, - miners_session, - next_signer_bitvec, - signer_entries: signer_public_keys, - weight_threshold: threshold, - total_weight, - keep_running, - stackerdb_channel, - }) - } - - /// Send a message over the miners contract using a `StacksPrivateKey` - #[allow(clippy::too_many_arguments)] - pub fn send_miners_message( - miner_sk: &StacksPrivateKey, - sortdb: &SortitionDB, - tip: &BlockSnapshot, - stackerdbs: &StackerDBs, - message: M, - miner_slot_id: MinerSlotID, - is_mainnet: bool, - miners_session: &mut StackerDBSession, - election_sortition: &ConsensusHash, - ) -> Result<(), String> { - let Some(slot_range) = NakamotoChainState::get_miner_slot(sortdb, tip, election_sortition) - .map_err(|e| format!("Failed to read miner slot information: {e:?}"))? - else { - return Err("No slot for miner".into()); - }; - - let slot_id = slot_range - .start - .saturating_add(miner_slot_id.to_u8().into()); - if !slot_range.contains(&slot_id) { - return Err("Not enough slots for miner messages".into()); - } - // Get the LAST slot version number written to the DB. If not found, use 0. - // Add 1 to get the NEXT version number - // Note: we already check above for the slot's existence - let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet); - let slot_version = stackerdbs - .get_slot_version(&miners_contract_id, slot_id) - .map_err(|e| format!("Failed to read slot version: {e:?}"))? - .unwrap_or(0) - .saturating_add(1); - let mut chunk = StackerDBChunkData::new(slot_id, slot_version, message.serialize_to_vec()); - chunk - .sign(miner_sk) - .map_err(|_| "Failed to sign StackerDB chunk")?; - - match miners_session.put_chunk(&chunk) { - Ok(ack) => { - if ack.accepted { - debug!("Wrote message to stackerdb: {ack:?}"); - Ok(()) - } else { - Err(format!("{ack:?}")) - } - } - Err(e) => Err(format!("{e:?}")), - } - } - - /// Do we ignore signer signatures? - #[cfg(test)] - fn fault_injection_ignore_signatures() -> bool { - if *TEST_IGNORE_SIGNERS.lock().unwrap() == Some(true) { - return true; - } - false - } - - #[cfg(not(test))] - fn fault_injection_ignore_signatures() -> bool { - false - } - - /// Check if the tenure needs to change - fn check_burn_tip_changed(sortdb: &SortitionDB, burn_block: &BlockSnapshot) -> bool { - let cur_burn_chain_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn()) - .expect("FATAL: failed to query sortition DB for canonical burn chain tip"); - - if cur_burn_chain_tip.consensus_hash != burn_block.consensus_hash { - info!("SignCoordinator: Cancel signature aggregation; burnchain tip has changed"); - true - } else { - false - } - } - - /// Start gathering signatures for a Nakamoto block. - /// This function begins by sending a `BlockProposal` message - /// to the signers, and then waits for the signers to respond - /// with their signatures. It does so in two ways, concurrently: - /// * It waits for signer StackerDB messages with signatures. If enough signatures can be - /// found, then the block can be broadcast. - /// * It waits for the chainstate to contain the relayed block. If so, then its signatures are - /// loaded and returned. This can happen if the node receives the block via a signer who - /// fetched all signatures and assembled the signature vector, all before we could. - // Mutants skip here: this function is covered via integration tests, - // which the mutation testing does not see. 
- #[cfg_attr(test, mutants::skip)] - #[allow(clippy::too_many_arguments)] - pub fn run_sign_v0( - &mut self, - block: &NakamotoBlock, - burn_tip: &BlockSnapshot, - burnchain: &Burnchain, - sortdb: &SortitionDB, - chain_state: &mut StacksChainState, - stackerdbs: &StackerDBs, - counters: &Counters, - election_sortition: &ConsensusHash, - ) -> Result, NakamotoNodeError> { - let reward_cycle_id = burnchain - .block_height_to_reward_cycle(burn_tip.block_height) - .expect("FATAL: tried to initialize coordinator before first burn block height"); - - let block_proposal = BlockProposal { - block: block.clone(), - burn_height: burn_tip.block_height, - reward_cycle: reward_cycle_id, - }; - - let block_proposal_message = SignerMessageV0::BlockProposal(block_proposal); - debug!("Sending block proposal message to signers"; - "signer_signature_hash" => %block.header.signer_signature_hash(), - ); - Self::send_miners_message::( - &self.message_key, - sortdb, - burn_tip, - stackerdbs, - block_proposal_message, - MinerSlotID::BlockProposal, - self.is_mainnet, - &mut self.miners_session, - election_sortition, - ) - .map_err(NakamotoNodeError::SigningCoordinatorFailure)?; - counters.bump_naka_proposed_blocks(); - - #[cfg(test)] - { - info!( - "SignCoordinator: sent block proposal to .miners, waiting for test signing channel" - ); - // In test mode, short-circuit waiting for the signers if the TEST_SIGNING - // channel has been created. This allows integration tests for the stacks-node - // independent of the stacks-signer. - if let Some(signatures) = - crate::tests::nakamoto_integrations::TestSigningChannel::get_signature() - { - debug!("Short-circuiting waiting for signers, using test signature"); - return Ok(signatures); - } - } - - let Some(ref mut receiver) = self.receiver else { - return Err(NakamotoNodeError::SigningCoordinatorFailure( - "Failed to obtain the StackerDB event receiver".into(), - )); - }; - - let mut total_weight_signed: u32 = 0; - let mut total_reject_weight: u32 = 0; - let mut responded_signers = HashSet::new(); - let mut gathered_signatures = BTreeMap::new(); - - info!("SignCoordinator: beginning to watch for block signatures OR posted blocks."; - "threshold" => self.weight_threshold, - ); - - loop { - // look in the nakamoto staging db -- a block can only get stored there if it has - // enough signing weight to clear the threshold - if let Ok(Some((stored_block, _sz))) = chain_state - .nakamoto_blocks_db() - .get_nakamoto_block(&block.block_id()) - .map_err(|e| { - warn!( - "Failed to query chainstate for block {}: {e:?}", - &block.block_id() - ); - e - }) - { - debug!("SignCoordinator: Found signatures in relayed block"); - counters.bump_naka_signer_pushed_blocks(); - return Ok(stored_block.header.signer_signature); - } - - if Self::check_burn_tip_changed(sortdb, burn_tip) { - debug!("SignCoordinator: Exiting due to new burnchain tip"); - return Err(NakamotoNodeError::BurnchainTipChanged); - } - - // one of two things can happen: - // * we get enough signatures from stackerdb from the signers, OR - // * we see our block get processed in our chainstate (meaning, the signers broadcasted - // the block and our node got it and processed it) - let event = match receiver.recv_timeout(EVENT_RECEIVER_POLL) { - Ok(event) => event, - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - continue; - } - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { - return Err(NakamotoNodeError::SigningCoordinatorFailure( - "StackerDB event receiver disconnected".into(), - )) - } - }; - - // was 
the node asked to stop? - if !self.keep_running.load(Ordering::SeqCst) { - info!("SignerCoordinator: received node exit request. Aborting"); - return Err(NakamotoNodeError::ChannelClosed); - } - - // check to see if this event we got is a signer event - let is_signer_event = - event.contract_id.name.starts_with(SIGNERS_NAME) && event.contract_id.is_boot(); - - if !is_signer_event { - debug!("Ignoring StackerDB event for non-signer contract"; "contract" => %event.contract_id); - continue; - } - - let modified_slots = &event.modified_slots.clone(); - - let Ok(signer_event) = SignerEvent::::try_from(event).map_err(|e| { - warn!("Failure parsing StackerDB event into signer event. Ignoring message."; "err" => ?e); - }) else { - continue; - }; - let SignerEvent::SignerMessages(signer_set, messages) = signer_event else { - debug!("Received signer event other than a signer message. Ignoring."); - continue; - }; - if signer_set != u32::try_from(reward_cycle_id % 2).unwrap() { - debug!("Received signer event for other reward cycle. Ignoring."); - continue; - }; - let slot_ids = modified_slots - .iter() - .map(|chunk| chunk.slot_id) - .collect::>(); - - debug!("SignCoordinator: Received messages from signers"; - "count" => messages.len(), - "slot_ids" => ?slot_ids, - "threshold" => self.weight_threshold - ); - - for (message, slot_id) in messages.into_iter().zip(slot_ids) { - let Some(signer_entry) = &self.signer_entries.get(&slot_id) else { - return Err(NakamotoNodeError::SignerSignatureError( - "Signer entry not found".into(), - )); - }; - let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) - else { - return Err(NakamotoNodeError::SignerSignatureError( - "Failed to parse signer public key".into(), - )); - }; - - if responded_signers.contains(&signer_pubkey) { - debug!( - "Signer {slot_id} already responded for block {}. Ignoring {message:?}.", block.header.signer_signature_hash(); - "stacks_block_hash" => %block.header.block_hash(), - "stacks_block_id" => %block.header.block_id() - ); - continue; - } - - match message { - SignerMessageV0::BlockResponse(BlockResponse::Accepted(accepted)) => { - let BlockAccepted { - signer_signature_hash: response_hash, - signature, - metadata, - response_data: _, // TOOD: utilize this info - } = accepted; - let block_sighash = block.header.signer_signature_hash(); - if block_sighash != response_hash { - warn!( - "Processed signature for a different block. Will try to continue."; - "signature" => %signature, - "block_signer_signature_hash" => %block_sighash, - "response_hash" => %response_hash, - "slot_id" => slot_id, - "reward_cycle_id" => reward_cycle_id, - "response_hash" => %response_hash, - "server_version" => %metadata.server_version - ); - continue; - } - debug!("SignCoordinator: Received valid signature from signer"; "slot_id" => slot_id, "signature" => %signature); - let Ok(valid_sig) = signer_pubkey.verify(block_sighash.bits(), &signature) - else { - warn!("Got invalid signature from a signer. Ignoring."); - continue; - }; - if !valid_sig { - warn!( - "Processed signature but didn't validate over the expected block. 
Ignoring"; - "signature" => %signature, - "block_signer_signature_hash" => %block_sighash, - "slot_id" => slot_id, - ); - continue; - } - - if Self::fault_injection_ignore_signatures() { - warn!("SignCoordinator: fault injection: ignoring well-formed signature for block"; - "block_signer_sighash" => %block_sighash, - "signer_pubkey" => signer_pubkey.to_hex(), - "signer_slot_id" => slot_id, - "signature" => %signature, - "signer_weight" => signer_entry.weight, - "total_weight_signed" => total_weight_signed, - "stacks_block_hash" => %block.header.block_hash(), - "stacks_block_id" => %block.header.block_id() - ); - continue; - } - - if !gathered_signatures.contains_key(&slot_id) { - total_weight_signed = total_weight_signed - .checked_add(signer_entry.weight) - .expect("FATAL: total weight signed exceeds u32::MAX"); - } - - info!("SignCoordinator: Signature Added to block"; - "block_signer_sighash" => %block_sighash, - "signer_pubkey" => signer_pubkey.to_hex(), - "signer_slot_id" => slot_id, - "signature" => %signature, - "signer_weight" => signer_entry.weight, - "total_weight_signed" => total_weight_signed, - "stacks_block_hash" => %block.header.block_hash(), - "stacks_block_id" => %block.header.block_id(), - "server_version" => metadata.server_version, - ); - gathered_signatures.insert(slot_id, signature); - responded_signers.insert(signer_pubkey); - } - SignerMessageV0::BlockResponse(BlockResponse::Rejected(rejected_data)) => { - let block_sighash = block.header.signer_signature_hash(); - if block_sighash != rejected_data.signer_signature_hash { - warn!( - "Processed rejection for a different block. Will try to continue."; - "block_signer_signature_hash" => %block_sighash, - "rejected_data.signer_signature_hash" => %rejected_data.signer_signature_hash, - "slot_id" => slot_id, - "reward_cycle_id" => reward_cycle_id, - ); - continue; - } - let rejected_pubkey = match rejected_data.recover_public_key() { - Ok(rejected_pubkey) => { - if rejected_pubkey != signer_pubkey { - warn!("Recovered public key from rejected data does not match signer's public key. Ignoring."); - continue; - } - rejected_pubkey - } - Err(e) => { - warn!("Failed to recover public key from rejected data: {e:?}. Ignoring."); - continue; - } - }; - responded_signers.insert(rejected_pubkey); - debug!( - "Signer {slot_id} rejected our block {}/{}", - &block.header.consensus_hash, - &block.header.block_hash() - ); - total_reject_weight = total_reject_weight - .checked_add(signer_entry.weight) - .expect("FATAL: total weight rejected exceeds u32::MAX"); - - if total_reject_weight.saturating_add(self.weight_threshold) - > self.total_weight - { - debug!( - "{total_reject_weight}/{} signers vote to reject our block {}/{}", - self.total_weight, - &block.header.consensus_hash, - &block.header.block_hash() - ); - counters.bump_naka_rejected_blocks(); - return Err(NakamotoNodeError::SignersRejected); - } - continue; - } - SignerMessageV0::BlockProposal(_) => { - debug!("Received block proposal message. Ignoring."); - continue; - } - SignerMessageV0::BlockPushed(_) => { - debug!("Received block pushed message. Ignoring."); - continue; - } - SignerMessageV0::MockSignature(_) - | SignerMessageV0::MockProposal(_) - | SignerMessageV0::MockBlock(_) => { - debug!("Received mock message. Ignoring."); - continue; - } - }; - } - // After gathering all signatures, return them if we've hit the threshold - if total_weight_signed >= self.weight_threshold { - info!("SignCoordinator: Received enough signatures. 
Continuing."; - "stacks_block_hash" => %block.header.block_hash(), - "stacks_block_id" => %block.header.block_id() - ); - return Ok(gathered_signatures.values().cloned().collect()); - } - } - } -} diff --git a/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs new file mode 100644 index 0000000000..70c9aab190 --- /dev/null +++ b/testnet/stacks-node/src/nakamoto_node/signer_coordinator.rs @@ -0,0 +1,376 @@ +// Copyright (C) 2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; + +use libsigner::v0::messages::{MinerSlotID, SignerMessage as SignerMessageV0}; +use libsigner::{BlockProposal, SignerSession, StackerDBSession}; +use stacks::burnchains::Burnchain; +use stacks::chainstate::burn::db::sortdb::SortitionDB; +use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash}; +use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; +use stacks::chainstate::stacks::boot::{RewardSet, MINERS_NAME}; +use stacks::chainstate::stacks::db::StacksChainState; +use stacks::chainstate::stacks::Error as ChainstateError; +use stacks::codec::StacksMessageCodec; +use stacks::libstackerdb::StackerDBChunkData; +use stacks::net::stackerdb::StackerDBs; +use stacks::types::chainstate::{StacksBlockId, StacksPrivateKey}; +use stacks::util::hash::Sha512Trunc256Sum; +use stacks::util::secp256k1::MessageSignature; +use stacks::util_lib::boot::boot_code_id; + +use super::stackerdb_listener::StackerDBListenerComms; +use super::Error as NakamotoNodeError; +use crate::event_dispatcher::StackerDBChannel; +use crate::nakamoto_node::stackerdb_listener::{StackerDBListener, EVENT_RECEIVER_POLL}; +use crate::neon::Counters; +use crate::Config; + +/// The state of the signer database listener, used by the miner thread to +/// interact with the signer listener. +pub struct SignerCoordinator { + /// The private key used to sign messages from the miner + message_key: StacksPrivateKey, + /// Is this mainnet? + is_mainnet: bool, + /// The session for writing to the miners contract in the stackerdb + miners_session: StackerDBSession, + /// The total weight of all signers + total_weight: u32, + /// The weight threshold for block approval + weight_threshold: u32, + /// Interface to the StackerDB listener thread's data + stackerdb_comms: StackerDBListenerComms, + /// Keep running flag for the signer DB listener thread + keep_running: Arc, + /// Handle for the signer DB listener thread + listener_thread: Option>, +} + +impl SignerCoordinator { + /// Create a new `SignerCoordinator` instance. + /// This will spawn a new thread to listen for messages from the signer DB. 
+ pub fn new( + stackerdb_channel: Arc>, + node_keep_running: Arc, + reward_set: &RewardSet, + burn_tip: &BlockSnapshot, + burnchain: &Burnchain, + message_key: StacksPrivateKey, + config: &Config, + ) -> Result { + info!("SignerCoordinator: starting up"); + let keep_running = Arc::new(AtomicBool::new(true)); + + // Create the stacker DB listener + let mut listener = StackerDBListener::new( + stackerdb_channel, + node_keep_running.clone(), + keep_running.clone(), + reward_set, + burn_tip, + burnchain, + )?; + let is_mainnet = config.is_mainnet(); + let rpc_socket = config + .node + .get_rpc_loopback() + .ok_or_else(|| ChainstateError::MinerAborted)?; + let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet); + let miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id); + + let mut sc = Self { + message_key, + is_mainnet, + miners_session, + total_weight: listener.total_weight, + weight_threshold: listener.weight_threshold, + stackerdb_comms: listener.get_comms(), + keep_running, + listener_thread: None, + }; + + // Spawn the signer DB listener thread + let listener_thread = std::thread::Builder::new() + .name("stackerdb_listener".to_string()) + .spawn(move || { + if let Err(e) = listener.run() { + error!("StackerDBListener: exited with error: {e:?}"); + } + }) + .map_err(|e| { + error!("Failed to spawn stackerdb_listener thread: {e:?}"); + ChainstateError::MinerAborted + })?; + + sc.listener_thread = Some(listener_thread); + + Ok(sc) + } + + /// Send a message over the miners contract using a `StacksPrivateKey` + #[allow(clippy::too_many_arguments)] + pub fn send_miners_message( + miner_sk: &StacksPrivateKey, + sortdb: &SortitionDB, + tip: &BlockSnapshot, + stackerdbs: &StackerDBs, + message: M, + miner_slot_id: MinerSlotID, + is_mainnet: bool, + miners_session: &mut StackerDBSession, + election_sortition: &ConsensusHash, + ) -> Result<(), String> { + let Some(slot_range) = NakamotoChainState::get_miner_slot(sortdb, tip, election_sortition) + .map_err(|e| format!("Failed to read miner slot information: {e:?}"))? + else { + return Err("No slot for miner".into()); + }; + + let slot_id = slot_range + .start + .saturating_add(miner_slot_id.to_u8().into()); + if !slot_range.contains(&slot_id) { + return Err("Not enough slots for miner messages".into()); + } + // Get the LAST slot version number written to the DB. If not found, use 0. + // Add 1 to get the NEXT version number + // Note: we already check above for the slot's existence + let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet); + let slot_version = stackerdbs + .get_slot_version(&miners_contract_id, slot_id) + .map_err(|e| format!("Failed to read slot version: {e:?}"))? + .unwrap_or(0) + .saturating_add(1); + let mut chunk = StackerDBChunkData::new(slot_id, slot_version, message.serialize_to_vec()); + chunk + .sign(miner_sk) + .map_err(|_| "Failed to sign StackerDB chunk")?; + + match miners_session.put_chunk(&chunk) { + Ok(ack) => { + if ack.accepted { + debug!("Wrote message to stackerdb: {ack:?}"); + Ok(()) + } else { + Err(format!("{ack:?}")) + } + } + Err(e) => Err(format!("{e:?}")), + } + } + + /// Propose a Nakamoto block and gather signatures for it. + /// This function begins by sending a `BlockProposal` message to the + /// signers, and then it waits for the signers to respond with their + /// signatures. 
It does so in two ways, concurrently: + /// * It waits for the signer DB listener to collect enough signatures to + /// accept or reject the block + /// * It waits for the chainstate to contain the relayed block. If so, then its signatures are + /// loaded and returned. This can happen if the node receives the block via a signer who + /// fetched all signatures and assembled the signature vector, all before we could. + // Mutants skip here: this function is covered via integration tests, + // which the mutation testing does not see. + #[cfg_attr(test, mutants::skip)] + #[allow(clippy::too_many_arguments)] + pub fn propose_block( + &mut self, + block: &NakamotoBlock, + burn_tip: &BlockSnapshot, + burnchain: &Burnchain, + sortdb: &SortitionDB, + chain_state: &mut StacksChainState, + stackerdbs: &StackerDBs, + counters: &Counters, + election_sortition: &ConsensusHash, + ) -> Result, NakamotoNodeError> { + // Add this block to the block status map. + self.stackerdb_comms.insert_block(&block.header); + + let reward_cycle_id = burnchain + .block_height_to_reward_cycle(burn_tip.block_height) + .expect("FATAL: tried to initialize coordinator before first burn block height"); + + let block_proposal = BlockProposal { + block: block.clone(), + burn_height: burn_tip.block_height, + reward_cycle: reward_cycle_id, + }; + + let block_proposal_message = SignerMessageV0::BlockProposal(block_proposal); + debug!("Sending block proposal message to signers"; + "signer_signature_hash" => %block.header.signer_signature_hash(), + ); + Self::send_miners_message::( + &self.message_key, + sortdb, + burn_tip, + stackerdbs, + block_proposal_message, + MinerSlotID::BlockProposal, + self.is_mainnet, + &mut self.miners_session, + election_sortition, + ) + .map_err(NakamotoNodeError::SigningCoordinatorFailure)?; + counters.bump_naka_proposed_blocks(); + + #[cfg(test)] + { + info!( + "SignerCoordinator: sent block proposal to .miners, waiting for test signing channel" + ); + // In test mode, short-circuit waiting for the signers if the TEST_SIGNING + // channel has been created. This allows integration tests for the stacks-node + // independent of the stacks-signer. + if let Some(signatures) = + crate::tests::nakamoto_integrations::TestSigningChannel::get_signature() + { + debug!("Short-circuiting waiting for signers, using test signature"); + return Ok(signatures); + } + } + + self.get_block_status( + &block.header.signer_signature_hash(), + &block.block_id(), + chain_state, + sortdb, + burn_tip, + counters, + ) + } + + /// Get the block status for a given block hash. + /// If we have not yet received enough signatures for this block, this + /// method will block until we do. If this block shows up in the staging DB + /// before we have enough signatures, we will return the signatures from + /// there. If a new burnchain tip is detected, we will return an error. + fn get_block_status( + &self, + block_signer_sighash: &Sha512Trunc256Sum, + block_id: &StacksBlockId, + chain_state: &mut StacksChainState, + sortdb: &SortitionDB, + burn_tip: &BlockSnapshot, + counters: &Counters, + ) -> Result, NakamotoNodeError> { + loop { + let block_status = match self.stackerdb_comms.wait_for_block_status( + block_signer_sighash, + EVENT_RECEIVER_POLL, + |status| { + status.total_weight_signed < self.weight_threshold + && status + .total_reject_weight + .saturating_add(self.weight_threshold) + <= self.total_weight + }, + )? 
{ + Some(status) => status, + None => { + // If we just received a timeout, we should check if the burnchain + // tip has changed or if we received this signed block already in + // the staging db. + debug!("SignerCoordinator: Timeout waiting for block signatures"); + + // Look in the nakamoto staging db -- a block can only get stored there + // if it has enough signing weight to clear the threshold. + if let Ok(Some((stored_block, _sz))) = chain_state + .nakamoto_blocks_db() + .get_nakamoto_block(block_id) + .map_err(|e| { + warn!( + "Failed to query chainstate for block: {e:?}"; + "block_id" => %block_id, + "block_signer_sighash" => %block_signer_sighash, + ); + e + }) + { + debug!("SignCoordinator: Found signatures in relayed block"); + counters.bump_naka_signer_pushed_blocks(); + return Ok(stored_block.header.signer_signature); + } + + if Self::check_burn_tip_changed(sortdb, burn_tip) { + debug!("SignCoordinator: Exiting due to new burnchain tip"); + return Err(NakamotoNodeError::BurnchainTipChanged); + } + + continue; + } + }; + + if block_status + .total_reject_weight + .saturating_add(self.weight_threshold) + > self.total_weight + { + info!( + "{}/{} signers vote to reject block", + block_status.total_reject_weight, self.total_weight; + "block_signer_sighash" => %block_signer_sighash, + ); + counters.bump_naka_rejected_blocks(); + return Err(NakamotoNodeError::SignersRejected); + } else if block_status.total_weight_signed >= self.weight_threshold { + info!("Received enough signatures, block accepted"; + "block_signer_sighash" => %block_signer_sighash, + ); + return Ok(block_status.gathered_signatures.values().cloned().collect()); + } else { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "Unblocked without reaching the threshold".into(), + )); + } + } + } + + /// Get the timestamp at which at least 70% of the signing power should be + /// willing to accept a time-based tenure extension. 
+ pub fn get_tenure_extend_timestamp(&self) -> u64 { + self.stackerdb_comms + .get_tenure_extend_timestamp(self.weight_threshold) + } + + /// Check if the tenure needs to change + fn check_burn_tip_changed(sortdb: &SortitionDB, burn_block: &BlockSnapshot) -> bool { + let cur_burn_chain_tip = SortitionDB::get_canonical_burn_chain_tip(sortdb.conn()) + .expect("FATAL: failed to query sortition DB for canonical burn chain tip"); + + if cur_burn_chain_tip.consensus_hash != burn_block.consensus_hash { + info!("SignerCoordinator: Cancel signature aggregation; burnchain tip has changed"); + true + } else { + false + } + } + + pub fn shutdown(&mut self) { + if let Some(listener_thread) = self.listener_thread.take() { + info!("SignerCoordinator: shutting down stacker db listener thread"); + self.keep_running + .store(false, std::sync::atomic::Ordering::Relaxed); + if let Err(e) = listener_thread.join() { + error!("Failed to join signer listener thread: {e:?}"); + } + debug!("SignerCoordinator: stacker db listener thread has shut down"); + } + } +} diff --git a/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs b/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs new file mode 100644 index 0000000000..f9ada97e57 --- /dev/null +++ b/testnet/stacks-node/src/nakamoto_node/stackerdb_listener.rs @@ -0,0 +1,551 @@ +// Copyright (C) 2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Receiver; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; + +use hashbrown::{HashMap, HashSet}; +#[cfg(test)] +use lazy_static::lazy_static; +use libsigner::v0::messages::{BlockAccepted, BlockResponse, SignerMessage as SignerMessageV0}; +use libsigner::SignerEvent; +use stacks::burnchains::Burnchain; +use stacks::chainstate::burn::BlockSnapshot; +use stacks::chainstate::nakamoto::NakamotoBlockHeader; +use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, SIGNERS_NAME}; +use stacks::chainstate::stacks::events::StackerDBChunksEvent; +use stacks::chainstate::stacks::Error as ChainstateError; +use stacks::types::chainstate::StacksPublicKey; +use stacks::types::PublicKey; +use stacks::util::hash::{MerkleHashFunc, Sha512Trunc256Sum}; +use stacks::util::secp256k1::MessageSignature; + +use super::Error as NakamotoNodeError; +use crate::event_dispatcher::StackerDBChannel; +#[cfg(test)] +use crate::neon::TestFlag; + +#[cfg(test)] +lazy_static! { + /// Fault injection flag to prevent the miner from seeing enough signer signatures. + /// Used to test that the signers will broadcast a block if it gets enough signatures + pub static ref TEST_IGNORE_SIGNERS: TestFlag = TestFlag::default(); +} + +/// How long should the coordinator poll on the event receiver before +/// waking up to check timeouts? 
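`SignerCoordinator::get_tenure_extend_timestamp` above delegates to `StackerDBListenerComms::get_tenure_extend_timestamp(self.weight_threshold)`, whose body is not included in this hunk. As a rough, self-contained sketch of the aggregation it implies (an assumption, not code from the patch — the function name, the `(timestamp, weight)` pair representation, and the `u64::MAX` fallback are all illustrative): walk the signers' reported idle timestamps in ascending order, accumulate their weights, and return the first timestamp at which the accumulated weight reaches the approval threshold; if the reports cannot reach it, return `u64::MAX` so no time-based extension is attempted.

/// Sketch only: given (idle_timestamp, weight) pairs reported by signers,
/// find the earliest timestamp at which signers totalling at least
/// `weight_threshold` would accept a time-based tenure extension.
fn tenure_extend_timestamp_sketch(mut reports: Vec<(u64, u32)>, weight_threshold: u32) -> u64 {
    // Walk signers from the earliest acceptable timestamp upward.
    reports.sort_by_key(|(timestamp, _weight)| *timestamp);
    let mut accumulated: u32 = 0;
    for (timestamp, weight) in reports {
        accumulated = accumulated.saturating_add(weight);
        if accumulated >= weight_threshold {
            // Enough signing power accepts an extension at this time.
            return timestamp;
        }
    }
    // Not enough signers have reported; never suggest an extension.
    u64::MAX
}

fn main() {
    // Three signers with weights 40/35/25; a threshold of 70 is reached
    // once the signers with timestamps 100 and 140 are counted.
    let reports = vec![(140, 35), (100, 40), (500, 25)];
    assert_eq!(tenure_extend_timestamp_sketch(reports, 70), 140);
}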
+pub static EVENT_RECEIVER_POLL: Duration = Duration::from_millis(500); + +#[derive(Debug, Clone)] +pub struct BlockStatus { + pub responded_signers: HashSet, + pub gathered_signatures: BTreeMap, + pub total_weight_signed: u32, + pub total_reject_weight: u32, +} + +#[derive(Debug, Clone)] +pub(crate) struct TimestampInfo { + pub timestamp: u64, + pub weight: u32, +} + +/// The listener for the StackerDB, which listens for messages from the +/// signers and tracks the state of block signatures and idle timestamps. +pub struct StackerDBListener { + /// Channel to communicate with StackerDB + stackerdb_channel: Arc>, + /// Receiver end of the StackerDB events channel + receiver: Option>, + /// Flag to shut the node down + node_keep_running: Arc, + /// Flag to shut the listener down + keep_running: Arc, + /// The signer set for this tenure (0 or 1) + signer_set: u32, + /// The total weight of all signers + pub(crate) total_weight: u32, + /// The weight threshold for block approval + pub(crate) weight_threshold: u32, + /// The signer entries for this tenure (keyed by slot_id) + signer_entries: HashMap, + /// Tracks signatures for blocks + /// - key: Sha512Trunc256Sum (signer signature hash) + /// - value: BlockStatus + pub(crate) blocks: Arc<(Mutex>, Condvar)>, + /// Tracks the timestamps from signers to decide when they should be + /// willing to accept time-based tenure extensions + /// - key: StacksPublicKey + /// - value: TimestampInfo + pub(crate) signer_idle_timestamps: Arc>>, +} + +/// Interface for other threads to retrieve info from the StackerDBListener +pub struct StackerDBListenerComms { + /// Tracks signatures for blocks + /// - key: Sha512Trunc256Sum (signer signature hash) + /// - value: BlockStatus + blocks: Arc<(Mutex>, Condvar)>, + /// Tracks the timestamps from signers to decide when they should be + /// willing to accept time-based tenure extensions + /// - key: StacksPublicKey + /// - value: TimestampInfo + signer_idle_timestamps: Arc>>, +} + +impl StackerDBListener { + pub fn new( + stackerdb_channel: Arc>, + node_keep_running: Arc, + keep_running: Arc, + reward_set: &RewardSet, + burn_tip: &BlockSnapshot, + burnchain: &Burnchain, + ) -> Result { + let (receiver, replaced_other) = stackerdb_channel + .lock() + .expect("FATAL: failed to lock StackerDB channel") + .register_miner_coordinator(); + if replaced_other { + warn!("Replaced the miner/coordinator receiver of a prior thread. 
Prior thread may have crashed."); + } + + let total_weight = reward_set.total_signing_weight().map_err(|e| { + warn!("Failed to calculate total weight for the reward set: {e:?}"); + ChainstateError::NoRegisteredSigners(0) + })?; + + let weight_threshold = NakamotoBlockHeader::compute_voting_weight_threshold(total_weight)?; + + let reward_cycle_id = burnchain + .block_height_to_reward_cycle(burn_tip.block_height) + .expect("FATAL: tried to initialize coordinator before first burn block height"); + let signer_set = + u32::try_from(reward_cycle_id % 2).expect("FATAL: reward cycle id % 2 exceeds u32"); + + let Some(ref reward_set_signers) = reward_set.signers else { + error!("Could not initialize signing coordinator for reward set without signer"); + debug!("reward set: {reward_set:?}"); + return Err(ChainstateError::NoRegisteredSigners(0)); + }; + + let signer_entries = reward_set_signers + .iter() + .cloned() + .enumerate() + .map(|(idx, signer)| { + let Ok(slot_id) = u32::try_from(idx) else { + return Err(ChainstateError::InvalidStacksBlock( + "Signer index exceeds u32".into(), + )); + }; + Ok((slot_id, signer)) + }) + .collect::, ChainstateError>>()?; + + Ok(Self { + stackerdb_channel, + receiver: Some(receiver), + node_keep_running, + keep_running, + signer_set, + total_weight, + weight_threshold, + signer_entries, + blocks: Arc::new((Mutex::new(HashMap::new()), Condvar::new())), + signer_idle_timestamps: Arc::new(Mutex::new(HashMap::new())), + }) + } + + pub fn get_comms(&self) -> StackerDBListenerComms { + StackerDBListenerComms { + blocks: self.blocks.clone(), + signer_idle_timestamps: self.signer_idle_timestamps.clone(), + } + } + + /// Run the StackerDB listener. + pub fn run(&mut self) -> Result<(), NakamotoNodeError> { + info!("StackerDBListener: Starting up"); + + let Some(receiver) = &self.receiver else { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "StackerDBListener: Failed to obtain the StackerDB event receiver".into(), + )); + }; + + loop { + // was the node asked to stop? + if !self.node_keep_running.load(Ordering::SeqCst) { + info!("StackerDBListener: received node exit request. Aborting"); + return Ok(()); + } + + // was the listener asked to stop? + if !self.keep_running.load(Ordering::SeqCst) { + info!("StackerDBListener: received listener exit request. Aborting"); + return Ok(()); + } + + let event = match receiver.recv_timeout(EVENT_RECEIVER_POLL) { + Ok(event) => event, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + debug!("StackerDBListener: No StackerDB event received. Checking flags and polling again."); + continue; + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + warn!("StackerDBListener: StackerDB event receiver disconnected"); + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "StackerDB event receiver disconnected".into(), + )); + } + }; + + // check to see if this event we got is a signer event + let is_signer_event = + event.contract_id.name.starts_with(SIGNERS_NAME) && event.contract_id.is_boot(); + + if !is_signer_event { + debug!("StackerDBListener: Ignoring StackerDB event for non-signer contract"; "contract" => %event.contract_id); + continue; + } + + let modified_slots = &event.modified_slots.clone(); + + let Ok(signer_event) = SignerEvent::::try_from(event).map_err(|e| { + warn!("StackerDBListener: Failure parsing StackerDB event into signer event. 
Ignoring message."; "err" => ?e); + }) else { + continue; + }; + let SignerEvent::SignerMessages(signer_set, messages) = signer_event else { + debug!("StackerDBListener: Received signer event other than a signer message. Ignoring."); + continue; + }; + if signer_set != self.signer_set { + debug!( + "StackerDBListener: Received signer event for other reward cycle. Ignoring." + ); + continue; + }; + let slot_ids = modified_slots + .iter() + .map(|chunk| chunk.slot_id) + .collect::>(); + + debug!("StackerDBListener: Received messages from signers"; + "count" => messages.len(), + "slot_ids" => ?slot_ids, + ); + + for (message, slot_id) in messages.into_iter().zip(slot_ids) { + let Some(signer_entry) = &self.signer_entries.get(&slot_id) else { + return Err(NakamotoNodeError::SignerSignatureError( + "Signer entry not found".into(), + )); + }; + let Ok(signer_pubkey) = StacksPublicKey::from_slice(&signer_entry.signing_key) + else { + return Err(NakamotoNodeError::SignerSignatureError( + "Failed to parse signer public key".into(), + )); + }; + + match message { + SignerMessageV0::BlockResponse(BlockResponse::Accepted(accepted)) => { + let BlockAccepted { + signer_signature_hash: block_sighash, + signature, + metadata, + response_data, + } = accepted; + let tenure_extend_timestamp = response_data.tenure_extend_timestamp; + + let (lock, cvar) = &*self.blocks; + let mut blocks = lock.lock().expect("FATAL: failed to lock block status"); + + let block = match blocks.get_mut(&block_sighash) { + Some(block) => block, + None => { + info!( + "StackerDBListener: Received signature for block that we did not request. Ignoring."; + "signature" => %signature, + "block_signer_sighash" => %block_sighash, + "slot_id" => slot_id, + "signer_set" => self.signer_set, + ); + continue; + } + }; + + let Ok(valid_sig) = signer_pubkey.verify(block_sighash.bits(), &signature) + else { + warn!( + "StackerDBListener: Got invalid signature from a signer. Ignoring." + ); + continue; + }; + if !valid_sig { + warn!( + "StackerDBListener: Processed signature but didn't validate over the expected block. 
Ignoring"; + "signature" => %signature, + "block_signer_signature_hash" => %block_sighash, + "slot_id" => slot_id, + ); + continue; + } + + if Self::fault_injection_ignore_signatures() { + warn!("StackerDBListener: fault injection: ignoring well-formed signature for block"; + "block_signer_sighash" => %block_sighash, + "signer_pubkey" => signer_pubkey.to_hex(), + "signer_slot_id" => slot_id, + "signature" => %signature, + "signer_weight" => signer_entry.weight, + "total_weight_signed" => block.total_weight_signed, + ); + continue; + } + + if !block.gathered_signatures.contains_key(&slot_id) { + block.total_weight_signed = block + .total_weight_signed + .checked_add(signer_entry.weight) + .expect("FATAL: total weight signed exceeds u32::MAX"); + } + + info!("StackerDBListener: Signature Added to block"; + "block_signer_sighash" => %block_sighash, + "signer_pubkey" => signer_pubkey.to_hex(), + "signer_slot_id" => slot_id, + "signature" => %signature, + "signer_weight" => signer_entry.weight, + "total_weight_signed" => block.total_weight_signed, + "tenure_extend_timestamp" => tenure_extend_timestamp, + "server_version" => metadata.server_version, + ); + block.gathered_signatures.insert(slot_id, signature); + block.responded_signers.insert(signer_pubkey); + + if block.total_weight_signed >= self.weight_threshold { + // Signal to anyone waiting on this block that we have enough signatures + cvar.notify_all(); + } + + // Update the idle timestamp for this signer + self.update_idle_timestamp( + signer_pubkey, + tenure_extend_timestamp, + signer_entry.weight, + ); + } + SignerMessageV0::BlockResponse(BlockResponse::Rejected(rejected_data)) => { + let (lock, cvar) = &*self.blocks; + let mut blocks = lock.lock().expect("FATAL: failed to lock block status"); + + let block = match blocks.get_mut(&rejected_data.signer_signature_hash) { + Some(block) => block, + None => { + info!( + "StackerDBListener: Received rejection for block that we did not request. Ignoring."; + "block_signer_sighash" => %rejected_data.signer_signature_hash, + "slot_id" => slot_id, + "signer_set" => self.signer_set, + ); + continue; + } + }; + + let rejected_pubkey = match rejected_data.recover_public_key() { + Ok(rejected_pubkey) => { + if rejected_pubkey != signer_pubkey { + warn!("StackerDBListener: Recovered public key from rejected data does not match signer's public key. Ignoring."); + continue; + } + rejected_pubkey + } + Err(e) => { + warn!("StackerDBListener: Failed to recover public key from rejected data: {e:?}. 
Ignoring."); + continue; + } + }; + block.responded_signers.insert(rejected_pubkey); + block.total_reject_weight = block + .total_reject_weight + .checked_add(signer_entry.weight) + .expect("FATAL: total weight rejected exceeds u32::MAX"); + + info!("StackerDBListener: Signer rejected block"; + "block_signer_sighash" => %rejected_data.signer_signature_hash, + "signer_pubkey" => rejected_pubkey.to_hex(), + "signer_slot_id" => slot_id, + "signature" => %rejected_data.signature, + "signer_weight" => signer_entry.weight, + "total_weight_signed" => block.total_weight_signed, + "reason" => rejected_data.reason, + "reason_code" => %rejected_data.reason_code, + "tenure_extend_timestamp" => rejected_data.response_data.tenure_extend_timestamp, + "server_version" => rejected_data.metadata.server_version, + ); + + if block + .total_reject_weight + .saturating_add(self.weight_threshold) + > self.total_weight + { + // Signal to anyone waiting on this block that we have enough rejections + cvar.notify_all(); + } + + // Update the idle timestamp for this signer + self.update_idle_timestamp( + signer_pubkey, + rejected_data.response_data.tenure_extend_timestamp, + signer_entry.weight, + ); + } + SignerMessageV0::BlockProposal(_) => { + debug!("Received block proposal message. Ignoring."); + } + SignerMessageV0::BlockPushed(_) => { + debug!("Received block pushed message. Ignoring."); + } + SignerMessageV0::MockSignature(_) + | SignerMessageV0::MockProposal(_) + | SignerMessageV0::MockBlock(_) => { + debug!("Received mock message. Ignoring."); + } + }; + } + } + } + + fn update_idle_timestamp(&self, signer_pubkey: StacksPublicKey, timestamp: u64, weight: u32) { + let mut idle_timestamps = self + .signer_idle_timestamps + .lock() + .expect("FATAL: failed to lock idle timestamps"); + let timestamp_info = TimestampInfo { timestamp, weight }; + idle_timestamps.insert(signer_pubkey, timestamp_info); + } + + /// Do we ignore signer signatures? + #[cfg(test)] + fn fault_injection_ignore_signatures() -> bool { + TEST_IGNORE_SIGNERS.get() + } + + #[cfg(not(test))] + fn fault_injection_ignore_signatures() -> bool { + false + } +} + +impl Drop for StackerDBListener { + fn drop(&mut self) { + let stackerdb_channel = self + .stackerdb_channel + .lock() + .expect("FATAL: failed to lock stackerdb channel"); + stackerdb_channel.replace_receiver(self.receiver.take().expect( + "FATAL: lost possession of the StackerDB channel before dropping SignCoordinator", + )); + } +} + +impl StackerDBListenerComms { + /// Insert a block into the block status map with initial values. + pub fn insert_block(&self, block: &NakamotoBlockHeader) { + let (lock, _cvar) = &*self.blocks; + let mut blocks = lock.lock().expect("FATAL: failed to lock block status"); + let block_status = BlockStatus { + responded_signers: HashSet::new(), + gathered_signatures: BTreeMap::new(), + total_weight_signed: 0, + total_reject_weight: 0, + }; + blocks.insert(block.signer_signature_hash(), block_status); + } + + /// Get the status for `block` from the Stacker DB listener. + /// If the block is not found in the map, return an error. + /// If the block is found, call `condition` to check if the block status + /// satisfies the condition. + /// If the condition is satisfied, return the block status as + /// `Ok(Some(status))`. + /// If the condition is not satisfied, wait for it to be satisfied. + /// If the timeout is reached, return `Ok(None)`. 
+ pub fn wait_for_block_status( + &self, + block_signer_sighash: &Sha512Trunc256Sum, + timeout: Duration, + condition: F, + ) -> Result, NakamotoNodeError> + where + F: Fn(&BlockStatus) -> bool, + { + let (lock, cvar) = &*self.blocks; + let blocks = lock.lock().expect("FATAL: failed to lock block status"); + + let (guard, timeout_result) = cvar + .wait_timeout_while(blocks, timeout, |map| { + let Some(status) = map.get(block_signer_sighash) else { + return true; + }; + condition(status) + }) + .expect("FATAL: failed to wait on block status cond var"); + + // If we timed out, return None + if timeout_result.timed_out() { + return Ok(None); + } + match guard.get(block_signer_sighash) { + Some(status) => Ok(Some(status.clone())), + None => Err(NakamotoNodeError::SigningCoordinatorFailure( + "Block not found in status map".into(), + )), + } + } + + /// Get the timestamp at which at least 70% of the signing power should be + /// willing to accept a time-based tenure extension. + pub fn get_tenure_extend_timestamp(&self, weight_threshold: u32) -> u64 { + let signer_idle_timestamps = self + .signer_idle_timestamps + .lock() + .expect("FATAL: failed to lock signer idle timestamps"); + debug!("SignerCoordinator: signer_idle_timestamps: {signer_idle_timestamps:?}"); + let mut idle_timestamps = signer_idle_timestamps.values().collect::>(); + idle_timestamps.sort_by_key(|info| info.timestamp); + let mut weight_sum = 0; + for info in idle_timestamps { + weight_sum += info.weight; + if weight_sum >= weight_threshold { + debug!("SignerCoordinator: 70% threshold reached for tenure extension timestamp"; + "timestamp" => info.timestamp, + ); + return info.timestamp; + } + } + + // We don't have enough information to reach a 70% threshold at any + // time, so return u64::MAX to indicate that we should not extend the + // tenure. 
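+ // (For example, with a threshold of 7 and recorded idle-timestamp weights of only 4 and 2, the loop above tops out at 6 and never returns.)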
+ u64::MAX + } +} diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index b688db100d..1639f93c43 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -223,7 +223,7 @@ use crate::burnchains::{make_bitcoin_indexer, Error as BurnchainControllerError} use crate::chain_data::MinerStats; use crate::config::NodeConfig; use crate::globals::{NeonGlobals as Globals, RelayerDirective}; -use crate::nakamoto_node::sign_coordinator::SignCoordinator; +use crate::nakamoto_node::signer_coordinator::SignerCoordinator; use crate::run_loop::neon::RunLoop; use crate::run_loop::RegisteredKey; use crate::ChainTip; @@ -2364,7 +2364,7 @@ impl BlockMinerThread { let mut miners_stackerdb = StackerDBSession::new(&self.config.node.rpc_bind, miner_contract_id); - SignCoordinator::send_miners_message( + SignerCoordinator::send_miners_message( &mining_key, &burn_db, &self.burn_block, @@ -2392,7 +2392,7 @@ impl BlockMinerThread { }; info!("Sending mock block to stackerdb: {mock_block:?}"); - SignCoordinator::send_miners_message( + SignerCoordinator::send_miners_message( &mining_key, &burn_db, &self.burn_block, diff --git a/testnet/stacks-node/src/tests/signer/v0.rs b/testnet/stacks-node/src/tests/signer/v0.rs index 398c9aa2e5..0051902852 100644 --- a/testnet/stacks-node/src/tests/signer/v0.rs +++ b/testnet/stacks-node/src/tests/signer/v0.rs @@ -69,7 +69,7 @@ use crate::event_dispatcher::MinedNakamotoBlockEvent; use crate::nakamoto_node::miner::{ TEST_BLOCK_ANNOUNCE_STALL, TEST_BROADCAST_STALL, TEST_MINE_STALL, }; -use crate::nakamoto_node::sign_coordinator::TEST_IGNORE_SIGNERS; +use crate::nakamoto_node::stackerdb_listener::TEST_IGNORE_SIGNERS; use crate::neon::Counters; use crate::run_loop::boot_nakamoto; use crate::tests::nakamoto_integrations::{ @@ -2513,7 +2513,7 @@ fn signers_broadcast_signed_blocks() { }) .expect("Timed out waiting for first nakamoto block to be mined"); - TEST_IGNORE_SIGNERS.lock().unwrap().replace(true); + TEST_IGNORE_SIGNERS.set(true); let blocks_before = signer_test .running_nodes .nakamoto_blocks_mined @@ -2798,7 +2798,7 @@ fn empty_sortition_before_approval() { let stacks_height_before = info.stacks_tip_height; info!("Forcing miner to ignore signatures for next block"); - TEST_IGNORE_SIGNERS.lock().unwrap().replace(true); + TEST_IGNORE_SIGNERS.set(true); info!("Pausing block commits to trigger an empty sortition."); signer_test @@ -2851,7 +2851,7 @@ fn empty_sortition_before_approval() { .replace(false); info!("Stop ignoring signers and wait for the tip to advance"); - TEST_IGNORE_SIGNERS.lock().unwrap().replace(false); + TEST_IGNORE_SIGNERS.set(false); wait_for(60, || { let info = get_chain_info(&signer_test.running_nodes.conf); @@ -5608,7 +5608,7 @@ fn miner_recovers_when_broadcast_block_delay_across_tenures_occurs() { // broadcasted to the miner so it can end its tenure before block confirmation obtained // Clear the stackerdb chunks info!("Forcing miner to ignore block responses for block N+1"); - TEST_IGNORE_SIGNERS.lock().unwrap().replace(true); + TEST_IGNORE_SIGNERS.set(true); info!("Delaying signer block N+1 broadcasting to the miner"); TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap().replace(true); test_observer::clear(); @@ -5735,7 +5735,7 @@ fn miner_recovers_when_broadcast_block_delay_across_tenures_occurs() { .expect("Timed out waiting for block proposal of N+1' block proposal"); info!("Allowing miner to accept block responses again. 
"); - TEST_IGNORE_SIGNERS.lock().unwrap().replace(false); + TEST_IGNORE_SIGNERS.set(false); info!("Allowing signers to broadcast block N+1 to the miner"); TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap().replace(false);