Skip to content

Commit

Permalink
Make chain manager track the locked block's blobs. (#3006)
Browse files Browse the repository at this point in the history
* Make test_finalize_locked_block_with_blobs harder. (Fails now.)

* In the chain manager, store exactly the locked block's blobs.

* pending_blobs → locked_blobs

* Deduplicate loading the remaining blobs.

* In the updater, send only the required blobs.

* Apply the proposal size limit to read blobs, too.

* Use handle_certificate; add comments.

* Rename block reader functions; merge required_blobs lines.

* Update linera-core/src/local_node.rs

Co-authored-by: deuszx <[email protected]>
Signed-off-by: Andreas Fackler <[email protected]>

---------

Signed-off-by: Andreas Fackler <[email protected]>
Co-authored-by: deuszx <[email protected]>
  • Loading branch information
afck and deuszx authored Dec 5, 2024
1 parent c260aeb commit bc250ca
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 231 deletions.
42 changes: 21 additions & 21 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use serde::{Deserialize, Serialize};

use crate::{
block::{ConfirmedBlock, Timeout, ValidatedBlock},
data_types::{Block, BlockExecutionOutcome, BlockProposal, LiteVote, ProposalContent, Vote},
data_types::{Block, BlockProposal, ExecutedBlock, LiteVote, Vote},
types::{Hashed, TimeoutCertificate, ValidatedBlockCertificate},
ChainError,
};
Expand Down Expand Up @@ -119,6 +119,9 @@ pub struct ChainManager {
/// validator).
#[debug(skip_if = Option::is_none)]
pub locked: Option<ValidatedBlockCertificate>,
/// These are blobs published or read by the locked block.
#[debug(skip_if = BTreeMap::is_empty)]
pub locked_blobs: BTreeMap<BlobId, Blob>,
/// Latest leader timeout certificate we have received.
#[debug(skip_if = Option::is_none)]
pub timeout: Option<TimeoutCertificate>,
Expand Down Expand Up @@ -147,9 +150,6 @@ pub struct ChainManager {
/// The owners that take over in fallback mode.
#[debug(skip_if = BTreeMap::is_empty)]
pub fallback_owners: BTreeMap<Owner, (PublicKey, u64)>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
}

doc_scalar!(
Expand Down Expand Up @@ -219,7 +219,7 @@ impl ChainManager {
round_timeout,
current_round,
fallback_owners,
pending_blobs: BTreeMap::new(),
locked_blobs: BTreeMap::new(),
})
}

