Skip to content

Commit

Permalink
Blob use is only an oracle the first time per chain. (#2972)
Browse files Browse the repository at this point in the history
* Blob use is only an oracle the first time per chain.

* Publishing is not an oracle call.

* Update handle_certificates_to_create_application test.

* blob_used and blob_published
  • Loading branch information
afck authored Nov 27, 2024
1 parent 4cac5fb commit a7c3228
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 49 deletions.
4 changes: 2 additions & 2 deletions linera-chain/src/certificate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl CertificateValueT for ValidatedBlock {
}

fn required_blob_ids(&self) -> HashSet<BlobId> {
self.inner().outcome.required_blob_ids().clone()
self.inner().required_blob_ids()
}

#[cfg(with_testing)]
Expand All @@ -198,7 +198,7 @@ impl CertificateValueT for ConfirmedBlock {
}

fn required_blob_ids(&self) -> HashSet<BlobId> {
self.executed_block().outcome.required_blob_ids().clone()
self.executed_block().required_blob_ids()
}
}

Expand Down
9 changes: 6 additions & 3 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,14 @@ impl ExecutedBlock {
}

pub fn required_blob_ids(&self) -> HashSet<BlobId> {
self.outcome.required_blob_ids()
let mut blob_ids = self.outcome.oracle_blob_ids();
blob_ids.extend(self.block.published_blob_ids());
blob_ids
}

pub fn requires_blob(&self, blob_id: &BlobId) -> bool {
self.required_blob_ids().contains(blob_id)
self.outcome.oracle_blob_ids().contains(blob_id)
|| self.block.published_blob_ids().contains(blob_id)
}
}

Expand All @@ -713,7 +716,7 @@ impl BlockExecutionOutcome {
}
}

