Skip to content

Commit

Permalink
[loader-v2] Fixing global cache reads & read-before-write on publish (#15285) (#15298)
Browse files Browse the repository at this point in the history

- Enforces read-before-write for module publishes.
- Records all module reads in captured reads, not just per-block.
- Adds a workload + test to publish and call modules.

Co-authored-by: Igor <[email protected]>
(cherry picked from commit 0a16e9e)

Co-authored-by: George Mitenkov <[email protected]>
  • Loading branch information
github-actions[bot] and georgemitenkov authored Nov 18, 2024
1 parent 2bb2d43 commit be3ccc5
Show file tree
Hide file tree
Showing 22 changed files with 604 additions and 441 deletions.
16 changes: 14 additions & 2 deletions aptos-move/aptos-vm-types/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,13 @@ pub trait TModuleView {

/// Allows querying state information, e.g., its identifier or storage usage.
pub trait StateStorageView {
/// Type of the keys accepted by [`StateStorageView::read_state_value`].
type Key;

/// Returns the [`StateViewId`] of this view.
fn id(&self) -> StateViewId;

/// Reads the state value from the DB. Used to enforce read-before-write for module writes.
/// The value itself is not returned: callers only learn whether the read succeeded.
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError>;

/// Returns the [`StateStorageUsage`], or a [`StateviewError`] if it cannot be obtained.
fn get_usage(&self) -> Result<StateStorageUsage, StateviewError>;
}

Expand All @@ -203,7 +208,7 @@ pub trait TExecutorView<K, T, L, I, V>:
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView
+ StateStorageView<Key = K>
{
}

Expand All @@ -212,7 +217,7 @@ impl<A, K, T, L, I, V> TExecutorView<K, T, L, I, V> for A where
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView
+ StateStorageView<Key = K>
{
}

Expand Down Expand Up @@ -278,10 +283,17 @@ impl<S> StateStorageView for S
where
S: StateView,
{
type Key = StateKey;

/// Returns the identifier of the underlying state view.
// NOTE(review): this relies on `self.id()` resolving to the method provided by the `StateView`
// bound rather than to this trait method (which would recurse) - confirm.
fn id(&self) -> StateViewId {
self.id()
}

/// Issues a read of `state_key` against the underlying state view and throws the value away:
/// the caller is only told whether the read succeeded. Any error from the read is propagated
/// (converted into a [`StateviewError`] where needed).
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
    // Only the act of reading matters here, not the value that comes back.
    let _ = self.get_state_value(state_key)?;
    Ok(())
}

/// Returns the storage usage reported by the underlying state view, converting its error
/// into a [`StateviewError`].
fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.get_usage().map_err(Into::into)
}
Expand Down
6 changes: 6 additions & 0 deletions aptos-move/aptos-vm/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,16 @@ impl<S: StateView> AsMoveResolver<S> for S {
}

impl<'e, E: ExecutorView> StateStorageView for StorageAdapter<'e, E> {
type Key = StateKey;

/// Returns the identifier reported by the wrapped executor view.
fn id(&self) -> StateViewId {
self.executor_view.id()
}

/// Forwards the read of `state_key` to the wrapped executor view and returns its result
/// unchanged.
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.executor_view.read_state_value(state_key)
}

/// Returns the storage usage reported by the wrapped executor view.
fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.executor_view.get_usage()
}
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/move_vm_ext/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait AptosMoveResolver:
+ ModuleResolver
+ ResourceResolver
+ ResourceGroupResolver
+ StateStorageView
+ StateStorageView<Key = StateKey>
+ TableResolver
+ AsExecutorView
+ AsResourceGroupView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,16 @@ impl<'r> TModuleView for ExecutorViewWithChangeSet<'r> {
}

impl<'r> StateStorageView for ExecutorViewWithChangeSet<'r> {
type Key = StateKey;

/// Returns the identifier reported by the base executor view.
fn id(&self) -> StateViewId {
self.base_executor_view.id()
}

/// Forwards the read of `state_key` directly to the base executor view and returns its result
/// unchanged.
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.base_executor_view.read_state_value(state_key)
}

fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
Err(StateviewError::Other(
"Unexpected access to get_usage()".to_string(),
Expand Down
16 changes: 16 additions & 0 deletions aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ impl<'r> WriteOpConverter<'r> {
)?;

let state_key = StateKey::module_id(&module_id);

// Enforce read-before-write:
// Modules can live in the global cache, so the DB may not see a module read even
// when the module gets republished. This violates the read-before-write property,
// so we deliberately enforce it here by registering a read to the DB directly.
// Note that we do it here so that in case of storage errors, only a single
// transaction fails (e.g., if this read were done before commit in the block
// executor, there would be no way to alter the transaction outputs at that point).
self.remote.read_state_value(&state_key).map_err(|err| {
let msg = format!(
"Error when enforcing read-before-write for module {}::{}: {:?}",
addr, name, err
);
PartialVMError::new(StatusCode::STORAGE_ERROR).with_message(msg)
})?;

writes.insert(state_key, ModuleWrite::new(module_id, write_op));
}
Ok(writes)
Expand Down
73 changes: 31 additions & 42 deletions aptos-move/block-executor/src/captured_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,16 @@ impl DelayedFieldRead {
}
}

