Skip to content

Commit

Permalink
Fix block state transitions and update some comments
Browse files Browse the repository at this point in the history
Signed-off-by: Jacinta Ferrant <[email protected]>
  • Loading branch information
jferrant committed Dec 2, 2024
1 parent 1855264 commit 57848b3
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 43 deletions.
34 changes: 13 additions & 21 deletions stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,9 @@ impl BlockInfo {
}
match state {
BlockState::Unprocessed => false,
BlockState::LocallyAccepted => {
matches!(
prev_state,
BlockState::Unprocessed | BlockState::LocallyAccepted
)
}
BlockState::LocallyRejected => {
matches!(
prev_state,
BlockState::Unprocessed | BlockState::LocallyRejected
)
BlockState::LocallyAccepted | BlockState::LocallyRejected => {
!matches!(prev_state, BlockState::GloballyRejected)
&& !matches!(prev_state, BlockState::GloballyAccepted)
}
BlockState::GloballyAccepted => !matches!(prev_state, BlockState::GloballyRejected),
BlockState::GloballyRejected => !matches!(prev_state, BlockState::GloballyAccepted),
Expand Down Expand Up @@ -1245,7 +1237,14 @@ mod tests {
assert_eq!(block.state, BlockState::LocallyAccepted);
assert!(!block.check_state(BlockState::Unprocessed));
assert!(block.check_state(BlockState::LocallyAccepted));
assert!(!block.check_state(BlockState::LocallyRejected));
assert!(block.check_state(BlockState::LocallyRejected));
assert!(block.check_state(BlockState::GloballyAccepted));
assert!(block.check_state(BlockState::GloballyRejected));

block.move_to(BlockState::LocallyRejected).unwrap();
assert!(!block.check_state(BlockState::Unprocessed));
assert!(block.check_state(BlockState::LocallyAccepted));
assert!(block.check_state(BlockState::LocallyRejected));
assert!(block.check_state(BlockState::GloballyAccepted));
assert!(block.check_state(BlockState::GloballyRejected));

Expand All @@ -1257,15 +1256,8 @@ mod tests {
assert!(block.check_state(BlockState::GloballyAccepted));
assert!(!block.check_state(BlockState::GloballyRejected));

// Must manually override as will not be able to move from GloballyAccepted to LocallyAccepted
block.state = BlockState::LocallyRejected;
assert!(!block.check_state(BlockState::Unprocessed));
assert!(!block.check_state(BlockState::LocallyAccepted));
assert!(block.check_state(BlockState::LocallyRejected));
assert!(block.check_state(BlockState::GloballyAccepted));
assert!(block.check_state(BlockState::GloballyRejected));

block.move_to(BlockState::GloballyRejected).unwrap();
// Must manually override as will not be able to move from GloballyAccepted to GloballyRejected
block.state = BlockState::GloballyRejected;
assert!(!block.check_state(BlockState::Unprocessed));
assert!(!block.check_state(BlockState::LocallyAccepted));
assert!(!block.check_state(BlockState::LocallyRejected));
Expand Down
66 changes: 44 additions & 22 deletions stacks-signer/src/v0/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ pub static TEST_PAUSE_BLOCK_BROADCAST: std::sync::Mutex<Option<bool>> = std::syn
/// Skip broadcasting the block to the network
pub static TEST_SKIP_BLOCK_BROADCAST: std::sync::Mutex<Option<bool>> = std::sync::Mutex::new(None);

#[cfg(any(test, feature = "testing"))]
/// Skip any block responses from other signers
pub static TEST_IGNORE_BLOCK_RESPONSES: std::sync::Mutex<Option<bool>> =
std::sync::Mutex::new(None);

/// The stacks signer registered for the reward cycle
#[derive(Debug)]
pub struct Signer {
Expand Down Expand Up @@ -476,10 +481,7 @@ impl Signer {
self.test_reject_block_proposal(block_proposal, &mut block_info, block_response);

if let Some(block_response) = block_response {
// We know proposal is invalid. Send rejection message, do not do further validation
if let Err(e) = block_info.mark_locally_rejected() {
warn!("{self}: Failed to mark block as locally rejected: {e:?}",);
};
// We know proposal is invalid. Send rejection message, do not do further validation and do not store it.
debug!("{self}: Broadcasting a block response to stacks node: {block_response:?}");
let res = self
.stackerdb
Expand Down Expand Up @@ -535,6 +537,10 @@ impl Signer {
stacks_client: &StacksClient,
block_response: &BlockResponse,
) {
#[cfg(any(test, feature = "testing"))]
if self.test_ignore_block_responses(block_response) {
return;
}
match block_response {
BlockResponse::Accepted(accepted) => {
self.handle_block_signature(stacks_client, accepted);
Expand Down Expand Up @@ -870,7 +876,7 @@ impl Signer {
// Not enough rejection signatures to make a decision
return;
}
debug!("{self}: {total_reject_weight}/{total_weight} signers voteed to reject the block {block_hash}");
debug!("{self}: {total_reject_weight}/{total_weight} signers voted to reject the block {block_hash}");
if let Err(e) = block_info.mark_globally_rejected() {
warn!("{self}: Failed to mark block as globally rejected: {e:?}",);
}
Expand Down Expand Up @@ -999,7 +1005,7 @@ impl Signer {
return;
};
// move block to LOCALLY accepted state.
// We only mark this GLOBALLY accepted if we manage to broadcast it...
// It is only considered globally accepted IFF we receive a new block event confirming it OR see the chain tip of the node advance to it.
if let Err(e) = block_info.mark_locally_accepted(true) {
// Do not abort as we should still try to store the block signature threshold
warn!("{self}: Failed to mark block as locally accepted: {e:?}");
Expand All @@ -1012,22 +1018,8 @@ impl Signer {
panic!("{self} Failed to write block to signerdb: {e}");
});
#[cfg(any(test, feature = "testing"))]
{
if *TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
// Do an extra check just so we don't log EVERY time.
warn!("Block broadcast is stalled due to testing directive.";
"block_id" => %block_info.block.block_id(),
"height" => block_info.block.header.chain_length,
);
while *TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
std::thread::sleep(std::time::Duration::from_millis(10));
}
info!("Block validation is no longer stalled due to testing directive.";
"block_id" => %block_info.block.block_id(),
"height" => block_info.block.header.chain_length,
);
}
}
self.test_pause_block_broadcast(&block_info);

self.broadcast_signed_block(stacks_client, block_info.block, &addrs_to_sigs);
if self
.submitted_block_proposal
Expand Down Expand Up @@ -1137,6 +1129,36 @@ impl Signer {
}
}

#[cfg(any(test, feature = "testing"))]
fn test_ignore_block_responses(&self, block_response: &BlockResponse) -> bool {
if *TEST_IGNORE_BLOCK_RESPONSES.lock().unwrap() == Some(true) {
warn!(
"{self}: Ignoring block response due to testing directive";
"block_response" => %block_response
);
return true;
}
false
}

#[cfg(any(test, feature = "testing"))]
fn test_pause_block_broadcast(&self, block_info: &BlockInfo) {
if *TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
// Do an extra check just so we don't log EVERY time.
warn!("{self}: Block broadcast is stalled due to testing directive.";
"block_id" => %block_info.block.block_id(),
"height" => block_info.block.header.chain_length,
);
while *TEST_PAUSE_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
std::thread::sleep(std::time::Duration::from_millis(10));
}
info!("{self}: Block validation is no longer stalled due to testing directive.";
"block_id" => %block_info.block.block_id(),
"height" => block_info.block.header.chain_length,
);
}
}

/// Send a mock signature to stackerdb to prove we are still alive
fn mock_sign(&mut self, mock_proposal: MockProposal) {
info!("{self}: Mock signing mock proposal: {mock_proposal:?}");
Expand Down
21 changes: 21 additions & 0 deletions testnet/stacks-node/src/event_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ use url::Url;

use super::config::{EventKeyType, EventObserverConfig};

#[cfg(test)]
pub static TEST_SKIP_BLOCK_ANNOUNCEMENT: std::sync::Mutex<Option<bool>> =
std::sync::Mutex::new(None);

#[derive(Debug, Clone)]
struct EventObserver {
/// Path to the database where pending payloads are stored. If `None`, then
Expand Down Expand Up @@ -1299,6 +1303,11 @@ impl EventDispatcher {

let mature_rewards = serde_json::Value::Array(mature_rewards_vec);

#[cfg(any(test, feature = "testing"))]
if test_skip_block_announcement(&block) {
return;
}

for (observer_id, filtered_events_ids) in dispatch_matrix.iter().enumerate() {
let filtered_events: Vec<_> = filtered_events_ids
.iter()
Expand Down Expand Up @@ -1695,6 +1704,18 @@ impl EventDispatcher {
}
}

#[cfg(any(test, feature = "testing"))]
fn test_skip_block_announcement(block: &StacksBlockEventData) -> bool {
if *TEST_SKIP_BLOCK_ANNOUNCEMENT.lock().unwrap() == Some(true) {
warn!(
"Skipping new block announcement due to testing directive";
"block_hash" => %block.block_hash
);
return true;
}
false
}

#[cfg(test)]
mod test {
use std::net::TcpListener;
Expand Down

0 comments on commit 57848b3

Please sign in to comment.