pub fn required_blob_ids(&self) -> HashSet<BlobId> {
pub fn oracle_blob_ids(&self) -> HashSet<BlobId> {
let mut required_blob_ids = HashSet::new();
for responses in &self.oracle_responses {
for response in responses {
Expand Down
12 changes: 7 additions & 5 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,8 @@ where
.await;
assert_matches!(
result,
Err(ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(not_found_blob_ids))) if not_found_blob_ids == [blob0_id]
Err(ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(not_found_blob_ids)))
if not_found_blob_ids == [blob0_id]
);

// Take one validator down
Expand All @@ -1456,16 +1457,17 @@ where
// But another one goes down
builder.set_fault_type([3], FaultType::Offline).await;

// Try to read the blob. This is a different client but on the same chain, so when we synchronize this with the validators
// before executing the block, we'll actually download and cache locally the blobs that were published by `client_a`.
// So this will succeed.
// Try to read the blob. This is a different client but on the same chain, so when we
// synchronize this with the validators before executing the block, we'll actually download
// and cache locally the blobs that were published by `client_a`. So this will succeed.
client1_b.prepare_chain().await?;
let certificate = client1_b
.execute_operation(SystemOperation::ReadBlob { blob_id: blob0_id }.into())
.await?
.unwrap();
assert_eq!(certificate.round, Round::MultiLeader(0));
assert!(certificate.executed_block().requires_blob(&blob0_id));
// The blob is not new on this chain, so it is not required.
assert!(!certificate.executed_block().requires_blob(&blob0_id));

builder
.set_fault_type([0, 1, 2], FaultType::DontSendConfirmVote)
Expand Down
8 changes: 4 additions & 4 deletions linera-core/src/unit_tests/wasm_worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#![allow(clippy::large_futures)]
#![cfg(any(feature = "wasmer", feature = "wasmtime"))]

use std::collections::BTreeSet;

use linera_base::{
crypto::KeyPair,
data_types::{
Expand Down Expand Up @@ -135,6 +137,7 @@ where
committees: [(Epoch::ZERO, committee.clone())].into_iter().collect(),
ownership: ChainOwnership::single(publisher_key_pair.public()),
timestamp: Timestamp::from(1),
used_blobs: BTreeSet::from([contract_blob_id, service_blob_id]),
..SystemExecutionState::new(Epoch::ZERO, publisher_chain, admin_id)
};
let publisher_state_hash = publisher_system_state.clone().into_hash().await;
Expand All @@ -143,10 +146,7 @@ where
messages: vec![Vec::new()],
events: vec![Vec::new()],
state_hash: publisher_state_hash,
oracle_responses: vec![vec![
OracleResponse::Blob(contract_blob_id),
OracleResponse::Blob(service_blob_id),
]],
oracle_responses: vec![vec![]],
}
.with(publish_block),
);
Expand Down
2 changes: 0 additions & 2 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ pub enum WorkerError {
MissingCertificateValue,
#[error("The hash certificate doesn't match its value.")]
InvalidLiteCertificate,
#[error("An additional value was provided that is not required: {value_hash}.")]
UnneededValue { value_hash: CryptoHash },
#[error("An additional blob was provided that is not required: {blob_id}.")]
UnneededBlob { blob_id: BlobId },
#[error("The blobs provided in the proposal were not the published ones, in order.")]
Expand Down
3 changes: 3 additions & 0 deletions linera-execution/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ where
.register_application(application_description)
.await?;

self.system.used_blobs.insert(&contract_blob.id())?;
self.system.used_blobs.insert(&service_blob.id())?;

self.context()
.extra()
.user_contracts()
Expand Down
9 changes: 5 additions & 4 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ where

ReadBlobContent { blob_id, callback } => {
let blob = self.system.read_blob_content(blob_id).await?;
callback.respond(blob);
let is_new = self.system.blob_used(None, blob_id).await?;
callback.respond((blob, is_new))
}

AssertBlobExists { blob_id, callback } => {
self.system.assert_blob_exists(blob_id).await?;
callback.respond(())
callback.respond(self.system.blob_used(None, blob_id).await?)
}
}

Expand Down Expand Up @@ -480,12 +481,12 @@ pub enum ExecutionRequest {
ReadBlobContent {
blob_id: BlobId,
#[debug(skip)]
callback: Sender<BlobContent>,
callback: Sender<(BlobContent, bool)>,
},

AssertBlobExists {
blob_id: BlobId,
#[debug(skip)]
callback: Sender<()>,
callback: Sender<bool>,
},
}
17 changes: 11 additions & 6 deletions linera-execution/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,22 +1024,27 @@ impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {

fn read_data_blob(&mut self, hash: &CryptoHash) -> Result<Vec<u8>, ExecutionError> {
let blob_id = BlobId::new(*hash, BlobType::Data);
self.transaction_tracker
.replay_oracle_response(OracleResponse::Blob(blob_id))?;
let blob_content = self
let (blob_content, is_new) = self
.execution_state_sender
.send_request(|callback| ExecutionRequest::ReadBlobContent { blob_id, callback })?
.recv_response()?;
if is_new {
self.transaction_tracker
.replay_oracle_response(OracleResponse::Blob(blob_id))?;
}
Ok(blob_content.inner_bytes())
}

fn assert_data_blob_exists(&mut self, hash: &CryptoHash) -> Result<(), ExecutionError> {
let blob_id = BlobId::new(*hash, BlobType::Data);
self.transaction_tracker
.replay_oracle_response(OracleResponse::Blob(blob_id))?;
self.execution_state_sender
let is_new = self
.execution_state_sender
.send_request(|callback| ExecutionRequest::AssertBlobExists { blob_id, callback })?
.recv_response()?;
if is_new {
self.transaction_tracker
.replay_oracle_response(OracleResponse::Blob(blob_id))?;
}
Ok(())
}
}
Expand Down
74 changes: 52 additions & 22 deletions linera-execution/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct SystemExecutionStateView<C> {
pub closed: HashedRegisterView<C, bool>,
/// Permissions for applications on this chain.
pub application_permissions: HashedRegisterView<C, ApplicationPermissions>,
/// Blobs that have been used or published on this chain.
pub used_blobs: HashedSetView<C, BlobId>,
}

/// The configuration for a new chain.
Expand Down Expand Up @@ -609,14 +611,14 @@ where
outcome.messages.push(message);
}
PublishBytecode { bytecode_id } => {
txn_tracker.replay_oracle_response(OracleResponse::Blob(BlobId::new(
self.blob_published(&BlobId::new(
bytecode_id.contract_blob_hash,
BlobType::ContractBytecode,
)))?;
txn_tracker.replay_oracle_response(OracleResponse::Blob(BlobId::new(
))?;
self.blob_published(&BlobId::new(
bytecode_id.service_blob_hash,
BlobType::ServiceBytecode,
)))?;
))?;
}
CreateApplication {
bytecode_id,
Expand Down Expand Up @@ -664,14 +666,11 @@ where
outcome.messages.push(message);
}
PublishDataBlob { blob_hash } => {
txn_tracker.replay_oracle_response(OracleResponse::Blob(BlobId::new(
blob_hash,
BlobType::Data,
)))?;
self.blob_published(&BlobId::new(blob_hash, BlobType::Data))?;
}
ReadBlob { blob_id } => {
txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
self.read_blob_content(blob_id).await?;
self.blob_used(Some(txn_tracker), blob_id).await?;
}
}

