Skip to content

Commit

Permalink
Delete inactive entries after staging commit (kaspanet#301)
Browse files Browse the repository at this point in the history
* Delete inactive entries after staging commit

* Delete staging db when cancelled

* Address review comments

* renaming + comments + one missing drop
  • Loading branch information
someone235 authored Oct 25, 2023
1 parent 19ea1a6 commit c6b9a7b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 55 deletions.
37 changes: 32 additions & 5 deletions components/consensusmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ pub trait ConsensusCtl: Sync + Send {

/// Set as current active consensus
fn make_active(&self);

/// Delete this consensus instance from memory and disk permanently
fn delete(&self);
}

pub type DynConsensusCtl = Arc<dyn ConsensusCtl>;
Expand All @@ -37,6 +34,13 @@ pub trait ConsensusFactory: Sync + Send {

/// Close the factory and cleanup any shared resources used by it
fn close(&self);

/// If the node is not configured as archival -- delete inactive consensus entries and their databases
fn delete_inactive_consensus_entries(&self);

/// Delete the staging consensus entry and its database (this is done even if the node is archival
/// since staging reflects non-final data)
fn delete_staging_entry(&self);
}

/// Test-only mock factory
Expand All @@ -54,6 +58,14 @@ impl ConsensusFactory for MockFactory {
fn close(&self) {
unimplemented!()
}

fn delete_inactive_consensus_entries(&self) {
unimplemented!()
}

fn delete_staging_entry(&self) {
unimplemented!()
}
}

/// Defines a trait which handles consensus resets for external parts of the system. We avoid using
Expand Down Expand Up @@ -146,6 +158,14 @@ impl ConsensusManager {
debug!("[Consensus manager] all consensus threads exited");
self.factory.close();
}

pub fn delete_inactive_consensus_entries(&self) {
self.factory.delete_inactive_consensus_entries();
}

pub fn delete_staging_entry(&self) {
self.factory.delete_staging_entry();
}
}

impl Service for ConsensusManager {
Expand Down Expand Up @@ -185,15 +205,22 @@ impl StagingConsensus {
for handler in handlers {
handler.handle_consensus_reset();
}
// TODO: Delete non active consensus entries
// Drop `prev` so that deletion below succeeds
drop(prev);
// Staging was committed and is now the active consensus so we can delete
// any pervious, now inactive, consensus entries
self.manager.delete_inactive_consensus_entries();
}

pub fn cancel(self) {
self.staging.ctl.stop();
for handle in self.handles {
handle.join().unwrap();
}
self.staging.ctl.delete();
// Drop staging (and DB refs therein) so that the delete operation below succeeds
drop(self.staging);
// Delete the canceled staging consensus
self.manager.delete_staging_entry();
}
}

Expand Down
25 changes: 0 additions & 25 deletions consensus/src/consensus/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,6 @@ impl ConsensusCtl for Ctl {
fn make_active(&self) {
// TODO: pass a value to make sure the correct consensus is committed
self.management_store.write().commit_staging_consensus().unwrap();

// TODO: delete previous active
}

fn delete(&self) {
// TODO: see above
self.management_store.write().cancel_staging_consensus().unwrap();

// TODO: delete staging
// for _ in 0..16 {
// if self.consensus_db_ref.strong_count() > 0 {
// // Sometimes another thread is shuting-down and cleaning resources
// std::thread::sleep(std::time::Duration::from_millis(500));
// } else {
// break;
// }
// }
// assert_eq!(self.consensus_db_ref.strong_count(), 0, "DB has strong references and cannot be deleted");
// let options = rocksdb::Options::default();
// DB::destroy(&options, self.consensus_db_path.clone())
// .expect("DB is expected to be deletable since there are no references to it");
}
}

Expand All @@ -76,8 +55,4 @@ impl ConsensusCtl for Consensus {
fn make_active(&self) {
unimplemented!()
}

fn delete(&self) {
unimplemented!()
}
}
86 changes: 61 additions & 25 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use itertools::Itertools;
use kaspa_consensus_core::config::Config;
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCtl, SessionLock};
use kaspa_core::{debug, time::unix_now};
use kaspa_core::{debug, time::unix_now, warn};
use kaspa_database::{
prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError, StoreResult, StoreResultExtensions, DB},
registry::DatabaseStorePrefixes,
Expand Down Expand Up @@ -95,6 +95,15 @@ impl MultiConsensusManagementStore {
}
}