Expand Down Expand Up @@ -399,33 +399,31 @@ impl ChainManager {
pub fn create_vote(
&mut self,
proposal: BlockProposal,
outcome: BlockExecutionOutcome,
executed_block: ExecutedBlock,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
blobs: BTreeMap<BlobId, Blob>,
) -> Option<Either<&Vote<ValidatedBlock>, &Vote<ConfirmedBlock>>> {
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal.clone());
self.update_current_round(local_time);
let ProposalContent { block, round, .. } = proposal.content;
let executed_block = outcome.with(block);
let round = proposal.content.round;

// If the validated block certificate is more recent, update our locked block.
if let Some(lite_cert) = proposal.validated_block_certificate {
if let Some(lite_cert) = &proposal.validated_block_certificate {
if self
.locked
.as_ref()
.map_or(true, |locked| locked.round < lite_cert.round)
{
let value = Hashed::new(ValidatedBlock::new(executed_block.clone()));
if let Some(certificate) = lite_cert.with_value(value) {
if let Some(certificate) = lite_cert.clone().with_value(value) {
self.locked = Some(certificate);
self.locked_blobs = blobs;
}
}
}

for blob in proposal.blobs {
self.pending_blobs.insert(blob.id(), blob);
}
// Record the proposed block, so it can be supplied to clients that request it.
self.proposed = Some(proposal);
self.update_current_round(local_time);

if let Some(key_pair) = key_pair {
// If this is a fast block, vote to confirm. Otherwise vote to validate.
Expand Down Expand Up @@ -455,6 +453,7 @@ impl ChainManager {
validated: ValidatedBlockCertificate,
key_pair: Option<&KeyPair>,
local_time: Timestamp,
blobs: BTreeMap<BlobId, Blob>,
) {
let round = validated.round;
// Validators only change their locked block if the new one is included in a proposal in the
Expand All @@ -464,6 +463,7 @@ impl ChainManager {
}
let confirmed_block = ConfirmedBlock::new(validated.inner().executed_block().clone());
self.locked = Some(validated);
self.locked_blobs = blobs;
self.update_current_round(local_time);
if let Some(key_pair) = key_pair {
// Vote to confirm.
Expand Down Expand Up @@ -623,9 +623,9 @@ pub struct ChainManagerInfo {
/// The timestamp when the current round times out.
#[debug(skip_if = Option::is_none)]
pub round_timeout: Option<Timestamp>,
/// These are blobs belonging to proposed or validated blocks that have not been confirmed yet.
#[debug(skip_if = BTreeMap::is_empty)]
pub pending_blobs: BTreeMap<BlobId, Blob>,
/// These are blobs belonging to the locked block.
#[debug(skip_if = Vec::is_empty)]
pub locked_blobs: Vec<Blob>,
}

impl From<&ChainManager> for ChainManagerInfo {
Expand All @@ -649,7 +649,7 @@ impl From<&ChainManager> for ChainManagerInfo {
current_round,
leader: manager.round_leader(current_round).cloned(),
round_timeout: manager.round_timeout,
pending_blobs: BTreeMap::new(),
locked_blobs: Vec::new(),
}
}
}
Expand All @@ -659,6 +659,7 @@ impl ChainManagerInfo {
pub fn add_values(&mut self, manager: &ChainManager) {
self.requested_proposed = manager.proposed.clone().map(Box::new);
self.requested_locked = manager.locked.clone().map(Box::new);
self.locked_blobs = manager.locked_blobs.values().cloned().collect();
self.requested_confirmed = manager
.confirmed_vote
.as_ref()
Expand All @@ -667,7 +668,6 @@ impl ChainManagerInfo {
.validated_vote
.as_ref()
.map(|vote| Box::new(vote.value.clone()));
self.pending_blobs = manager.pending_blobs.clone();
}

/// Returns whether the `identity` is allowed to propose a block in `round`.
Expand Down
33 changes: 15 additions & 18 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,17 @@ where
local_time: Timestamp,
) -> Result<(), WorkerError> {
// Create the vote and store it in the chain state.
let executed_block = outcome.with(proposal.content.block.clone());
let blobs = if proposal.validated_block_certificate.is_some() {
self.state
.get_required_blobs(&executed_block, &proposal.blobs)
.await?
} else {
BTreeMap::new()
};
let key_pair = self.state.config.key_pair();
let manager = self.state.chain.manager.get_mut();
match manager.create_vote(proposal, outcome, self.state.config.key_pair(), local_time) {
match manager.create_vote(proposal, executed_block, key_pair, local_time, blobs) {
// Cache the value we voted on, so the client doesn't have to send it again.
Some(Either::Left(vote)) => {
self.state
Expand Down Expand Up @@ -206,18 +215,13 @@ where
// Verify that no unrelated blobs were provided.
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
self.state
.check_no_missing_blobs(&remaining_required_blob_ids)
.await?;
let blobs = self.state.get_required_blobs(executed_block, blobs).await?;
let old_round = self.state.chain.manager.get().current_round;
self.state.chain.manager.get_mut().create_final_vote(
certificate,
self.state.config.key_pair(),
self.state.storage.clock().current_time(),
blobs,
);
let info = ChainInfoResponse::new(&self.state.chain, self.state.config.key_pair());
self.save().await?;
Expand Down Expand Up @@ -297,21 +301,14 @@ where
// Verify that no unrelated blobs were provided.
self.state
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
let remaining_required_blob_ids = required_blob_ids
.difference(&blobs.iter().map(|blob| blob.id()).collect())
.cloned()
.collect();
let mut blobs_in_block = self
.state
.get_blobs_and_checks_storage(&remaining_required_blob_ids)
.await?;
blobs_in_block.extend_from_slice(blobs);
let blobs = self.state.get_required_blobs(executed_block, blobs).await?;
let blobs = blobs.into_values().collect::<Vec<_>>();

let certificate_hash = certificate.hash();

self.state
.storage
.write_blobs_and_certificate(&blobs_in_block, &certificate)
.write_blobs_and_certificate(&blobs, &certificate)
.await?;

// Update the blob state with last used certificate hash.
Expand Down
86 changes: 41 additions & 45 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,31 +309,6 @@ where
Ok(())
}

/// Returns an error if the block requires a blob we don't have.
/// Looks for the blob in: chain manager's pending blobs and storage.
async fn check_no_missing_blobs(
&self,
required_blob_ids: &HashSet<BlobId>,
) -> Result<(), WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;
let missing_blob_ids = required_blob_ids
.iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.cloned()
.collect::<Vec<_>>();

let missing_blob_ids = self
.storage
.missing_blobs(missing_blob_ids.as_slice())
.await?;

if missing_blob_ids.is_empty() {
return Ok(());
}

Err(WorkerError::BlobsNotFound(missing_blob_ids))
}

/// Returns an error if unrelated blobs were provided.
fn check_for_unneeded_blobs(
&self,
Expand All @@ -352,30 +327,51 @@ where
Ok(())
}

/// Returns the blobs requested by their `blob_ids` that are in the chain manager's pending blobs
/// and checks that they are otherwise in storage.
async fn get_blobs_and_checks_storage(
/// Returns the blobs required by the given executed block. The ones that are not passed in
/// are read from the chain manager or from storage.
async fn get_required_blobs(
&self,
blob_ids: &HashSet<BlobId>,
) -> Result<Vec<Blob>, WorkerError> {
let pending_blobs = &self.chain.manager.get().pending_blobs;

let mut found_blobs = Vec::new();
let mut missing_blob_ids = Vec::new();
for blob_id in blob_ids {
if let Some(blob) = pending_blobs.get(blob_id) {
found_blobs.push(blob.clone());
} else {
missing_blob_ids.push(*blob_id);
executed_block: &ExecutedBlock,
blobs: &[Blob],
) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
let mut blob_ids = executed_block.required_blob_ids();
let manager = self.chain.manager.get();

let mut found_blobs = BTreeMap::new();

for blob in manager
.proposed
.iter()
.flat_map(|proposal| &proposal.blobs)
.chain(blobs)
{
if blob_ids.remove(&blob.id()) {
found_blobs.insert(blob.id(), blob.clone());
}
}
let not_found_blob_ids = self.storage.missing_blobs(&missing_blob_ids).await?;

if not_found_blob_ids.is_empty() {
Ok(found_blobs)
} else {
Err(WorkerError::BlobsNotFound(not_found_blob_ids))
blob_ids.retain(|blob_id| {
if let Some(blob) = manager.locked_blobs.get(blob_id) {
found_blobs.insert(*blob_id, blob.clone());
false
} else {
true
}
});
let missing_blob_ids = blob_ids.into_iter().collect::<Vec<_>>();
let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
let mut not_found_blob_ids = Vec::new();
for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
if let Some(blob) = maybe_blob {
found_blobs.insert(blob_id, blob);
} else {
not_found_blob_ids.push(blob_id);
}
}
ensure!(
not_found_blob_ids.is_empty(),
WorkerError::BlobsNotFound(not_found_blob_ids)
);
Ok(found_blobs)
}

/// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is
Expand Down
18 changes: 13 additions & 5 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ where
published_blob_ids.iter().copied().eq(provided_blob_ids),
WorkerError::WrongBlobsInProposal
);
block.check_proposal_size(policy.maximum_block_proposal_size, blobs)?;
for blob in blobs {
Self::check_blob_size(blob.content(), &policy)?;
}
Expand All @@ -233,26 +232,35 @@ where
forced_oracle_responses.clone(),
))
.await?;

let executed_block = outcome.with(block.clone());
let required_blobs = self
.0
.get_required_blobs(&executed_block, blobs)
.await?
.into_values()
.collect::<Vec<_>>();
block.check_proposal_size(policy.maximum_block_proposal_size, &required_blobs)?;
if let Some(lite_certificate) = &validated_block_certificate {
let value = Hashed::new(ValidatedBlock::new(outcome.clone().with(block.clone())));
let value = Hashed::new(ValidatedBlock::new(executed_block.clone()));
lite_certificate
.clone()
.with_value(value)
.ok_or_else(|| WorkerError::InvalidLiteCertificate)?;
}
ensure!(
!round.is_fast() || !outcome.has_oracle_responses(),
!round.is_fast() || !executed_block.outcome.has_oracle_responses(),
WorkerError::FastBlockUsingOracles
);
// Check if the counters of tip_state would be valid.
self.0
.chain
.tip_state
.get()
.verify_counters(block, &outcome)?;
.verify_counters(block, &executed_block.outcome)?;
// Verify that the resulting chain would have no unconfirmed incoming messages.
self.0.chain.validate_incoming_bundles().await?;
Ok(Some((outcome, local_time)))
Ok(Some((executed_block.outcome, local_time)))
}

/// Prepares a [`ChainInfoResponse`] for a [`ChainInfoQuery`].
Expand Down
Loading

0 comments on commit bc250ca

Please sign in to comment.