Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[loader-v2] Fixing global cache reads & read-before-write on publish #15285

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions aptos-move/aptos-vm-types/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,13 @@ pub trait TModuleView {

/// Allows querying state information, e.g. its usage.
pub trait StateStorageView {
/// Type of the key used to address a state value in [`Self::read_state_value`].
type Key;

/// Returns the [`StateViewId`] associated with this view.
fn id(&self) -> StateViewId;

/// Reads the state value from the DB. Used to enforce read-before-write for module writes.
///
/// The value itself is not returned: the result only reports whether the read succeeded
/// (`Ok(())`) or failed with a [`StateviewError`].
fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError>;

/// Returns the [`StateStorageUsage`] reported by this view, or a [`StateviewError`] if it
/// cannot be obtained.
fn get_usage(&self) -> Result<StateStorageUsage, StateviewError>;
}

Expand All @@ -203,7 +208,7 @@ pub trait TExecutorView<K, T, L, I, V>:
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView
+ StateStorageView<Key = K>
{
}

Expand All @@ -212,7 +217,7 @@ impl<A, K, T, L, I, V> TExecutorView<K, T, L, I, V> for A where
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView
+ StateStorageView<Key = K>
{
}

Expand Down Expand Up @@ -278,10 +283,17 @@ impl<S> StateStorageView for S
where
S: StateView,
{
type Key = StateKey;

fn id(&self) -> StateViewId {
self.id()
}

fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.get_state_value(state_key)?;
Ok(())
}

fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.get_usage().map_err(Into::into)
}
Expand Down
6 changes: 6 additions & 0 deletions aptos-move/aptos-vm/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,16 @@ impl<S: StateView> AsMoveResolver<S> for S {
}

impl<'e, E: ExecutorView> StateStorageView for StorageAdapter<'e, E> {
type Key = StateKey;

fn id(&self) -> StateViewId {
self.executor_view.id()
}

fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.executor_view.read_state_value(state_key)
}

fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.executor_view.get_usage()
}
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/move_vm_ext/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait AptosMoveResolver:
+ ModuleResolver
+ ResourceResolver
+ ResourceGroupResolver
+ StateStorageView
+ StateStorageView<Key = StateKey>
+ TableResolver
+ AsExecutorView
+ AsResourceGroupView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,16 @@ impl<'r> TModuleView for ExecutorViewWithChangeSet<'r> {
}

impl<'r> StateStorageView for ExecutorViewWithChangeSet<'r> {
type Key = StateKey;

fn id(&self) -> StateViewId {
self.base_executor_view.id()
}

fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.base_executor_view.read_state_value(state_key)
}

fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
Err(StateviewError::Other(
"Unexpected access to get_usage()".to_string(),
Expand Down
16 changes: 16 additions & 0 deletions aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ impl<'r> WriteOpConverter<'r> {
)?;

let state_key = StateKey::module_id(&module_id);

// Enforce read-before-write:
// Modules can live in global cache, and so the DB may not see a module read even
// when it gets republished. This violates read-before-write property. Here, we on
// purpose enforce this by registering a read to the DB directly.
// Note that we also do it here so that in case of storage errors, only a single
// transaction fails (e.g., if doing this read before commit in block executor we
// have no way to alter the transaction outputs at that point).
self.remote.read_state_value(&state_key).map_err(|err| {
let msg = format!(
"Error when enforcing read-before-write for module {}::{}: {:?}",
addr, name, err
);
PartialVMError::new(StatusCode::STORAGE_ERROR).with_message(msg)
})?;

writes.insert(state_key, ModuleWrite::new(module_id, write_op));
}
Ok(writes)
Expand Down
73 changes: 31 additions & 42 deletions aptos-move/block-executor/src/captured_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,16 @@ impl DelayedFieldRead {
}
}

/// Represents a module read, either from immutable cross-block cache, or from code [SyncCodeCache]
/// used by block executor (per-block cache). This way, when transaction needs to read a module
/// from [SyncCodeCache] it can first check the read-set here.
/// Represents a module read, either from global module cache that spans multiple blocks, or from
/// per-block cache used by block executor to add committed modules. When transaction reads a
/// module, it should first check the read-set here, to ensure that if some module A has been read,
/// the same A is read again within the same transaction.
enum ModuleRead<DC, VC, S> {
/// Read from the cross-block module cache.
GlobalCache,
/// Read from per-block cache ([SyncCodeCache]) used by parallel execution.
/// Read from the global module cache. Modules in this cache have storage version, but require
/// different validation - a check that they have not been overridden.
GlobalCache(Arc<ModuleCode<DC, VC, S>>),
/// Read from per-block cache that contains committed (by specified transaction) and newly
/// loaded from storage (i.e., not yet moved to global module cache) modules.
PerBlockCache(Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>),
}

Expand Down Expand Up @@ -615,8 +618,8 @@ where
}