// This function assumes metadata is already set
pub fn staging_consensus_entry(&mut self) -> Option<ConsensusEntry> {
let metadata = self.metadata.read().unwrap();
match metadata.staging_consensus_key {
Some(key) => Some(self.entries.read(key.into()).unwrap()),
None => None,
}
}

pub fn save_new_active_consensus(&mut self, entry: ConsensusEntry) -> StoreResult<()> {
let key = entry.key;
if self.entries.has(key.into())? {
Expand All @@ -113,8 +122,6 @@ impl MultiConsensusManagementStore {
pub fn new_staging_consensus_entry(&mut self) -> StoreResult<ConsensusEntry> {
let mut metadata = self.metadata.read()?;

// TODO: handle the case where `staging_consensus_key` is already some (perhaps from a previous interrupted run)

metadata.max_key_used += 1;
let new_key = metadata.max_key_used;
metadata.staging_consensus_key = Some(new_key);
Expand Down Expand Up @@ -152,7 +159,7 @@ impl MultiConsensusManagementStore {
})
}

fn iterate_non_active(&self) -> impl Iterator<Item = Result<ConsensusEntry, Box<dyn Error>>> + '_ {
fn iterate_inactive_entries(&self) -> impl Iterator<Item = Result<ConsensusEntry, Box<dyn Error>>> + '_ {
let current_consensus_key = self.metadata.read().unwrap().current_consensus_key;
self.iterator().filter(move |entry_result| {
if let Ok(entry) = entry_result {
Expand Down Expand Up @@ -213,29 +220,9 @@ impl Factory {
management_store.write().set_is_archival_node(config.is_archival);
let factory =
Self { management_store, config, db_root_dir, db_parallelism, notification_root, counters, tx_script_cache_counters };
factory.clean_non_active_consensus_entries();
factory.delete_inactive_consensus_entries();
factory
}

pub fn clean_non_active_consensus_entries(&self) {
if self.config.is_archival {
return;
}

let mut write_guard = self.management_store.write();
let entries_to_delete = write_guard.iterate_non_active().collect_vec();
for entry_result in entries_to_delete.iter() {
let entry = entry_result.as_ref().unwrap();
let dir = self.db_root_dir.join(entry.directory_name.clone());
if dir.exists() {
fs::remove_dir_all(dir).unwrap();
}
}

for entry_result in entries_to_delete {
write_guard.delete_entry(entry_result.unwrap()).unwrap();
}
}
}

impl ConsensusFactory for Factory {
Expand Down Expand Up @@ -307,4 +294,53 @@ impl ConsensusFactory for Factory {
debug!("Consensus factory: closing");
self.notification_root.close();
}

fn delete_inactive_consensus_entries(&self) {
// Staging entry is deleted also by archival nodes since it represents non-final data
self.delete_staging_entry();

if self.config.is_archival {
return;
}

let mut write_guard = self.management_store.write();
let entries_to_delete = write_guard
.iterate_inactive_entries()
.filter_map(|entry_result| {
let entry = entry_result.unwrap();
let dir = self.db_root_dir.join(entry.directory_name.clone());
if dir.exists() {
match fs::remove_dir_all(dir) {
Ok(_) => Some(entry),
Err(e) => {
warn!("Error deleting consensus entry {}: {}", entry.key, e);
None
}
}
} else {
Some(entry)
}
})
.collect_vec();

for entry in entries_to_delete {
write_guard.delete_entry(entry).unwrap();
}
}

fn delete_staging_entry(&self) {
let mut write_guard = self.management_store.write();
if let Some(entry) = write_guard.staging_consensus_entry() {
let dir = self.db_root_dir.join(entry.directory_name.clone());
match fs::remove_dir_all(dir) {
Ok(_) => {
write_guard.delete_entry(entry).unwrap();
}
Err(e) => {
warn!("Error deleting staging consensus entry {}: {}", entry.key, e);
}
};
write_guard.cancel_staging_consensus().unwrap();
}
}
}
8 changes: 8 additions & 0 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,12 @@ impl ConsensusFactory for TestConsensusFactory {
fn close(&self) {
self.tc.notification_root().close();
}

fn delete_inactive_consensus_entries(&self) {
unimplemented!()
}

fn delete_staging_entry(&self) {
unimplemented!()
}
}

0 comments on commit c6b9a7b

Please sign in to comment.