/// Represents a module read, either from immutable cross-block cache, or from code [SyncCodeCache]
/// used by block executor (per-block cache). This way, when transaction needs to read a module
/// from [SyncCodeCache] it can first check the read-set here.
/// Represents a module read, either from global module cache that spans multiple blocks, or from
/// per-block cache used by block executor to add committed modules. When transaction reads a
/// module, it should first check the read-set here, to ensure that if some module A has been read,
/// the same A is read again within the same transaction.
enum ModuleRead<DC, VC, S> {
/// Read from the cross-block module cache.
GlobalCache,
/// Read from per-block cache ([SyncCodeCache]) used by parallel execution.
/// Read from the global module cache. Modules in this cache have storage version, but require
/// different validation - a check that they have not been overridden.
GlobalCache(Arc<ModuleCode<DC, VC, S>>),
/// Read from per-block cache that contains committed (by specified transaction) and newly
/// loaded from storage (i.e., not yet moved to global module cache) modules.
PerBlockCache(Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>),
}

Expand Down Expand Up @@ -615,8 +618,8 @@ where
}

/// Records the read to global cache that spans across multiple blocks.
pub(crate) fn capture_global_cache_read(&mut self, key: K) {
self.module_reads.insert(key, ModuleRead::GlobalCache);
pub(crate) fn capture_global_cache_read(&mut self, key: K, read: Arc<ModuleCode<DC, VC, S>>) {
self.module_reads.insert(key, ModuleRead::GlobalCache(read));
}

/// Records the read to per-block level cache.
Expand All @@ -629,22 +632,19 @@ where
.insert(key, ModuleRead::PerBlockCache(read));
}

/// If the module has been previously read from [SyncCodeCache], returns it. Returns a panic
/// error if the read was cached for the global cross-module cache (we do not capture values
/// for those).
/// If the module has been previously read, returns it.
pub(crate) fn get_module_read(
&self,
key: &K,
) -> Result<CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>>, PanicError> {
Ok(match self.module_reads.get(key) {
) -> CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>> {
match self.module_reads.get(key) {
Some(ModuleRead::PerBlockCache(read)) => CacheRead::Hit(read.clone()),
Some(ModuleRead::GlobalCache) => {
return Err(PanicError::CodeInvariantError(
"Global module cache reads do not capture values".to_string(),
));
Some(ModuleRead::GlobalCache(read)) => {
// From global cache, we return a storage version.
CacheRead::Hit(Some((read.clone(), None)))
},
None => CacheRead::Miss,
})
}
}

/// For every module read that was captured, checks if the reads are still the same:
Expand All @@ -661,7 +661,7 @@ where
}

