Skip to content

Commit

Permalink
Split "on_success_quorum" into smaller and more manageable functions
Browse files Browse the repository at this point in the history
  • Loading branch information
autholykos committed Oct 10, 2024
1 parent 9e015dc commit 7f90d32
Showing 1 changed file with 94 additions and 69 deletions.
163 changes: 94 additions & 69 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,80 +302,105 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
qmsg: &Quorum,
metadata: Option<Metadata>,
) {
// Clean up attestation cache
self.clean_att_cache();

if let RatificationResult::Success(Vote::Valid(candidate)) =
qmsg.att.result
{
let db = self.acc.read().await.db.clone();
let tip_header = self.acc.read().await.tip_header().await;
let tip_height = tip_header.height;
let quorum_height = qmsg.header.round;

let quorum_blk = if quorum_height > tip_height + 1 {
// Quorum from future

// We do not check the db because we currently do not store
// candidates from the future
None
} else if (quorum_height == tip_height + 1)
|| (quorum_height == tip_height && tip_header.hash != candidate)
{
// If Quorum is for at height tip+1 or tip (but for a different
// candidate) we try to fetch the candidate from the DB
let res = db
.read()
.await
.view(|t| t.fetch_candidate_block(&candidate));

match res {
Ok(b) => b,
Err(_) => None,
}
} else {
// INFO: we currently ignore Quorum messages from the past
None
};

let attestation = qmsg.att;

if let Some(mut blk) = quorum_blk {
// Candidate found. We can build the "full" block
info!(
event = "new block",
src = "quorum_msg",
blk_height = blk.header().height,
blk_hash = to_str(&blk.header().hash),
);

// Attach the Attestation to the block
blk.set_attestation(attestation);

// Handle the new block
let res = self.on_block_event(blk, metadata).await;
match res {
Ok(_) => {}
Err(e) => {
error!("Error on block handling: {e}");
}
}
} else {
// Candidate block not found
debug!(
event = "Candidate not found. Requesting it to the network",
hash = to_str(&candidate),
height = quorum_height,
);

// Cache the attestation and request the candidate from the
// network.
self.flood_request_block(candidate, attestation).await;

let (candidate, attestation) = match qmsg.att.result {
RatificationResult::Success(Vote::Valid(candidate)) => (candidate, qmsg.att),
_ => {
error!("Invalid Quorum message");
return;
}
};

let quorum_blk = self.fetch_quorum_block(candidate, qmsg.header.round).await;

match quorum_blk {
Some(blk) => self.handle_found_quorum_block(blk, attestation, metadata).await,
None => self.handle_missing_quorum_block(candidate, attestation, qmsg.header.round).await,
}
}

/// This function encapsulates the logic for fetching a quorum block based on the candidate hash and round number. Here's a breakdown of what it does:
/// 1. It first acquires the necessary data: the database, tip header, and tip height.
/// 2. It then checks the round number against the tip height:
/// - If the round is from the future (greater than tip height + 1), or in the past (less than tip height), it returns None.
/// - If the round is either for the next block (tip height + 1) or for the current height but with a different hash, it attempts to fetch the candidate block from the database.
/// 3. When fetching from the database, it uses the view method to access the transaction, then calls fetch_candidate_block. The ok().flatten() at the end handles both the Result from the database operation and the Option from fetch_candidate_block.
async fn fetch_quorum_block(
&self,
candidate: [u8; 32],
round: u64,
) -> Option<Block> {
let db = self.acc.read().await.db.clone();
let tip_header = self.acc.read().await.tip_header().await;
let tip_height = tip_header.height;

if (round == tip_height + 1) || (round == tip_height && tip_header.hash != candidate) {
// Quorum from height tip+1 or tip but for a different candidate
// We try to fetch the candidate from the DB
db.read()
.await
.view(|t| t.fetch_candidate_block(&candidate))
.ok()
.flatten()
} else {
error!("Invalid Quorum message");
// Quorum from the future or the past
None
}
}

/// Handles the case where a quorum block is found in the database.
/// It performs the following actions:
/// 1. Logs the event
/// 2. Attaches the attestation to the block
/// 3. Passes the block to the on_block_event method for further processing
async fn handle_found_quorum_block(
&mut self,
mut blk: Block,
attestation: Attestation,
metadata: Option<Metadata>,
) {
// Candidate found. We can build the "full" block
info!(
event = "new block",
src = "quorum_msg",
blk_height = blk.header().height,
blk_hash = to_str(&blk.header().hash),
);

// Attach the Attestation to the block
blk.set_attestation(attestation);

// Handle the new block
match self.on_block_event(blk, metadata).await {
Ok(_) => {}
Err(e) => {
error!("Error on block handling: {e}");
}
}
}

/// Handles the case where a quorum block is not found in the database.
/// It performs the following actions:
/// 1. Logs the event
/// 2. Caches the attestation
/// 3. Requests the candidate block from the network
async fn handle_missing_quorum_block(
&mut self,
candidate: [u8; 32],
attestation: Attestation,
quorum_height: u64,
) {
// Candidate block not found
debug!(
event = "Candidate not found. Requesting it from the network",
hash = to_str(&candidate),
height = quorum_height,
);

// Cache the attestation and request the candidate from the network
self.flood_request_block(candidate, attestation).await;
}

pub(crate) async fn on_heartbeat_event(&mut self) -> anyhow::Result<()> {
self.stalled_sm.on_heartbeat_event().await;
Expand Down

0 comments on commit 7f90d32

Please sign in to comment.