Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cp][aptos-release-v1.23] [consensus] sync improvements to help slow nodes sync better #15376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct BatchStore {
batch_quota: usize,
validator_signer: ValidatorSigner,
persist_subscribers: DashMap<HashValue, Vec<oneshot::Sender<PersistedValue>>>,
expiration_buffer_usecs: u64,
}

impl BatchStore {
Expand All @@ -128,6 +129,7 @@ impl BatchStore {
db_quota: usize,
batch_quota: usize,
validator_signer: ValidatorSigner,
expiration_buffer_usecs: u64,
) -> Self {
let db_clone = db.clone();
let batch_store = Self {
Expand All @@ -142,6 +144,7 @@ impl BatchStore {
batch_quota,
validator_signer,
persist_subscribers: DashMap::new(),
expiration_buffer_usecs,
};
let db_content = db_clone
.get_all_batches()
Expand Down Expand Up @@ -283,15 +286,19 @@ impl BatchStore {
// pub(crate) for testing
#[allow(clippy::unwrap_used)]
pub(crate) fn clear_expired_payload(&self, certified_time: u64) -> Vec<HashValue> {
let expired_digests = self.expirations.lock().unwrap().expire(certified_time);
// To help slow nodes catch up via execution without going to state sync we keep the blocks for 60 extra seconds
// after the expiration time. This will help remote peers fetch batches that just expired but are within their
// execution window.
let expiration_time = certified_time.saturating_sub(self.expiration_buffer_usecs);
let expired_digests = self.expirations.lock().unwrap().expire(expiration_time);
let mut ret = Vec::new();
for h in expired_digests {
let removed_value = match self.db_cache.entry(h) {
Occupied(entry) => {
// We need to check up-to-date expiration again because receiving the same
// digest with a higher expiration would update the persisted value and
// effectively extend the expiration.
if entry.get().expiration() <= certified_time {
if entry.get().expiration() <= expiration_time {
self.persist_subscribers.remove(entry.get().digest());
Some(entry.remove())
} else {
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl InnerBuilder {
self.config.db_quota,
self.config.batch_quota,
signer,
Duration::from_secs(60).as_micros() as u64,
));
self.batch_store = Some(batch_store.clone());
let batch_reader = Arc::new(BatchReaderImpl::new(batch_store.clone(), batch_requester));
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/tests/batch_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc<BatchStore> {
2001, // db quota
2001, // batch quota
signers[0].clone(),
0,
))
}

Expand Down
6 changes: 3 additions & 3 deletions testsuite/forge-cli/src/suites/realistic_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ pub(crate) fn realistic_env_max_load_test(
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 18 CPU cores for 15% of the time.
MetricsThreshold::new(25.0, 15),
// Memory starts around 7GB, and grows around 1.4GB/hr in this test.
// Memory starts around 8GB, and grows around 1.4GB/hr in this test.
// Check that we don't use more than final expected memory for more than 20% of the time.
MetricsThreshold::new_gb(7.0 + 1.4 * (duration_secs as f64 / 3600.0), 20),
MetricsThreshold::new_gb(8.0 + 1.4 * (duration_secs as f64 / 3600.0), 20),
))
.add_no_restarts()
.add_wait_for_catchup_s(
Expand All @@ -316,7 +316,7 @@ pub(crate) fn realistic_env_max_load_test(
.add_latency_threshold(4.5, LatencyType::P70)
.add_chain_progress(StateProgressThreshold {
max_non_epoch_no_progress_secs: 15.0,
max_epoch_no_progress_secs: 15.0,
max_epoch_no_progress_secs: 16.0,
max_non_epoch_round_gap: 4,
max_epoch_round_gap: 4,
});
Expand Down
Loading