Expand Down Expand Up @@ -1006,12 +1005,41 @@ where
Ok(messages)
}

pub async fn read_blob_content(&mut self, blob_id: BlobId) -> Result<BlobContent, ViewError> {
self.context()
.extra()
.get_blob(blob_id)
.await
.map(Into::into)
/// Records a blob that is used in this block. If this is the first use on this chain, creates
/// an oracle response for it.
pub(crate) async fn blob_used(
&mut self,
txn_tracker: Option<&mut TransactionTracker>,
blob_id: BlobId,
) -> Result<bool, SystemExecutionError> {
if self.used_blobs.contains(&blob_id).await? {
return Ok(false); // Nothing to do.
}
self.used_blobs.insert(&blob_id)?;
if let Some(txn_tracker) = txn_tracker {
txn_tracker.replay_oracle_response(OracleResponse::Blob(blob_id))?;
}
Ok(true)
}

/// Records a blob that is published in this block. This does not create an oracle entry, and
/// the blob can be used without using an oracle in the future on this chain.
fn blob_published(&mut self, blob_id: &BlobId) -> Result<(), SystemExecutionError> {
self.used_blobs.insert(blob_id)?;
Ok(())
}

pub async fn read_blob_content(
&mut self,
blob_id: BlobId,
) -> Result<BlobContent, SystemExecutionError> {
match self.context().extra().get_blob(blob_id).await {
Ok(blob) => Ok(blob.into()),
Err(ViewError::BlobsNotFound(_)) => {
Err(SystemExecutionError::BlobsNotFound(vec![blob_id]))
}
Err(error) => Err(error.into()),
}
}

pub async fn assert_blob_exists(
Expand Down Expand Up @@ -1054,12 +1082,14 @@ where
missing_blobs.push(service_bytecode_blob_id);
}

if missing_blobs.is_empty() {
txn_tracker.replay_oracle_response(OracleResponse::Blob(contract_bytecode_blob_id))?;
txn_tracker.replay_oracle_response(OracleResponse::Blob(service_bytecode_blob_id))?;
Ok(())
} else {
Err(SystemExecutionError::BlobsNotFound(missing_blobs))
}
ensure!(
missing_blobs.is_empty(),
SystemExecutionError::BlobsNotFound(missing_blobs)
);
self.blob_used(Some(txn_tracker), contract_bytecode_blob_id)
.await?;
self.blob_used(Some(txn_tracker), service_bytecode_blob_id)
.await?;
Ok(())
}
}
10 changes: 9 additions & 1 deletion linera-execution/src/test_utils/system_execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use custom_debug_derive::Debug;
use linera_base::{
crypto::CryptoHash,
data_types::{Amount, ApplicationPermissions, Blob, Timestamp},
identifiers::{ApplicationId, ChainDescription, ChainId, Owner},
identifiers::{ApplicationId, BlobId, ChainDescription, ChainId, Owner},
ownership::ChainOwnership,
};
use linera_views::{
Expand Down Expand Up @@ -46,6 +46,7 @@ pub struct SystemExecutionState {
pub balances: BTreeMap<Owner, Amount>,
pub timestamp: Timestamp,
pub registry: ApplicationRegistry,
pub used_blobs: BTreeSet<BlobId>,
#[debug(skip_if = Not::not)]
pub closed: bool,
pub application_permissions: ApplicationPermissions,
Expand Down Expand Up @@ -108,6 +109,7 @@ impl SystemExecutionState {
balances,
timestamp,
registry,
used_blobs,
closed,
application_permissions,
extra_blobs,
Expand Down Expand Up @@ -160,6 +162,12 @@ impl SystemExecutionState {
.registry
.import(registry)
.expect("serialization of registry components should not fail");
for blob_id in used_blobs {
view.system
.used_blobs
.insert(&blob_id)
.expect("inserting blob IDs should not fail");
}
view.system.closed.set(closed);
view.system
.application_permissions
Expand Down

0 comments on commit a7c3228

Please sign in to comment.