Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SignerEvent::NewNakamotoBlock and do not update a block to GloballyAccepted until event arrives #5516

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1855264
Add SignerEvent::NewNakamotoBlock and do not update a block to Global…
jferrant Dec 2, 2024
57848b3
Fix block state transitions and update some comments
jferrant Dec 2, 2024
a0cbee6
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 2, 2024
bf70c9f
Remove unused change
jferrant Dec 3, 2024
d4860f8
Rename NewNakamotoBlock to NewBlock
jferrant Dec 3, 2024
b473984
Add global_acceptance_depends_on_block_announcement
jferrant Dec 3, 2024
f3cdabb
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 6, 2024
b8d41be
Fix test to confirm reorg accross boundaries is possible if block not…
jferrant Dec 6, 2024
6649c8c
Increase the block proposal timeout in block_commit_delay test
jferrant Dec 7, 2024
252d4f1
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 10, 2024
5c1ae90
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 12, 2024
f96a33f
Implement process_event function for SignerEvent<T>
jferrant Dec 12, 2024
36f0ab5
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 12, 2024
ef7cb90
CRC: move TestFlag related functions to seperate test modules
jferrant Dec 12, 2024
2a4371f
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 13, 2024
150bfce
Do not wait for an exact number of acceptance and rejections
jferrant Dec 13, 2024
29f4901
Merge branch 'develop' of https://github.com/stacks-network/stacks-co…
jferrant Dec 16, 2024
a50825c
Missing changes from failed merge
jferrant Dec 16, 2024
1305a53
Merge branch 'develop' into feat/signer-subscribe-to-block-events
hstove Dec 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ jobs:
- tests::signer::v0::block_validation_response_timeout
- tests::signer::v0::tenure_extend_after_bad_commit
- tests::signer::v0::block_proposal_max_age_rejections
- tests::signer::v0::global_acceptance_depends_on_block_announcement
- tests::nakamoto_integrations::burn_ops_integration_test
- tests::nakamoto_integrations::check_block_heights
- tests::nakamoto_integrations::clarity_burn_state
Expand Down
184 changes: 82 additions & 102 deletions libsigner/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ pub enum SignerEvent<T: SignerEventTrait> {
/// the time at which this event was received by the signer's event processor
received_time: SystemTime,
},
/// A new processed Stacks block was received from the node with the given block hash
NewBlock {
/// The block header hash for the newly processed stacks block
block_hash: Sha512Trunc256Sum,
/// The block height for the newly processed stacks block
block_height: u64,
},
}

/// Trait to implement a stop-signaler for the event receiver thread.
Expand Down Expand Up @@ -298,29 +305,25 @@ impl<T: SignerEventTrait> EventReceiver<T> for SignerEventReceiver<T> {
&request.method(),
)));
}
debug!("Processing {} event", request.url());
if request.url() == "/stackerdb_chunks" {
process_stackerdb_event(event_receiver.local_addr, request)
.map_err(|e| {
error!("Error processing stackerdb_chunks message"; "err" => ?e);
e
})
process_event::<T, StackerDBChunksEvent>(request)
} else if request.url() == "/proposal_response" {
process_proposal_response(request)
process_event::<T, BlockValidateResponse>(request)
} else if request.url() == "/new_burn_block" {
process_new_burn_block_event(request)
process_event::<T, BurnBlockEvent>(request)
} else if request.url() == "/shutdown" {
event_receiver.stop_signal.store(true, Ordering::SeqCst);
return Err(EventError::Terminated);
Err(EventError::Terminated)
} else if request.url() == "/new_block" {
process_event::<T, BlockEvent>(request)
} else {
let url = request.url().to_string();
// `/new_block` is expected, but not specifically handled. do not log.
if &url != "/new_block" {
debug!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
url
);
}
debug!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
url
);
ack_dispatcher(request);
Err(EventError::UnrecognizedEvent(url))
}
Expand Down Expand Up @@ -385,12 +388,13 @@ fn ack_dispatcher(request: HttpRequest) {

// TODO: add tests from mutation testing results #4835
#[cfg_attr(test, mutants::skip)]
/// Process a stackerdb event from the node
fn process_stackerdb_event<T: SignerEventTrait>(
local_addr: Option<SocketAddr>,
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
fn process_event<T, E>(mut request: HttpRequest) -> Result<SignerEvent<T>, EventError>
where
T: SignerEventTrait,
E: serde::de::DeserializeOwned + TryInto<SignerEvent<T>, Error = EventError>,
{
let mut body = String::new();

if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
ack_dispatcher(request);
Expand All @@ -399,27 +403,12 @@ fn process_stackerdb_event<T: SignerEventTrait>(
&e
)));
}

