From 116d008735f05692a38f9a5e20f2dd7a9e18a650 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sat, 2 Mar 2024 20:23:55 -0800 Subject: [PATCH] blockstore: atomize slot clearing, relax parent slot meta check (#35124) * blockstore: atomize slot clearing, relax parent slot meta check clear_unconfirmed_slot can leave blockstore in an irrecoverable state if it panics in the middle. write batch this function, so that any errors can be recovered after restart. additionally relax the constraint that the parent slot meta must exist, as it could have been cleaned up if outdated. * pr feedback: use PurgeType, don't pass slot_meta * pr feedback: add unit test * pr feedback: refactor into separate function * pr feedback: add special columns to helper, err msg, comments * pr feedback: reword comments and write batch error message * pr feedback: bubble write_batch error to caller * pr feedback: reword comments Co-authored-by: steviez --------- Co-authored-by: steviez (cherry picked from commit cc4072bce8aefb5bb7612cf47988b5f0b4fba014) # Conflicts: # ledger/src/blockstore.rs # ledger/src/blockstore/blockstore_purge.rs --- ledger/src/blockstore.rs | 20 +- ledger/src/blockstore/blockstore_purge.rs | 388 +++++++++++++++++++++- 2 files changed, 394 insertions(+), 14 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 8b964e5b3ce9a1..8210a160d0b72d 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1111,9 +1111,8 @@ impl Blockstore { self.completed_slots_senders.lock().unwrap().clear(); } - /// Range-delete all entries which prefix matches the specified `slot`, - /// remove `slot` its' parents SlotMeta next_slots list, and - /// clear `slot`'s SlotMeta (except for next_slots). + /// Clear `slot` from the Blockstore, see ``Blockstore::purge_slot_cleanup_chaining` + /// for more details. /// /// This function currently requires `insert_shreds_lock`, as both /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()` @@ -1121,6 +1120,7 @@ impl Blockstore { /// family. pub fn clear_unconfirmed_slot(&self, slot: Slot) { let _lock = self.insert_shreds_lock.lock().unwrap(); +<<<<<<< HEAD if let Some(mut slot_meta) = self .meta(slot) .expect("Couldn't fetch from SlotMeta column family") @@ -1152,9 +1152,21 @@ impl Blockstore { .expect("Couldn't insert into SlotMeta column family"); } else { error!( +======= + // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved. + // Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list + // will be updated when the child is inserted (see `Blockstore::handle_chaining()`). + // However, we are only purging and repairing the parent slot here. Since the child will not be + // reinserted the chaining will be lost. In order for bank forks discovery to ingest the child, + // we must retain the chain by preserving `next_slots`. 
+            match self.purge_slot_cleanup_chaining(slot) {
+                Ok(_) => {}
+                Err(BlockstoreError::SlotUnavailable) => error!(
+>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124))
                 "clear_unconfirmed_slot() called on slot {} with no SlotMeta",
                 slot
-            );
+            ),
+            Err(e) => panic!("Purge database operations failed {}", e),
         }
     }
 
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 92f9453eabb6ed..5f3406ff7e903e 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -129,6 +129,7 @@ impl Blockstore {
         }
     }
 
+    #[cfg(test)]
     pub(crate) fn run_purge(
         &self,
         from_slot: Slot,
@@ -138,11 +139,60 @@ impl Blockstore {
         self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
     }
 
+    /// Purges all columns relating to `slot`.
+    ///
+    /// Additionally, we clean up the parent of `slot` by clearing `slot` from
+    /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot`
+    /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is
+    /// replayable upon repair of `slot`.
+    pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
+        let Some(mut slot_meta) = self.meta(slot)? else {
+            return Err(BlockstoreError::SlotUnavailable);
+        };
+        let mut write_batch = self.db.batch()?;
+
+        let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
+
+        if let Some(parent_slot) = slot_meta.parent_slot {
+            let parent_slot_meta = self.meta(parent_slot)?;
+            if let Some(mut parent_slot_meta) = parent_slot_meta {
+                // .retain() is a linear scan; however, next_slots should
+                // only contain several elements so this isn't so bad
+                parent_slot_meta
+                    .next_slots
+                    .retain(|&next_slot| next_slot != slot);
+                write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
+            } else {
+                error!(
+                    "Parent slot meta {} for child {} is missing or cleaned up.
+                     Falling back to orphan repair to remedy the situation",
+                    parent_slot, slot
+                );
+            }
+        }
+
+        // Retain a SlotMeta for `slot` with the `next_slots` field retained
+        slot_meta.clear_unconfirmed_slot();
+        write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
+
+        self.db.write(write_batch).inspect_err(|e| {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?}",
+                e, slot
+            )
+        })?;
+        Ok(columns_purged)
+    }
+
     /// A helper function to `purge_slots` that executes the ledger clean up.
     /// The cleanup applies to \[`from_slot`, `to_slot`\].
     ///
     /// When `from_slot` is 0, any sst-file with a key-range completely older
     /// than `to_slot` will also be deleted.
+    ///
+    /// Note: slots > `to_slot` that chained to a purged slot are not properly
+    /// cleaned up. This function is not intended to be used if such slots need
+    /// to be replayed.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -150,11 +200,10 @@ impl Blockstore {
         purge_type: PurgeType,
         purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
-        let mut write_batch = self
-            .db
-            .batch()
-            .expect("Database Error: Failed to get write batch");
+        let mut write_batch = self.db.batch()?;
+
         let mut delete_range_timer = Measure::start("delete_range");
+<<<<<<< HEAD
         let mut columns_purged = self
             .db
             .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
@@ -241,16 +290,18 @@ impl Blockstore {
             // in no spiky periodic huge delete_range for them.
             }
         }
+=======
+        let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
+>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124))
         delete_range_timer.stop();
 
         let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
+        self.db.write(write_batch).inspect_err(|e| {
             error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot
+            )
+        })?;
         write_timer.stop();
 
         let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
@@ -281,6 +332,93 @@ impl Blockstore {
         Ok(columns_purged)
     }
 
+    fn purge_range(
+        &self,
+        write_batch: &mut WriteBatch,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+    ) -> Result<bool> {
+        let columns_purged = self
+            .db
+            .delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
+                .is_ok();
+
+        match purge_type {
+            PurgeType::Exact => {
+                self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
+            }
+            PurgeType::CompactionFilter => {
+                // No explicit action is required here because this purge type completely and
+                // indefinitely relies on the proper working of compaction filter for those
+                // special column families, never toggling the primary index from the current
+                // one. Overall, this enables well uniformly distributed writes, resulting
+                // in no spiky periodic huge delete_range for them.
+ } + } + Ok(columns_purged) + } + fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool { self.db .delete_file_in_range_cf::(from_slot, to_slot) @@ -1198,4 +1336,234 @@ pub mod tests { .purge_special_columns_exact(&mut write_batch, slot, slot + 1) .unwrap(); } +<<<<<<< HEAD +======= + + #[test] + fn test_purge_special_columns_compaction_filter() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let max_slot = 19; + + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); + let first_index = { + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::Start) + .unwrap(); + status_entry_iterator.next().unwrap().0 + }; + let last_index = { + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::End) + .unwrap(); + status_entry_iterator.next().unwrap().0 + }; + + let oldest_slot = 3; + blockstore.db.set_oldest_slot(oldest_slot); + blockstore.db.compact_range_cf::( + &cf::TransactionStatus::key(first_index), + &cf::TransactionStatus::key(last_index), + ); + + let status_entry_iterator = blockstore + .db + .iter::(IteratorMode::Start) + .unwrap(); + let mut count = 0; + for ((_signature, slot), _value) in status_entry_iterator { + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, max_slot - (oldest_slot - 1)); + + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); + let first_index = { + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::Start) + .unwrap(); + status_entry_iterator.next().unwrap().0 + }; + let last_index = { + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::End) + .unwrap(); + status_entry_iterator.next().unwrap().0 + }; + + let oldest_slot = 12; + blockstore.db.set_oldest_slot(oldest_slot); + blockstore.db.compact_range_cf::( + &cf::TransactionStatus::key(first_index), + &cf::TransactionStatus::key(last_index), + ); + + let status_entry_iterator = blockstore + .db + .iter::(IteratorMode::Start) + .unwrap(); + let mut count = 0; + for ((_signature, slot), _value) in status_entry_iterator { + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, max_slot - (oldest_slot - 1)); + } + + #[test] + fn test_purge_transaction_memos_compaction_filter() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let oldest_slot = 5; + + fn random_signature() -> Signature { + use rand::Rng; + + let mut key = [0u8; 64]; + rand::thread_rng().fill(&mut key[..]); + Signature::from(key) + } + + // Insert some deprecated TransactionMemos + blockstore + .transaction_memos_cf + .put_deprecated(random_signature(), &"this is a memo".to_string()) + .unwrap(); + blockstore + .transaction_memos_cf + .put_deprecated(random_signature(), &"another memo".to_string()) + .unwrap(); + // Set clean_slot_0 to false, since we have deprecated memos + blockstore.db.set_clean_slot_0(false); + + // Insert some current TransactionMemos + blockstore + .transaction_memos_cf + .put( + (random_signature(), oldest_slot - 1), + &"this is a new memo in slot 4".to_string(), + ) + .unwrap(); + blockstore + .transaction_memos_cf + .put( + (random_signature(), oldest_slot), + &"this is a memo in slot 5 ".to_string(), + ) + .unwrap(); + + let first_index = { + let mut memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + memos_iterator.next().unwrap().unwrap().0 + }; + let 
last_index = { + let mut memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::End); + memos_iterator.next().unwrap().unwrap().0 + }; + + // Purge at slot 0 should not affect any memos + blockstore.db.set_oldest_slot(0); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let _item = item.unwrap(); + count += 1; + } + assert_eq!(count, 4); + + // Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4 + blockstore.db.set_oldest_slot(oldest_slot); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let (key, _value) = item.unwrap(); + let slot = ::index(&key).1; + assert!(slot == 0 || slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 3); + + // Purge at oldest_slot with clean_slot_0 purges deprecated memos + blockstore.db.set_clean_slot_0(true); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let (key, _value) = item.unwrap(); + let slot = ::index(&key).1; + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 1); + } + + #[test] + fn test_purge_slot_cleanup_chaining_missing_slot_meta() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let (shreds, _) = make_many_slot_entries(0, 10, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + assert!(matches!( + blockstore.purge_slot_cleanup_chaining(11).unwrap_err(), + BlockstoreError::SlotUnavailable + )); + } + + #[test] + fn test_purge_slot_cleanup_chaining() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let (shreds, _) = make_many_slot_entries(0, 10, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (slot_11, _) = make_slot_entries(11, 4, 5, true); + blockstore.insert_shreds(slot_11, None, false).unwrap(); + let (slot_12, _) = make_slot_entries(12, 5, 5, true); + blockstore.insert_shreds(slot_12, None, false).unwrap(); + + blockstore.purge_slot_cleanup_chaining(5).unwrap(); + + let slot_meta = blockstore.meta(5).unwrap().unwrap(); + let expected_slot_meta = SlotMeta { + slot: 5, + // Only the next_slots should be preserved + next_slots: vec![6, 12], + ..SlotMeta::default() + }; + assert_eq!(slot_meta, expected_slot_meta); + + let parent_slot_meta = blockstore.meta(4).unwrap().unwrap(); + assert_eq!(parent_slot_meta.next_slots, vec![11]); + + let child_slot_meta = blockstore.meta(6).unwrap().unwrap(); + assert_eq!(child_slot_meta.parent_slot.unwrap(), 5); + + let child_slot_meta = blockstore.meta(12).unwrap().unwrap(); + assert_eq!(child_slot_meta.parent_slot.unwrap(), 5); + } +>>>>>>> cc4072bce8 (blockstore: atomize slot clearing, relax parent slot meta check (#35124)) }
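
The sketch below is illustrative only and is not part of the patch. It restates the purge-and-rechain pattern that `purge_slot_cleanup_chaining` applies, but against a toy in-memory store: the `ToyStore`, `ToyBatch`, and the simplified `SlotMeta` here are invented stand-ins, not the real `Blockstore`/RocksDB types. The point it demonstrates is the one the commit message makes: every mutation is staged in a single batch and applied at once, so a panic or crash part-way through cannot leave the store half-cleared, and the purged slot keeps its `next_slots` so its children remain discoverable after repair.

// Illustrative sketch only -- simplified stand-ins, not the real Blockstore API.
use std::collections::HashMap;

type Slot = u64;

#[derive(Clone, Default, Debug, PartialEq)]
struct SlotMeta {
    parent_slot: Option<Slot>,
    next_slots: Vec<Slot>,
    consumed: u64, // stand-in for the per-slot progress that purging resets
}

#[derive(Default)]
struct ToyStore {
    slot_meta: HashMap<Slot, SlotMeta>,
    shreds: HashMap<Slot, Vec<u8>>, // stand-in for the slot-indexed data columns
}

/// Staged mutations; nothing touches the store until `commit` runs.
#[derive(Default)]
struct ToyBatch {
    delete_shreds: Vec<Slot>,
    put_meta: Vec<(Slot, SlotMeta)>,
}

impl ToyStore {
    fn clear_unconfirmed_slot(&mut self, slot: Slot) {
        // Mirrors the SlotUnavailable case: nothing to clear.
        let Some(mut meta) = self.slot_meta.get(&slot).cloned() else {
            return;
        };
        let mut batch = ToyBatch::default();

        // 1) Purge the slot's data columns.
        batch.delete_shreds.push(slot);

        // 2) Unlink `slot` from its parent's next_slots, if the parent still exists
        //    (the relaxed check: a missing parent meta is tolerated, not fatal).
        if let Some(parent) = meta.parent_slot {
            if let Some(mut parent_meta) = self.slot_meta.get(&parent).cloned() {
                parent_meta.next_slots.retain(|&s| s != slot);
                batch.put_meta.push((parent, parent_meta));
            }
        }

        // 3) Reinsert an orphaned SlotMeta that keeps only next_slots, so the
        //    children of `slot` stay reachable once `slot` is repaired.
        let next_slots = std::mem::take(&mut meta.next_slots);
        batch.put_meta.push((slot, SlotMeta { next_slots, ..SlotMeta::default() }));

        // Apply every staged mutation together; a crash before this point leaves
        // the store untouched instead of half-cleared.
        self.commit(batch);
    }

    fn commit(&mut self, batch: ToyBatch) {
        for slot in batch.delete_shreds {
            self.shreds.remove(&slot);
        }
        for (slot, meta) in batch.put_meta {
            self.slot_meta.insert(slot, meta);
        }
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.slot_meta.insert(4, SlotMeta { next_slots: vec![5], ..SlotMeta::default() });
    store.slot_meta.insert(5, SlotMeta { parent_slot: Some(4), next_slots: vec![6], consumed: 7 });
    store.shreds.insert(5, vec![0xde, 0xad]);

    store.clear_unconfirmed_slot(5);

    // Parent no longer chains to 5, 5's data is gone, but 5 still chains to 6.
    assert!(store.slot_meta[&4].next_slots.is_empty());
    assert!(store.shreds.get(&5).is_none());
    assert_eq!(store.slot_meta[&5], SlotMeta { next_slots: vec![6], ..SlotMeta::default() });
}

In the real patch, the batch is a RocksDB write batch, step 1 covers every slot-indexed column via `purge_range`, and the preserved `next_slots` is what `test_purge_slot_cleanup_chaining` asserts on above.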