diff --git a/components/consensusmanager/src/lib.rs b/components/consensusmanager/src/lib.rs index 0d41229e8..93611ce3d 100644 --- a/components/consensusmanager/src/lib.rs +++ b/components/consensusmanager/src/lib.rs @@ -21,9 +21,6 @@ pub trait ConsensusCtl: Sync + Send { /// Set as current active consensus fn make_active(&self); - - /// Delete this consensus instance from memory and disk permanently - fn delete(&self); } pub type DynConsensusCtl = Arc; @@ -37,6 +34,13 @@ pub trait ConsensusFactory: Sync + Send { /// Close the factory and cleanup any shared resources used by it fn close(&self); + + /// If the node is not configured as archival -- delete inactive consensus entries and their databases + fn delete_inactive_consensus_entries(&self); + + /// Delete the staging consensus entry and its database (this is done even if the node is archival + /// since staging reflects non-final data) + fn delete_staging_entry(&self); } /// Test-only mock factory @@ -54,6 +58,14 @@ impl ConsensusFactory for MockFactory { fn close(&self) { unimplemented!() } + + fn delete_inactive_consensus_entries(&self) { + unimplemented!() + } + + fn delete_staging_entry(&self) { + unimplemented!() + } } /// Defines a trait which handles consensus resets for external parts of the system. We avoid using @@ -146,6 +158,14 @@ impl ConsensusManager { debug!("[Consensus manager] all consensus threads exited"); self.factory.close(); } + + pub fn delete_inactive_consensus_entries(&self) { + self.factory.delete_inactive_consensus_entries(); + } + + pub fn delete_staging_entry(&self) { + self.factory.delete_staging_entry(); + } } impl Service for ConsensusManager { @@ -185,7 +205,11 @@ impl StagingConsensus { for handler in handlers { handler.handle_consensus_reset(); } - // TODO: Delete non active consensus entries + // Drop `prev` so that deletion below succeeds + drop(prev); + // Staging was committed and is now the active consensus so we can delete + // any pervious, now inactive, consensus entries + self.manager.delete_inactive_consensus_entries(); } pub fn cancel(self) { @@ -193,7 +217,10 @@ impl StagingConsensus { for handle in self.handles { handle.join().unwrap(); } - self.staging.ctl.delete(); + // Drop staging (and DB refs therein) so that the delete operation below succeeds + drop(self.staging); + // Delete the canceled staging consensus + self.manager.delete_staging_entry(); } } diff --git a/consensus/src/consensus/ctl.rs b/consensus/src/consensus/ctl.rs index cae037721..4b2eb1511 100644 --- a/consensus/src/consensus/ctl.rs +++ b/consensus/src/consensus/ctl.rs @@ -39,27 +39,6 @@ impl ConsensusCtl for Ctl { fn make_active(&self) { // TODO: pass a value to make sure the correct consensus is committed self.management_store.write().commit_staging_consensus().unwrap(); - - // TODO: delete previous active - } - - fn delete(&self) { - // TODO: see above - self.management_store.write().cancel_staging_consensus().unwrap(); - - // TODO: delete staging - // for _ in 0..16 { - // if self.consensus_db_ref.strong_count() > 0 { - // // Sometimes another thread is shuting-down and cleaning resources - // std::thread::sleep(std::time::Duration::from_millis(500)); - // } else { - // break; - // } - // } - // assert_eq!(self.consensus_db_ref.strong_count(), 0, "DB has strong references and cannot be deleted"); - // let options = rocksdb::Options::default(); - // DB::destroy(&options, self.consensus_db_path.clone()) - // .expect("DB is expected to be deletable since there are no references to it"); } } @@ -76,8 +55,4 @@ impl ConsensusCtl for Consensus { fn make_active(&self) { unimplemented!() } - - fn delete(&self) { - unimplemented!() - } } diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs index cebd0356c..f497a79dc 100644 --- a/consensus/src/consensus/factory.rs +++ b/consensus/src/consensus/factory.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use kaspa_consensus_core::config::Config; use kaspa_consensus_notify::root::ConsensusNotificationRoot; use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCtl, SessionLock}; -use kaspa_core::{debug, time::unix_now}; +use kaspa_core::{debug, time::unix_now, warn}; use kaspa_database::{ prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError, StoreResult, StoreResultExtensions, DB}, registry::DatabaseStorePrefixes, @@ -95,6 +95,15 @@ impl MultiConsensusManagementStore { } } + // This function assumes metadata is already set + pub fn staging_consensus_entry(&mut self) -> Option { + let metadata = self.metadata.read().unwrap(); + match metadata.staging_consensus_key { + Some(key) => Some(self.entries.read(key.into()).unwrap()), + None => None, + } + } + pub fn save_new_active_consensus(&mut self, entry: ConsensusEntry) -> StoreResult<()> { let key = entry.key; if self.entries.has(key.into())? { @@ -113,8 +122,6 @@ impl MultiConsensusManagementStore { pub fn new_staging_consensus_entry(&mut self) -> StoreResult { let mut metadata = self.metadata.read()?; - // TODO: handle the case where `staging_consensus_key` is already some (perhaps from a previous interrupted run) - metadata.max_key_used += 1; let new_key = metadata.max_key_used; metadata.staging_consensus_key = Some(new_key); @@ -152,7 +159,7 @@ impl MultiConsensusManagementStore { }) } - fn iterate_non_active(&self) -> impl Iterator>> + '_ { + fn iterate_inactive_entries(&self) -> impl Iterator>> + '_ { let current_consensus_key = self.metadata.read().unwrap().current_consensus_key; self.iterator().filter(move |entry_result| { if let Ok(entry) = entry_result { @@ -213,29 +220,9 @@ impl Factory { management_store.write().set_is_archival_node(config.is_archival); let factory = Self { management_store, config, db_root_dir, db_parallelism, notification_root, counters, tx_script_cache_counters }; - factory.clean_non_active_consensus_entries(); + factory.delete_inactive_consensus_entries(); factory } - - pub fn clean_non_active_consensus_entries(&self) { - if self.config.is_archival { - return; - } - - let mut write_guard = self.management_store.write(); - let entries_to_delete = write_guard.iterate_non_active().collect_vec(); - for entry_result in entries_to_delete.iter() { - let entry = entry_result.as_ref().unwrap(); - let dir = self.db_root_dir.join(entry.directory_name.clone()); - if dir.exists() { - fs::remove_dir_all(dir).unwrap(); - } - } - - for entry_result in entries_to_delete { - write_guard.delete_entry(entry_result.unwrap()).unwrap(); - } - } } impl ConsensusFactory for Factory { @@ -307,4 +294,53 @@ impl ConsensusFactory for Factory { debug!("Consensus factory: closing"); self.notification_root.close(); } + + fn delete_inactive_consensus_entries(&self) { + // Staging entry is deleted also by archival nodes since it represents non-final data + self.delete_staging_entry(); + + if self.config.is_archival { + return; + } + + let mut write_guard = self.management_store.write(); + let entries_to_delete = write_guard + .iterate_inactive_entries() + .filter_map(|entry_result| { + let entry = entry_result.unwrap(); + let dir = self.db_root_dir.join(entry.directory_name.clone()); + if dir.exists() { + match fs::remove_dir_all(dir) { + Ok(_) => Some(entry), + Err(e) => { + warn!("Error deleting consensus entry {}: {}", entry.key, e); + None + } + } + } else { + Some(entry) + } + }) + .collect_vec(); + + for entry in entries_to_delete { + write_guard.delete_entry(entry).unwrap(); + } + } + + fn delete_staging_entry(&self) { + let mut write_guard = self.management_store.write(); + if let Some(entry) = write_guard.staging_consensus_entry() { + let dir = self.db_root_dir.join(entry.directory_name.clone()); + match fs::remove_dir_all(dir) { + Ok(_) => { + write_guard.delete_entry(entry).unwrap(); + } + Err(e) => { + warn!("Error deleting staging consensus entry {}: {}", entry.key, e); + } + }; + write_guard.cancel_staging_consensus().unwrap(); + } + } } diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index 959cd719f..5d97fee8a 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -282,4 +282,12 @@ impl ConsensusFactory for TestConsensusFactory { fn close(&self) { self.tc.notification_root().close(); } + + fn delete_inactive_consensus_entries(&self) { + unimplemented!() + } + + fn delete_staging_entry(&self) { + unimplemented!() + } }