self.module_reads.iter().all(|(key, read)| match read {
ModuleRead::GlobalCache => global_module_cache.contains_valid(key),
ModuleRead::GlobalCache(_) => global_module_cache.contains_valid(key),
ModuleRead::PerBlockCache(previous) => {
let current_version = per_block_module_cache.get_module_version(key);
let previous_version = previous.as_ref().map(|(_, version)| *version);
Expand Down Expand Up @@ -1537,20 +1537,6 @@ mod test {
);
}

#[test]
fn test_global_cache_module_reads_are_not_recorded() {
let mut captured_reads = CapturedReads::<
TestTransactionType,
u32,
MockDeserializedCode,
MockVerifiedCode,
MockExtension,
>::new();

captured_reads.capture_global_cache_read(0);
assert!(captured_reads.get_module_read(&0).is_err())
}

#[test]
fn test_global_cache_module_reads() {
let mut captured_reads = CapturedReads::<
Expand All @@ -1563,11 +1549,13 @@ mod test {
let mut global_module_cache = GlobalModuleCache::empty();
let per_block_module_cache = SyncModuleCache::empty();

global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
captured_reads.capture_global_cache_read(0);
let module_0 = mock_verified_code(0, MockExtension::new(8));
global_module_cache.insert(0, module_0.clone());
captured_reads.capture_global_cache_read(0, module_0);

global_module_cache.insert(1, mock_verified_code(1, MockExtension::new(8)));
captured_reads.capture_global_cache_read(1);
let module_1 = mock_verified_code(1, MockExtension::new(8));
global_module_cache.insert(1, module_1.clone());
captured_reads.capture_global_cache_read(1, module_1);

assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

Expand Down Expand Up @@ -1613,18 +1601,18 @@ mod test {
captured_reads.capture_per_block_cache_read(0, Some((a, Some(2))));
assert!(matches!(
captured_reads.get_module_read(&0),
Ok(CacheRead::Hit(Some(_)))
CacheRead::Hit(Some(_))
));

captured_reads.capture_per_block_cache_read(1, None);
assert!(matches!(
captured_reads.get_module_read(&1),
Ok(CacheRead::Hit(None))
CacheRead::Hit(None)
));

assert!(matches!(
captured_reads.get_module_read(&2),
Ok(CacheRead::Miss)
CacheRead::Miss
));
}

Expand Down Expand Up @@ -1701,8 +1689,9 @@ mod test {
let per_block_module_cache = SyncModuleCache::empty();

// Module exists in global cache.
global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
captured_reads.capture_global_cache_read(0);
let m = mock_verified_code(0, MockExtension::new(8));
global_module_cache.insert(0, m.clone());
captured_reads.capture_global_cache_read(0, m);
assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

// Assume we republish this module: validation must fail.
Expand Down
59 changes: 27 additions & 32 deletions aptos-move/block-executor/src/code_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,44 +136,39 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
Self::Version,
)>,
> {
// First, look up the module in the cross-block global module cache. Record the read for
// later validation in case the read module is republished.
if let Some(module) = self.global_module_cache.get_valid(key) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
}
return Ok(Some((module, Self::Version::default())));
}

// Global cache miss: check module cache in versioned/unsync maps.
match &self.latest_view {
ViewState::Sync(state) => {
// Check the transaction-level cache with already read modules first.
let cache_read = state.captured_reads.borrow().get_module_read(key)?;
match cache_read {
CacheRead::Hit(read) => Ok(read),
CacheRead::Miss => {
// If the module has not been accessed by this transaction, go to the
// module cache and record the read.
let read = state
.versioned_map
.module_cache()
.get_module_or_build_with(key, builder)?;
state
.captured_reads
.borrow_mut()
.capture_per_block_cache_read(key.clone(), read.clone());
Ok(read)
},
if let CacheRead::Hit(read) = state.captured_reads.borrow().get_module_read(key) {
return Ok(read);
}

// Otherwise, it is a miss. Check global cache.
if let Some(module) = self.global_module_cache.get_valid(key) {
state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone(), module.clone());
return Ok(Some((module, Self::Version::default())));
}

// If not global cache, check per-block cache.
let read = state
.versioned_map
.module_cache()
.get_module_or_build_with(key, builder)?;
state
.captured_reads
.borrow_mut()
.capture_per_block_cache_read(key.clone(), read.clone());
Ok(read)
},
ViewState::Unsync(state) => {
if let Some(module) = self.global_module_cache.get_valid(key) {
state.read_set.borrow_mut().capture_module_read(key.clone());
return Ok(Some((module, Self::Version::default())));
}

let read = state
.unsync_map
.module_cache()
Expand Down
7 changes: 7 additions & 0 deletions aptos-move/block-executor/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1622,10 +1622,17 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> TModuleView
impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> StateStorageView
for LatestView<'a, T, S, X>
{
type Key = T::Key;

/// Returns the identifier reported by the base view.
fn id(&self) -> StateViewId {
self.base_view.id()
}

/// Issues a read of `state_key` against the base view and throws the value away: the caller
/// is only told whether the read succeeded. Any error from the read is propagated.
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
    // Only the act of reading matters here, not the value that comes back.
    let _ = self.base_view.get_state_value(state_key)?;
    Ok(())
}

/// Returns the storage usage reported by the base view.
fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.base_view.get_usage()
}
Expand Down
6 changes: 4 additions & 2 deletions aptos-move/e2e-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ fn execute_and_time_entry_point(
let mut rng = StdRng::seed_from_u64(14);
let entry_fun = entry_point
.create_payload(
package.get_module_id(entry_point.module_name()),
package,
entry_point.module_name(),
Some(&mut rng),
Some(publisher_address),
)
Expand Down Expand Up @@ -221,7 +222,8 @@ fn main() {
&publisher,
1,
init_entry_point.create_payload(
package.get_module_id(init_entry_point.module_name()),
&package,
init_entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand Down
6 changes: 4 additions & 2 deletions aptos-move/e2e-move-tests/src/tests/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ fn test_txn_generator_workloads_calibrate_gas() {
runner.harness.run_transaction_payload(
&publisher,
init_entry_point.create_payload(
package.get_module_id(init_entry_point.module_name()),
&package,
init_entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand All @@ -695,7 +696,8 @@ fn test_txn_generator_workloads_calibrate_gas() {
&format!("entry_point_{entry_point:?}"),
&user,
entry_point.create_payload(
package.get_module_id(entry_point.module_name()),
&package,
entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand Down
Loading

0 comments on commit be3ccc5

Please sign in to comment.