/// Records the read to global cache that spans across multiple blocks.
pub(crate) fn capture_global_cache_read(&mut self, key: K) {
self.module_reads.insert(key, ModuleRead::GlobalCache);
pub(crate) fn capture_global_cache_read(&mut self, key: K, read: Arc<ModuleCode<DC, VC, S>>) {
self.module_reads.insert(key, ModuleRead::GlobalCache(read));
}

/// Records the read to per-block level cache.
Expand All @@ -629,22 +632,19 @@ where
.insert(key, ModuleRead::PerBlockCache(read));
}

/// If the module has been previously read from [SyncCodeCache], returns it. Returns a panic
/// error if the read was cached for the global cross-module cache (we do not capture values
/// for those).
/// If the module has been previously read, returns it.
pub(crate) fn get_module_read(
&self,
key: &K,
) -> Result<CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>>, PanicError> {
Ok(match self.module_reads.get(key) {
) -> CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>> {
match self.module_reads.get(key) {
Some(ModuleRead::PerBlockCache(read)) => CacheRead::Hit(read.clone()),
Some(ModuleRead::GlobalCache) => {
return Err(PanicError::CodeInvariantError(
"Global module cache reads do not capture values".to_string(),
));
Some(ModuleRead::GlobalCache(read)) => {
// From global cache, we return a storage version.
CacheRead::Hit(Some((read.clone(), None)))
},
None => CacheRead::Miss,
})
}
}

/// For every module read that was captured, checks if the reads are still the same:
Expand All @@ -661,7 +661,7 @@ where
}