debug!("Got stackerdb_chunks event"; "chunks_event_body" => %body);
let event: StackerDBChunksEvent = serde_json::from_slice(body.as_bytes())
// Regardless of whether we successfully deserialize, we should ack the dispatcher so they don't keep resending it
ack_dispatcher(request);
let json_event: E = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;

let event_contract_id = event.contract_id.clone();

let signer_event = match SignerEvent::try_from(event) {
Err(e) => {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
local_addr,
event_contract_id
);
ack_dispatcher(request);
return Err(e);
}
Ok(x) => x,
};

ack_dispatcher(request);
let signer_event: SignerEvent<T> = json_event.try_into()?;

Ok(signer_event)
}
Expand Down Expand Up @@ -466,78 +455,69 @@ impl<T: SignerEventTrait> TryFrom<StackerDBChunksEvent> for SignerEvent<T> {
}
}

/// Process a proposal response from the node
fn process_proposal_response<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got proposal_response event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BlockValidateResponse> for SignerEvent<T> {
type Error = EventError;

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
fn try_from(block_validate_response: BlockValidateResponse) -> Result<Self, Self::Error> {
Ok(SignerEvent::BlockValidationResponse(
block_validate_response,
))
}
}

let event: BlockValidateResponse = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
#[derive(Debug, Deserialize)]
struct BurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
}

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BurnBlockEvent> for SignerEvent<T> {
type Error = EventError;

fn try_from(burn_block_event: BurnBlockEvent) -> Result<Self, Self::Error> {
let burn_header_hash = burn_block_event
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;

Ok(SignerEvent::NewBurnBlock {
burn_height: burn_block_event.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
})
}
}

Ok(SignerEvent::BlockValidationResponse(event))
#[derive(Debug, Deserialize)]
struct BlockEvent {
block_hash: String,
block_height: u64,
}

