Skip to content

Commit

Permalink
blockstore: Make WriteBatch operations go through LedgerColumn (#3687)
Browse files Browse the repository at this point in the history
This change is in preparation for a future change that will shift
rocksdb columns to be stack allocated. In order to accomplish that,
the conversion from Column::Index to a byte array will happen within
LedgerColumn functions.

So, this change shifts per-column logic from WriteBatch to LedgerColumn
  • Loading branch information
steviez authored Nov 19, 2024
1 parent be47703 commit 9d588f5
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 219 deletions.
175 changes: 93 additions & 82 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,11 +1141,8 @@ impl Blockstore {
metrics.chaining_elapsed_us += start.as_us();

let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders.lock().unwrap(),
&mut write_batch,
)?;
let (should_signal, newly_completed_slots) =
self.commit_slot_meta_working_set(&slot_meta_working_set, &mut write_batch)?;

for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
if !working_erasure_meta.should_write() {
Expand Down Expand Up @@ -1212,7 +1209,8 @@ impl Blockstore {
continue;
}
let (slot, fec_set_index) = erasure_set.store_key();
write_batch.put::<cf::ErasureMeta>(
self.erasure_meta_cf.put_in_batch(
&mut write_batch,
(slot, u64::from(fec_set_index)),
working_erasure_meta.as_ref(),
)?;
Expand All @@ -1223,15 +1221,20 @@ impl Blockstore {
// No need to rewrite the column
continue;
}
write_batch.put::<cf::MerkleRootMeta>(
self.merkle_root_meta_cf.put_in_batch(
&mut write_batch,
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)?;
}

for (&slot, index_working_set_entry) in index_working_set.iter() {
if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
self.index_cf.put_in_batch(
&mut write_batch,
slot,
&index_working_set_entry.index,
)?;
}
}
start.stop();
Expand Down Expand Up @@ -1658,7 +1661,9 @@ impl Blockstore {
{} is not full, marking slot dead",
shred_index, slot_meta.received, slot
);
write_batch.put::<cf::DeadSlots>(slot, &true).unwrap();
self.dead_slots_cf
.put_in_batch(write_batch, slot, &true)
.unwrap();
}

if !self.should_insert_data_shred(
Expand Down Expand Up @@ -1731,7 +1736,8 @@ impl Blockstore {

// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), shred.payload())?;
self.code_shred_cf
.put_bytes_in_batch(write_batch, (slot, shred_index), shred.payload())?;
index_meta.coding_mut().insert(shred_index);

Ok(())
Expand Down Expand Up @@ -2194,7 +2200,11 @@ impl Blockstore {

// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), shred.bytes_to_store())?;
self.data_shred_cf.put_bytes_in_batch(
write_batch,
(slot, index),
shred.bytes_to_store(),
)?;
data_index.insert(index);
let newly_completed_data_sets = update_slot_meta(
last_in_slot,
Expand Down Expand Up @@ -2940,7 +2950,7 @@ impl Blockstore {
keys_with_writable: impl Iterator<Item = (&'a Pubkey, bool)>,
status: TransactionStatusMeta,
transaction_index: usize,
db_write_batch: &mut WriteBatch<'_>,
db_write_batch: &mut WriteBatch,
) -> Result<()> {
self.write_transaction_status_helper(
slot,
Expand All @@ -2949,7 +2959,8 @@ impl Blockstore {
status,
transaction_index,
|address, slot, tx_index, signature, writeable| {
db_write_batch.put::<cf::AddressSignatures>(
self.address_signatures_cf.put_in_batch(
db_write_batch,
(*address, slot, tx_index, signature),
&AddressSignatureMeta { writeable },
)
Expand Down Expand Up @@ -2989,9 +3000,10 @@ impl Blockstore {
signature: &Signature,
slot: Slot,
memos: String,
db_write_batch: &mut WriteBatch<'_>,
db_write_batch: &mut WriteBatch,
) -> Result<()> {
db_write_batch.put::<cf::TransactionMemos>((*signature, slot), &memos)
self.transaction_memos_cf
.put_in_batch(db_write_batch, (*signature, slot), &memos)
}

/// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock
Expand Down Expand Up @@ -3989,7 +4001,8 @@ impl Blockstore {
frozen_hash,
is_duplicate_confirmed: true,
});
write_batch.put::<cf::BankHash>(slot, &data)?;
self.bank_hash_cf
.put_in_batch(&mut write_batch, slot, &data)?;
}

self.db.write(write_batch)?;
Expand All @@ -4001,7 +4014,7 @@ impl Blockstore {
let mut max_new_rooted_slot = 0;
for slot in rooted_slots {
max_new_rooted_slot = std::cmp::max(max_new_rooted_slot, *slot);
write_batch.put::<cf::Root>(*slot, &true)?;
self.roots_cf.put_in_batch(&mut write_batch, *slot, &true)?;
}

self.db.write(write_batch)?;
Expand Down Expand Up @@ -4306,7 +4319,8 @@ impl Blockstore {
// slot match the flags of slots that become connected the typical way.
root_meta.set_parent_connected();
root_meta.set_connected();
write_batch.put::<cf::SlotMeta>(root_meta.slot, &root_meta)?;
self.meta_cf
.put_in_batch(&mut write_batch, root_meta.slot, &root_meta)?;

let mut next_slots = VecDeque::from(root_meta.next_slots);
while !next_slots.is_empty() {
Expand All @@ -4318,7 +4332,8 @@ impl Blockstore {
if meta.set_parent_connected() {
next_slots.extend(meta.next_slots.iter());
}
write_batch.put::<cf::SlotMeta>(meta.slot, &meta)?;
self.meta_cf
.put_in_batch(&mut write_batch, meta.slot, &meta)?;
}

self.db.write(write_batch)?;
Expand Down Expand Up @@ -4360,7 +4375,7 @@ impl Blockstore {
// Write all the newly changed slots in new_chained_slots to the write_batch
for (slot, meta) in new_chained_slots.iter() {
let meta: &SlotMeta = &RefCell::borrow(meta);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
self.meta_cf.put_in_batch(write_batch, *slot, meta)?;
}
Ok(())
}
Expand Down Expand Up @@ -4436,14 +4451,15 @@ impl Blockstore {
// If the parent of `slot` is a newly inserted orphan, insert it into the orphans
// column family
if RefCell::borrow(&*prev_slot_meta).is_orphan() {
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
self.orphans_cf
.put_in_batch(write_batch, prev_slot, &true)?;
}
}
}

// At this point this slot has received a parent, so it's no longer an orphan
if was_orphan_slot {
write_batch.delete::<cf::Orphans>(slot)?;
self.orphans_cf.delete_in_batch(write_batch, slot)?;
}
}

Expand Down Expand Up @@ -4506,6 +4522,50 @@ impl Blockstore {
Ok(())
}

/// For each slot in the slot_meta_working_set which has any change, include
/// corresponding updates to cf::SlotMeta via the specified `write_batch`.
/// The `write_batch` will later be atomically committed to the blockstore.
///
/// Arguments:
/// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
/// mapping.
/// - `write_batch`: the write batch which includes all the updates of the
/// the current write and ensures their atomicity.
///
/// On success, the function returns an Ok result with <should_signal,
/// newly_completed_slots> pair where:
/// - `should_signal`: a boolean flag indicating whether to send signal.
/// - `newly_completed_slots`: a subset of slot_meta_working_set which are
/// newly completed.
fn commit_slot_meta_working_set(
&self,
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];
let completed_slots_senders = self.completed_slots_senders.lock().unwrap();

// Check if any metadata was changed, if so, insert the new version of the
// metadata into the write batch
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
// Any slot that wasn't written to should have been filtered out by now.
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
// Check if the working copy of the metadata has changed
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, meta_backup);
self.meta_cf.put_in_batch(write_batch, *slot, meta)?;
}
}

Ok((should_signal, newly_completed_slots))
}

/// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
/// it from the database if it does not exist in slot_meta_working_set.
///
Expand Down Expand Up @@ -4625,7 +4685,7 @@ impl Blockstore {
res
}

pub fn get_write_batch(&self) -> std::result::Result<WriteBatch<'_>, BlockstoreError> {
pub fn get_write_batch(&self) -> std::result::Result<WriteBatch, BlockstoreError> {
self.db.batch()
}

Expand Down Expand Up @@ -4751,51 +4811,6 @@ fn send_signals(
}
}

/// For each slot in the slot_meta_working_set which has any change, include
/// corresponding updates to cf::SlotMeta via the specified `write_batch`.
/// The `write_batch` will later be atomically committed to the blockstore.
///
/// Arguments:
/// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
/// mapping.
/// - `completed_slot_senders`: the units which are responsible for sending
/// signals for completed slots.
/// - `write_batch`: the write batch which includes all the updates of the
/// the current write and ensures their atomicity.
///
/// On success, the function returns an Ok result with <should_signal,
/// newly_completed_slots> pair where:
/// - `should_signal`: a boolean flag indicating whether to send signal.
/// - `newly_completed_slots`: a subset of slot_meta_working_set which are
/// newly completed.
fn commit_slot_meta_working_set(
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
completed_slots_senders: &[Sender<Vec<u64>>],
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];

// Check if any metadata was changed, if so, insert the new version of the
// metadata into the write batch
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
// Any slot that wasn't written to should have been filtered out by now.
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
// Check if the working copy of the metadata has changed
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, meta_backup);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
}
}

Ok((should_signal, newly_completed_slots))
}

/// Returns the `SlotMeta` of the specified `slot` from the two cached states:
/// `working_set` and `chained_slots`. If both contain the `SlotMeta`, then
/// the latest one from the `working_set` will be returned.
Expand Down Expand Up @@ -7522,11 +7537,9 @@ pub mod tests {
);

for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
blockstore
.merkle_root_meta_cf
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -7721,11 +7734,9 @@ pub mod tests {
);

for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
blockstore
.merkle_root_meta_cf
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
Expand Down Expand Up @@ -11913,8 +11924,8 @@ pub mod tests {
.unwrap();
let mut write_batch = blockstore.db.batch().unwrap();
blockstore
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, slot, slot)
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
assert!(blockstore
Expand Down Expand Up @@ -11979,8 +11990,8 @@ pub mod tests {
// an older version.
let mut write_batch = blockstore.db.batch().unwrap();
blockstore
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, slot, slot)
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
assert!(blockstore
Expand Down
Loading

0 comments on commit 9d588f5

Please sign in to comment.