self.module_reads.iter().all(|(key, read)| match read {
ModuleRead::GlobalCache => global_module_cache.contains_valid(key),
ModuleRead::GlobalCache(_) => global_module_cache.contains_valid(key),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this whole match be equivalent to:

        self.module_reads.iter().all(|(key, read)| {
            let previous_version = match read {
              ModuleRead::GlobalCache(_) => None, // i.e. storage version
              ModuleRead::PerBlockCache(previous) => previous.as_ref().map(|(_, version)| *version);
            };
            let current_version = per_block_module_cache.get_module_version(key);
            current_version == previous_version
        })

why do we need to update GlobalCache at all while executing a block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do if we read first from it (to know if entry is overridden or not). An alternative is to check lower level cache first, but this means performance penalty due to locking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code can be somewhat equivalent, but:

let current_version = per_block_module_cache.get_module_version(key);

causes a prefetch of storage version by default. We would need to special case validation to not do it. And we also end up locking the cache (shard, worst case), instead of checking an atomic bool

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because we may publish a module that invalidates the global cache that's being read I think

ModuleRead::PerBlockCache(previous) => {
let current_version = per_block_module_cache.get_module_version(key);
let previous_version = previous.as_ref().map(|(_, version)| *version);
Expand Down Expand Up @@ -1537,20 +1537,6 @@ mod test {
);
}

#[test]
fn test_global_cache_module_reads_are_not_recorded() {
let mut captured_reads = CapturedReads::<
TestTransactionType,
u32,
MockDeserializedCode,
MockVerifiedCode,
MockExtension,
>::new();

captured_reads.capture_global_cache_read(0);
assert!(captured_reads.get_module_read(&0).is_err())
}

#[test]
fn test_global_cache_module_reads() {
let mut captured_reads = CapturedReads::<
Expand All @@ -1563,11 +1549,13 @@ mod test {
let mut global_module_cache = GlobalModuleCache::empty();
let per_block_module_cache = SyncModuleCache::empty();

global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
captured_reads.capture_global_cache_read(0);
let module_0 = mock_verified_code(0, MockExtension::new(8));
global_module_cache.insert(0, module_0.clone());
captured_reads.capture_global_cache_read(0, module_0);

global_module_cache.insert(1, mock_verified_code(1, MockExtension::new(8)));
captured_reads.capture_global_cache_read(1);
let module_1 = mock_verified_code(1, MockExtension::new(8));
global_module_cache.insert(1, module_1.clone());
captured_reads.capture_global_cache_read(1, module_1);

assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

Expand Down Expand Up @@ -1613,18 +1601,18 @@ mod test {
captured_reads.capture_per_block_cache_read(0, Some((a, Some(2))));
assert!(matches!(
captured_reads.get_module_read(&0),
Ok(CacheRead::Hit(Some(_)))
CacheRead::Hit(Some(_))
));

captured_reads.capture_per_block_cache_read(1, None);
assert!(matches!(
captured_reads.get_module_read(&1),
Ok(CacheRead::Hit(None))
CacheRead::Hit(None)
));

assert!(matches!(
captured_reads.get_module_read(&2),
Ok(CacheRead::Miss)
CacheRead::Miss
));
}

Expand Down Expand Up @@ -1701,8 +1689,9 @@ mod test {
let per_block_module_cache = SyncModuleCache::empty();

// Module exists in global cache.
global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
captured_reads.capture_global_cache_read(0);
let m = mock_verified_code(0, MockExtension::new(8));
global_module_cache.insert(0, m.clone());
captured_reads.capture_global_cache_read(0, m);
assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

// Assume we republish this module: validation must fail.
Expand Down
59 changes: 27 additions & 32 deletions aptos-move/block-executor/src/code_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,44 +136,39 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
Self::Version,
)>,
> {
// First, look up the module in the cross-block global module cache. Record the read for
// later validation in case the read module is republished.
if let Some(module) = self.global_module_cache.get_valid(key) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone()),
ViewState::Unsync(state) => {
state.read_set.borrow_mut().capture_module_read(key.clone())
},
}
return Ok(Some((module, Self::Version::default())));
}

// Global cache miss: check module cache in versioned/unsync maps.
match &self.latest_view {
ViewState::Sync(state) => {
// Check the transaction-level cache with already read modules first.
let cache_read = state.captured_reads.borrow().get_module_read(key)?;
match cache_read {
CacheRead::Hit(read) => Ok(read),
CacheRead::Miss => {
// If the module has not been accessed by this transaction, go to the
// module cache and record the read.
let read = state
.versioned_map
.module_cache()
.get_module_or_build_with(key, builder)?;
state
.captured_reads
.borrow_mut()
.capture_per_block_cache_read(key.clone(), read.clone());
Ok(read)
},
if let CacheRead::Hit(read) = state.captured_reads.borrow().get_module_read(key) {
return Ok(read);
}

// Otherwise, it is a miss. Check global cache.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we check global cache before checking state.versioned_map.module_cache ?

on rolling commit - are we updating GlobalCache itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We update global cache at rolling commit - if published keys exist in global cache, we mark them as invalid. So reads to them result in a cache miss and we fall back to MVHashMap where we have placed the write at commit time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can check versioned before, but then you end up acquiring a lock for potentially non-republished module (publish is rare). If 32 threads do this for aptos-framework, this is bad.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead, we lookup in global first, but check an atomic bool flag there (better than a lock), so we optimize for read case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, then I would rename PerBlockCache to UnfinalizedBlockCache or something like that - to make it clear it only ever refers to things before rolling commit, and GlobalCache is global and updated within the block itself.

(you can do that in separate PR of course :) )

if let Some(module) = self.global_module_cache.get_valid(key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we reverse the order of checking now? (I was wondering for the previous pr about the order too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, we always check local cache first. If it is not there, we as before check 1) global first, if valid, 2) per-block next. In both cases, clone the module to captured reads (local cache). So next read always reads the same thing. Does this make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was asking more about why we checked global cache in previous pr, is this an orthogonal change or we need to reverse the order now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still first check global cache.

What is added here is check in captured reads - meaning whether this same transaction has already read it, and if it did - do not read it again

state
.captured_reads
.borrow_mut()
.capture_global_cache_read(key.clone(), module.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be a total overkill here but I wonder if we can do RAII style little struct that captures things on drop, to make sure different paths of getting things all get recorded in captured reads (not just for modules). But I suppose we don't have such complex flows anywhere else.

return Ok(Some((module, Self::Version::default())));
}

// If not global cache, check per-block cache.
let read = state
.versioned_map
.module_cache()
.get_module_or_build_with(key, builder)?;
state
.captured_reads
.borrow_mut()
.capture_per_block_cache_read(key.clone(), read.clone());
Ok(read)
},
ViewState::Unsync(state) => {
if let Some(module) = self.global_module_cache.get_valid(key) {
state.read_set.borrow_mut().capture_module_read(key.clone());
return Ok(Some((module, Self::Version::default())));
}

let read = state
.unsync_map
.module_cache()
Expand Down
7 changes: 7 additions & 0 deletions aptos-move/block-executor/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1622,10 +1622,17 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> TModuleView
impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> StateStorageView
for LatestView<'a, T, S, X>
{
type Key = T::Key;

fn id(&self) -> StateViewId {
self.base_view.id()
}

fn read_state_value(&self, state_key: &Self::Key) -> Result<(), StateviewError> {
self.base_view.get_state_value(state_key)?;
Ok(())
}

fn get_usage(&self) -> Result<StateStorageUsage, StateviewError> {
self.base_view.get_usage()
}
Expand Down
6 changes: 4 additions & 2 deletions aptos-move/e2e-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ fn execute_and_time_entry_point(
let mut rng = StdRng::seed_from_u64(14);
let entry_fun = entry_point
.create_payload(
package.get_module_id(entry_point.module_name()),
package,
entry_point.module_name(),
Some(&mut rng),
Some(publisher_address),
)
Expand Down Expand Up @@ -221,7 +222,8 @@ fn main() {
&publisher,
1,
init_entry_point.create_payload(
package.get_module_id(init_entry_point.module_name()),
&package,
init_entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand Down
6 changes: 4 additions & 2 deletions aptos-move/e2e-move-tests/src/tests/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ fn test_txn_generator_workloads_calibrate_gas() {
runner.harness.run_transaction_payload(
&publisher,
init_entry_point.create_payload(
package.get_module_id(init_entry_point.module_name()),
&package,
init_entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand All @@ -695,7 +696,8 @@ fn test_txn_generator_workloads_calibrate_gas() {
&format!("entry_point_{entry_point:?}"),
&user,
entry_point.create_payload(
package.get_module_id(entry_point.module_name()),
&package,
entry_point.module_name(),
Some(&mut rng),
Some(publisher.address()),
),
Expand Down
Loading
Loading