/// Process a new burn block event from the node
fn process_new_burn_block_event<T: SignerEventTrait>(
mut request: HttpRequest,
) -> Result<SignerEvent<T>, EventError> {
debug!("Got burn_block event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
impl<T: SignerEventTrait> TryFrom<BlockEvent> for SignerEvent<T> {
type Error = EventError;

if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}
#[derive(Debug, Deserialize)]
struct TempBurnBlockEvent {
burn_block_hash: String,
burn_block_height: u64,
reward_recipients: Vec<serde_json::Value>,
reward_slot_holders: Vec<String>,
burn_amount: u64,
}
let temp: TempBurnBlockEvent = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
let burn_header_hash = temp
.burn_block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
BurnchainHeaderHash::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
let event = SignerEvent::NewBurnBlock {
burn_height: temp.burn_block_height,
received_time: SystemTime::now(),
burn_header_hash,
};
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
fn try_from(block_event: BlockEvent) -> Result<Self, Self::Error> {
let block_hash: Sha512Trunc256Sum = block_event
.block_hash
.get(2..)
.ok_or_else(|| EventError::Deserialize("Hex string should be 0x prefixed".into()))
.and_then(|hex| {
Sha512Trunc256Sum::from_hex(hex)
.map_err(|e| EventError::Deserialize(format!("Invalid hex string: {e}")))
})?;
Ok(SignerEvent::NewBlock {
block_hash,
block_height: block_event.block_height,
})
}
Ok(event)
}

pub fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
Expand Down
3 changes: 3 additions & 0 deletions stacks-common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{error, fmt, thread, time};

#[cfg(any(test, feature = "testing"))]
jferrant marked this conversation as resolved.
Show resolved Hide resolved
pub mod tests;

pub fn get_epoch_time_secs() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start
Expand Down
99 changes: 99 additions & 0 deletions stacks-common/src/util/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::{Arc, Mutex};
/// `TestFlag` is a thread-safe utility designed for managing shared state in testing scenarios. It wraps
/// a value of type `T` inside an `Arc<Mutex<Option<T>>>`, allowing you to set and retrieve a value
/// across different parts of your codebase while ensuring thread safety.
///
/// This structure is particularly useful when:
/// - You need a global or static variable in tests.
/// - You want to control the execution of custom test code paths by setting and checking a shared value.
///
/// # Type Parameter
/// - `T`: The type of the value managed by the `TestFlag`. It must implement the `Default` and `Clone` traits.
///
/// # Examples
///
/// ```rust
/// use stacks_common::util::tests::TestFlag;
/// use std::sync::{Arc, Mutex};
///
/// // Create a TestFlag instance
/// let test_flag = TestFlag::default();
///
/// // Set a value in the test flag
/// test_flag.set("test_value".to_string());
///
/// // Retrieve the value
/// assert_eq!(test_flag.get(), "test_value".to_string());
///
/// // Reset the value to default
/// test_flag.set("".to_string());
/// assert_eq!(test_flag.get(), "".to_string());
/// ```
#[derive(Clone)]
pub struct TestFlag<T>(pub Arc<Mutex<Option<T>>>);

impl<T: Default + Clone> Default for TestFlag<T> {
fn default() -> Self {
Self(Arc::new(Mutex::new(None)))
}
}

impl<T: Default + Clone> TestFlag<T> {
/// Sets the value of the test flag.
///
/// This method updates the value stored inside the `TestFlag`, replacing any existing value.
///
/// # Arguments
/// - `value`: The new value to set for the `TestFlag`.
///
/// # Examples
///
/// ```rust
/// let test_flag = TestFlag::default();
/// test_flag.set(42);
/// assert_eq!(test_flag.get(), 42);
/// ```
pub fn set(&self, value: T) {
*self.0.lock().unwrap() = Some(value);
}

/// Retrieves the current value of the test flag.
///
/// If no value has been set, this method returns the default value for the type `T`.
///
/// # Returns
/// - The current value of the test flag, or the default value of `T` if none has been set.
///
/// # Examples
///
/// ```rust
/// let test_flag = TestFlag::default();
///
/// // Get the default value
/// assert_eq!(test_flag.get(), 0); // For T = i32, default is 0
///
/// // Set a value
/// test_flag.set(123);
///
/// // Get the updated value
/// assert_eq!(test_flag.get(), 123);
/// ```
pub fn get(&self) -> T {
self.0.lock().unwrap().clone().unwrap_or_default().clone()
}
}
1 change: 1 addition & 0 deletions stacks-signer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE

## Changed
- Improvements to the stale signer cleanup logic: deletes the prior signer if it has no remaining unprocessed blocks in its database
- Signers now listen to new block events from the stacks node to determine whether a block has been successfully appended to the chain tip

## [3.1.0.0.1.0]

Expand Down
2 changes: 1 addition & 1 deletion stacks-signer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ serde_stacker = "0.1"
slog = { version = "2.5.2", features = [ "max_level_trace" ] }
slog-json = { version = "2.3.0", optional = true }
slog-term = "2.6.0"
stacks-common = { path = "../stacks-common" }
stacks-common = { path = "../stacks-common", features = ["testing"] }
stackslib = { path = "../stackslib" }
thiserror = { workspace = true }
tiny_http = { version = "0.12", optional = true }
Expand Down
Loading
Loading