From 9d588f541ced17ca1fc54426e41d4cc58f4d2e63 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Nov 2024 10:56:49 -0600 Subject: [PATCH] blockstore: Make WriteBatch operations go through LedgerColumn (#3687) This change is in preparation for a future change that will shift rocksdb columns to be stack allocated. In order to accomplish that, the conversion from Column::Index to a byte array will happen within LedgerColumn functions. So, this change shifts per-column logic from WriteBatch to LedgerColumn --- ledger/src/blockstore.rs | 175 ++++++++++++---------- ledger/src/blockstore/blockstore_purge.rs | 117 +++++++-------- ledger/src/blockstore_db.rs | 144 +++++++++--------- 3 files changed, 217 insertions(+), 219 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 7db1568b421f1d..3669b9fc0581c1 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1141,11 +1141,8 @@ impl Blockstore { metrics.chaining_elapsed_us += start.as_us(); let mut start = Measure::start("Commit Working Sets"); - let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( - &slot_meta_working_set, - &self.completed_slots_senders.lock().unwrap(), - &mut write_batch, - )?; + let (should_signal, newly_completed_slots) = + self.commit_slot_meta_working_set(&slot_meta_working_set, &mut write_batch)?; for (erasure_set, working_erasure_meta) in erasure_metas.iter() { if !working_erasure_meta.should_write() { @@ -1212,7 +1209,8 @@ impl Blockstore { continue; } let (slot, fec_set_index) = erasure_set.store_key(); - write_batch.put::( + self.erasure_meta_cf.put_in_batch( + &mut write_batch, (slot, u64::from(fec_set_index)), working_erasure_meta.as_ref(), )?; @@ -1223,7 +1221,8 @@ impl Blockstore { // No need to rewrite the column continue; } - write_batch.put::( + self.merkle_root_meta_cf.put_in_batch( + &mut write_batch, erasure_set.store_key(), working_merkle_root_meta.as_ref(), )?; @@ -1231,7 +1230,11 @@ impl Blockstore { for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { - write_batch.put::(slot, &index_working_set_entry.index)?; + self.index_cf.put_in_batch( + &mut write_batch, + slot, + &index_working_set_entry.index, + )?; } } start.stop(); @@ -1658,7 +1661,9 @@ impl Blockstore { {} is not full, marking slot dead", shred_index, slot_meta.received, slot ); - write_batch.put::(slot, &true).unwrap(); + self.dead_slots_cf + .put_in_batch(write_batch, slot, &true) + .unwrap(); } if !self.should_insert_data_shred( @@ -1731,7 +1736,8 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), shred.payload())?; + self.code_shred_cf + .put_bytes_in_batch(write_batch, (slot, shred_index), shred.payload())?; index_meta.coding_mut().insert(shred_index); Ok(()) @@ -2194,7 +2200,11 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), shred.bytes_to_store())?; + self.data_shred_cf.put_bytes_in_batch( + write_batch, + (slot, index), + shred.bytes_to_store(), + )?; data_index.insert(index); let newly_completed_data_sets = update_slot_meta( last_in_slot, @@ -2940,7 +2950,7 @@ impl Blockstore { keys_with_writable: impl Iterator, status: TransactionStatusMeta, transaction_index: usize, - db_write_batch: &mut WriteBatch<'_>, + db_write_batch: &mut WriteBatch, ) -> Result<()> { self.write_transaction_status_helper( slot, @@ -2949,7 +2959,8 @@ impl Blockstore { status, transaction_index, |address, slot, tx_index, signature, writeable| { - db_write_batch.put::( + self.address_signatures_cf.put_in_batch( + db_write_batch, (*address, slot, tx_index, signature), &AddressSignatureMeta { writeable }, ) @@ -2989,9 +3000,10 @@ impl Blockstore { signature: &Signature, slot: Slot, memos: String, - db_write_batch: &mut WriteBatch<'_>, + db_write_batch: &mut WriteBatch, ) -> Result<()> { - db_write_batch.put::((*signature, slot), &memos) + self.transaction_memos_cf + .put_in_batch(db_write_batch, (*signature, slot), &memos) } /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock @@ -3989,7 +4001,8 @@ impl Blockstore { frozen_hash, is_duplicate_confirmed: true, }); - write_batch.put::(slot, &data)?; + self.bank_hash_cf + .put_in_batch(&mut write_batch, slot, &data)?; } self.db.write(write_batch)?; @@ -4001,7 +4014,7 @@ impl Blockstore { let mut max_new_rooted_slot = 0; for slot in rooted_slots { max_new_rooted_slot = std::cmp::max(max_new_rooted_slot, *slot); - write_batch.put::(*slot, &true)?; + self.roots_cf.put_in_batch(&mut write_batch, *slot, &true)?; } self.db.write(write_batch)?; @@ -4306,7 +4319,8 @@ impl Blockstore { // slot match the flags of slots that become connected the typical way. root_meta.set_parent_connected(); root_meta.set_connected(); - write_batch.put::(root_meta.slot, &root_meta)?; + self.meta_cf + .put_in_batch(&mut write_batch, root_meta.slot, &root_meta)?; let mut next_slots = VecDeque::from(root_meta.next_slots); while !next_slots.is_empty() { @@ -4318,7 +4332,8 @@ impl Blockstore { if meta.set_parent_connected() { next_slots.extend(meta.next_slots.iter()); } - write_batch.put::(meta.slot, &meta)?; + self.meta_cf + .put_in_batch(&mut write_batch, meta.slot, &meta)?; } self.db.write(write_batch)?; @@ -4360,7 +4375,7 @@ impl Blockstore { // Write all the newly changed slots in new_chained_slots to the write_batch for (slot, meta) in new_chained_slots.iter() { let meta: &SlotMeta = &RefCell::borrow(meta); - write_batch.put::(*slot, meta)?; + self.meta_cf.put_in_batch(write_batch, *slot, meta)?; } Ok(()) } @@ -4436,14 +4451,15 @@ impl Blockstore { // If the parent of `slot` is a newly inserted orphan, insert it into the orphans // column family if RefCell::borrow(&*prev_slot_meta).is_orphan() { - write_batch.put::(prev_slot, &true)?; + self.orphans_cf + .put_in_batch(write_batch, prev_slot, &true)?; } } } // At this point this slot has received a parent, so it's no longer an orphan if was_orphan_slot { - write_batch.delete::(slot)?; + self.orphans_cf.delete_in_batch(write_batch, slot)?; } } @@ -4506,6 +4522,50 @@ impl Blockstore { Ok(()) } + /// For each slot in the slot_meta_working_set which has any change, include + /// corresponding updates to cf::SlotMeta via the specified `write_batch`. + /// The `write_batch` will later be atomically committed to the blockstore. + /// + /// Arguments: + /// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta` + /// mapping. + /// - `write_batch`: the write batch which includes all the updates of the + /// the current write and ensures their atomicity. + /// + /// On success, the function returns an Ok result with pair where: + /// - `should_signal`: a boolean flag indicating whether to send signal. + /// - `newly_completed_slots`: a subset of slot_meta_working_set which are + /// newly completed. + fn commit_slot_meta_working_set( + &self, + slot_meta_working_set: &HashMap, + write_batch: &mut WriteBatch, + ) -> Result<(bool, Vec)> { + let mut should_signal = false; + let mut newly_completed_slots = vec![]; + let completed_slots_senders = self.completed_slots_senders.lock().unwrap(); + + // Check if any metadata was changed, if so, insert the new version of the + // metadata into the write batch + for (slot, slot_meta_entry) in slot_meta_working_set.iter() { + // Any slot that wasn't written to should have been filtered out by now. + assert!(slot_meta_entry.did_insert_occur); + let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta); + let meta_backup = &slot_meta_entry.old_slot_meta; + if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) { + newly_completed_slots.push(*slot); + } + // Check if the working copy of the metadata has changed + if Some(meta) != meta_backup.as_ref() { + should_signal = should_signal || slot_has_updates(meta, meta_backup); + self.meta_cf.put_in_batch(write_batch, *slot, meta)?; + } + } + + Ok((should_signal, newly_completed_slots)) + } + /// Obtain the SlotMeta from the in-memory slot_meta_working_set or load /// it from the database if it does not exist in slot_meta_working_set. /// @@ -4625,7 +4685,7 @@ impl Blockstore { res } - pub fn get_write_batch(&self) -> std::result::Result, BlockstoreError> { + pub fn get_write_batch(&self) -> std::result::Result { self.db.batch() } @@ -4751,51 +4811,6 @@ fn send_signals( } } -/// For each slot in the slot_meta_working_set which has any change, include -/// corresponding updates to cf::SlotMeta via the specified `write_batch`. -/// The `write_batch` will later be atomically committed to the blockstore. -/// -/// Arguments: -/// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta` -/// mapping. -/// - `completed_slot_senders`: the units which are responsible for sending -/// signals for completed slots. -/// - `write_batch`: the write batch which includes all the updates of the -/// the current write and ensures their atomicity. -/// -/// On success, the function returns an Ok result with pair where: -/// - `should_signal`: a boolean flag indicating whether to send signal. -/// - `newly_completed_slots`: a subset of slot_meta_working_set which are -/// newly completed. -fn commit_slot_meta_working_set( - slot_meta_working_set: &HashMap, - completed_slots_senders: &[Sender>], - write_batch: &mut WriteBatch, -) -> Result<(bool, Vec)> { - let mut should_signal = false; - let mut newly_completed_slots = vec![]; - - // Check if any metadata was changed, if so, insert the new version of the - // metadata into the write batch - for (slot, slot_meta_entry) in slot_meta_working_set.iter() { - // Any slot that wasn't written to should have been filtered out by now. - assert!(slot_meta_entry.did_insert_occur); - let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta); - let meta_backup = &slot_meta_entry.old_slot_meta; - if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) { - newly_completed_slots.push(*slot); - } - // Check if the working copy of the metadata has changed - if Some(meta) != meta_backup.as_ref() { - should_signal = should_signal || slot_has_updates(meta, meta_backup); - write_batch.put::(*slot, meta)?; - } - } - - Ok((should_signal, newly_completed_slots)) -} - /// Returns the `SlotMeta` of the specified `slot` from the two cached states: /// `working_set` and `chained_slots`. If both contain the `SlotMeta`, then /// the latest one from the `working_set` will be returned. @@ -7522,11 +7537,9 @@ pub mod tests { ); for (erasure_set, working_merkle_root_meta) in merkle_root_metas { - write_batch - .put::( - erasure_set.store_key(), - working_merkle_root_meta.as_ref(), - ) + blockstore + .merkle_root_meta_cf + .put(erasure_set.store_key(), working_merkle_root_meta.as_ref()) .unwrap(); } blockstore.db.write(write_batch).unwrap(); @@ -7721,11 +7734,9 @@ pub mod tests { ); for (erasure_set, working_merkle_root_meta) in merkle_root_metas { - write_batch - .put::( - erasure_set.store_key(), - working_merkle_root_meta.as_ref(), - ) + blockstore + .merkle_root_meta_cf + .put(erasure_set.store_key(), working_merkle_root_meta.as_ref()) .unwrap(); } blockstore.db.write(write_batch).unwrap(); @@ -11913,8 +11924,8 @@ pub mod tests { .unwrap(); let mut write_batch = blockstore.db.batch().unwrap(); blockstore - .db - .delete_range_cf::(&mut write_batch, slot, slot) + .merkle_root_meta_cf + .delete_range_in_batch(&mut write_batch, slot, slot) .unwrap(); blockstore.db.write(write_batch).unwrap(); assert!(blockstore @@ -11979,8 +11990,8 @@ pub mod tests { // an older version. let mut write_batch = blockstore.db.batch().unwrap(); blockstore - .db - .delete_range_cf::(&mut write_batch, slot, slot) + .merkle_root_meta_cf + .delete_range_in_batch(&mut write_batch, slot, slot) .unwrap(); blockstore.db.write(write_batch).unwrap(); assert!(blockstore diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 661b8780c50ee2..0497fec7dab3f3 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -1,6 +1,5 @@ use { super::*, - crate::blockstore_db::ColumnIndexDeprecation, solana_sdk::message::AccountKeys, std::{cmp::max, time::Instant}, }; @@ -167,7 +166,8 @@ impl Blockstore { parent_slot_meta .next_slots .retain(|&next_slot| next_slot != slot); - write_batch.put::(parent_slot, &parent_slot_meta)?; + self.meta_cf + .put_in_batch(&mut write_batch, parent_slot, &parent_slot_meta)?; } else { error!( "Parent slot meta {} for child {} is missing or cleaned up. @@ -179,7 +179,8 @@ impl Blockstore { // Retain a SlotMeta for `slot` with the `next_slots` field retained slot_meta.clear_unconfirmed_slot(); - write_batch.put::(slot, &slot_meta)?; + self.meta_cf + .put_in_batch(&mut write_batch, slot, &slot_meta)?; self.db.write(write_batch).inspect_err(|e| { error!( @@ -253,68 +254,68 @@ impl Blockstore { purge_type: PurgeType, ) -> Result { let columns_purged = self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .meta_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .bank_hash_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .roots_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .data_shred_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .code_shred_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .dead_slots_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .duplicate_slots_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .erasure_meta_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .orphans_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .index_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .rewards_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .blocktime_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .perf_samples_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .block_height_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .optimistic_slots_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok() & self - .db - .delete_range_cf::(write_batch, from_slot, to_slot) + .merkle_root_meta_cf + .delete_range_in_batch(write_batch, from_slot, to_slot) .is_ok(); match purge_type { @@ -462,21 +463,17 @@ impl Blockstore { .flat_map(|entry| entry.transactions); for (i, transaction) in transactions.enumerate() { if let Some(&signature) = transaction.signatures.first() { - batch.delete::((signature, slot))?; - batch.delete::((signature, slot))?; + self.transaction_status_cf + .delete_in_batch(batch, (signature, slot))?; + self.transaction_memos_cf + .delete_in_batch(batch, (signature, slot))?; if !primary_indexes.is_empty() { - batch.delete_raw::( - &cf::TransactionMemos::deprecated_key(signature), - )?; + self.transaction_memos_cf + .delete_deprecated_in_batch(batch, signature)?; } for primary_index in &primary_indexes { - batch.delete_raw::( - &cf::TransactionStatus::deprecated_key(( - *primary_index, - signature, - slot, - )), - )?; + self.transaction_status_cf + .delete_deprecated_in_batch(batch, (*primary_index, signature, slot))?; } let meta = self.read_transaction_status((signature, slot))?; @@ -489,20 +486,14 @@ impl Blockstore { let transaction_index = u32::try_from(i).map_err(|_| BlockstoreError::TransactionIndexOverflow)?; for pubkey in account_keys.iter() { - batch.delete::(( - *pubkey, - slot, - transaction_index, - signature, - ))?; + self.address_signatures_cf.delete_in_batch( + batch, + (*pubkey, slot, transaction_index, signature), + )?; for primary_index in &primary_indexes { - batch.delete_raw::( - &cf::AddressSignatures::deprecated_key(( - *primary_index, - *pubkey, - slot, - signature, - )), + self.address_signatures_cf.delete_deprecated_in_batch( + batch, + (*primary_index, *pubkey, slot, signature), )?; } } @@ -512,12 +503,14 @@ impl Blockstore { let mut update_highest_primary_index_slot = false; if index0.max_slot >= from_slot && index0.max_slot <= to_slot { index0.max_slot = from_slot.saturating_sub(1); - batch.put::(0, &index0)?; + self.transaction_status_index_cf + .put_in_batch(batch, 0, &index0)?; update_highest_primary_index_slot = true; } if index1.max_slot >= from_slot && index1.max_slot <= to_slot { index1.max_slot = from_slot.saturating_sub(1); - batch.put::(1, &index1)?; + self.transaction_status_index_cf + .put_in_batch(batch, 1, &index1)?; update_highest_primary_index_slot = true } if update_highest_primary_index_slot { diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 2c211045d3ae00..3b469a6a385366 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -32,7 +32,7 @@ use { }, solana_storage_proto::convert::generated, std::{ - collections::{HashMap, HashSet}, + collections::HashSet, ffi::{CStr, CString}, fs, marker::PhantomData, @@ -1399,9 +1399,25 @@ impl LedgerColumn { } } -pub struct WriteBatch<'a> { +pub struct WriteBatch { write_batch: RWriteBatch, - map: HashMap<&'static str, &'a ColumnFamily>, +} + +impl WriteBatch { + fn put_cf(&mut self, cf: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + self.write_batch.put_cf(cf, key, value); + Ok(()) + } + + fn delete_cf(&mut self, cf: &ColumnFamily, key: &[u8]) -> Result<()> { + self.write_batch.delete_cf(cf, key); + Ok(()) + } + + fn delete_range_cf(&mut self, cf: &ColumnFamily, from: &[u8], to: &[u8]) -> Result<()> { + self.write_batch.delete_range_cf(cf, from, to); + Ok(()) + } } impl Database { @@ -1450,12 +1466,7 @@ impl Database { pub fn batch(&self) -> Result { let write_batch = self.backend.batch(); - let map = Rocks::columns() - .into_iter() - .map(|desc| (desc, self.backend.cf_handle(desc))) - .collect(); - - Ok(WriteBatch { write_batch, map }) + Ok(WriteBatch { write_batch }) } pub fn write(&self, batch: WriteBatch) -> Result<()> { @@ -1466,25 +1477,6 @@ impl Database { Ok(fs_extra::dir::get_size(&self.path)?) } - /// Adds a \[`from`, `to`\] range that deletes all entries between the `from` slot - /// and `to` slot inclusively. If `from` slot and `to` slot are the same, then all - /// entries in that slot will be removed. - /// - pub fn delete_range_cf(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()> - where - C: Column + ColumnName, - { - let cf = self.cf_handle::(); - // Note that the default behavior of rocksdb's delete_range_cf deletes - // files within [from, to), while our purge logic applies to [from, to]. - // - // For consistency, we make our delete_range_cf works for [from, to] by - // adjusting the `to` slot range by 1. - let from_index = C::as_index(from); - let to_index = C::as_index(to.saturating_add(1)); - batch.delete_range_cf::(cf, from_index, to_index) - } - /// Delete files whose slot range is within \[`from`, `to`\]. pub fn delete_file_in_range_cf(&self, from: Slot, to: Slot) -> Result<()> where @@ -1621,6 +1613,16 @@ where result } + pub fn put_bytes_in_batch( + &self, + batch: &mut WriteBatch, + key: C::Index, + value: &[u8], + ) -> Result<()> { + let key = C::key(key); + batch.put_cf(self.handle(), &key, value) + } + /// Retrieves the specified RocksDB integer property of the current /// column family. /// @@ -1646,6 +1648,28 @@ where } result } + + pub fn delete_in_batch(&self, batch: &mut WriteBatch, key: C::Index) -> Result<()> { + let key = C::key(key); + batch.delete_cf(self.handle(), &key) + } + + /// Adds a \[`from`, `to`\] range that deletes all entries between the `from` slot + /// and `to` slot inclusively. If `from` slot and `to` slot are the same, then all + /// entries in that slot will be removed. + pub fn delete_range_in_batch(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()> + where + C: Column + ColumnName, + { + // Note that the default behavior of rocksdb's delete_range_cf deletes + // files within [from, to), while our purge logic applies to [from, to]. + // + // For consistency, we make our delete_range_cf works for [from, to] by + // adjusting the `to` slot range by 1. + let from_key = C::key(C::as_index(from)); + let to_key = C::key(C::as_index(to.saturating_add(1))); + batch.delete_range_cf(self.handle(), &from_key, &to_key) + } } impl LedgerColumn @@ -1728,6 +1752,17 @@ where } result } + + pub fn put_in_batch( + &self, + batch: &mut WriteBatch, + key: C::Index, + value: &C::Type, + ) -> Result<()> { + let key = C::key(key); + let serialized_value = serialize(value)?; + batch.put_cf(self.handle(), &key, &serialized_value) + } } impl LedgerColumn @@ -1851,55 +1886,14 @@ where .map(|index| (index, value)) })) } -} - -impl<'a> WriteBatch<'a> { - pub fn put_bytes(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { - self.write_batch - .put_cf(self.get_cf::(), C::key(key), bytes); - Ok(()) - } - - pub fn delete(&mut self, key: C::Index) -> Result<()> { - self.delete_raw::(&C::key(key)) - } - - pub(crate) fn delete_raw(&mut self, key: &[u8]) -> Result<()> { - self.write_batch.delete_cf(self.get_cf::(), key); - Ok(()) - } - pub fn put( - &mut self, - key: C::Index, - value: &C::Type, - ) -> Result<()> { - let serialized_value = serialize(&value)?; - self.write_batch - .put_cf(self.get_cf::(), C::key(key), serialized_value); - Ok(()) - } - - #[inline] - fn get_cf(&self) -> &'a ColumnFamily { - self.map[C::NAME] - } - - /// Adds a \[`from`, `to`) range deletion entry to the batch. - /// - /// Note that the \[`from`, `to`) deletion range of WriteBatch::delete_range_cf - /// is different from \[`from`, `to`\] of Database::delete_range_cf as we makes - /// the semantics of Database::delete_range_cf matches the blockstore purge - /// logic. - fn delete_range_cf( - &mut self, - cf: &ColumnFamily, - from: C::Index, - to: C::Index, // exclusive + pub(crate) fn delete_deprecated_in_batch( + &self, + batch: &mut WriteBatch, + key: C::DeprecatedIndex, ) -> Result<()> { - self.write_batch - .delete_range_cf(cf, C::key(from), C::key(to)); - Ok(()) + let key = C::deprecated_key(key); + batch.delete_cf(self.handle(), &key) } }