Implement file descriptor budget pool (kaspanet#284)

* "Implement file descriptor budget pool"

This commit implements a file descriptor budget pool for better allocation of file descriptors. The feature is added in a new library named "fd_budget_pool".

The implementation uses atomic operations to keep track of acquired file descriptors and ensures that the total number of acquired descriptors never exceeds a system-specific limit. An FDGuard structure was introduced to automatically release acquired file descriptors once they are no longer in use. The ordering of the atomic operations is marked as a TODO for later optimization. The get_limit function queries the system to determine the maximum allowable number of file descriptors.
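
A minimal sketch of the idea described above ('FDGuard' and 'get_limit' are named in this commit; the counter name, the error type, and the 'acquire_guard' entry point are assumptions for illustration):

    use std::sync::atomic::{AtomicI32, Ordering};

    // Global counter of currently acquired file descriptors (assumed name).
    static ACQUIRED_FD: AtomicI32 = AtomicI32::new(0);

    pub struct FDGuard(i32);

    impl Drop for FDGuard {
        fn drop(&mut self) {
            // Return the budget to the pool when the guard goes out of scope.
            ACQUIRED_FD.fetch_sub(self.0, Ordering::SeqCst); // TODO: relax ordering
        }
    }

    // Hypothetical acquisition routine: reserve `value` descriptors, failing
    // if that would exceed the system-specific limit.
    pub fn acquire_guard(value: i32) -> Result<FDGuard, String> {
        loop {
            let current = ACQUIRED_FD.load(Ordering::SeqCst); // TODO: relax ordering
            if current + value > get_limit() {
                return Err(format!("fd budget exceeded: {} + {}", current, value));
            }
            match ACQUIRED_FD.compare_exchange(current, current + value, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => return Ok(FDGuard(value)),
                Err(_) => continue, // lost a race with another thread; retry
            }
        }
    }

    // Placeholder; the real get_limit queries the OS (see the rlimit sketch below).
    fn get_limit() -> i32 {
        512
    }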

Additionally, tests were written to confirm the feature functions correctly, and the Cargo files were updated to include the new library and its dependencies. The ultimate goal of this feature is to prevent system-level issues arising from file descriptor exhaustion.

* "Move fd_budget_pool to utils module"

Moved the 'fd_budget_pool' module into the 'utils' module as 'fd_budget'. This consolidates all utility functions into one module, making the codebase cleaner and easier to maintain. This commit removes 'fd_budget_pool' from the workspace and updates the 'Cargo.toml' and 'lib.rs' files accordingly. The code was also refactored to improve error handling.

* Optimize file descriptor usage across databases.

Many database interfaces were changed to support setting a file descriptor limit upon creation of the DB. This allows better allocation of the total file descriptor budget across the many databases instantiated throughout the system. A utility function was also added to the utils crate to manage the allocation of file descriptors. This leads to a more stable system under high load and better use of system resources.
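
For illustration, creating a DB under the changed interface might look like the following sketch (the builder methods match the diffs further down this page; the return type of 'build()' is assumed):

    use std::path::PathBuf;

    // Sketch: open a database with an explicit share of the fd budget.
    fn open_db(dir: PathBuf, files_limit: i32) {
        let _db = kaspa_database::prelude::ConnBuilder::default()
            .with_db_path(dir)
            .with_files_limit(files_limit)
            .build()
            .unwrap();
    }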

* Add 'rlimit' dependency for non-wasm32 targets

The 'rlimit' dependency was added to the Cargo.toml file, conditioned on the target architecture not being 'wasm32'. This ensures that appropriate resource limits can be queried and set on non-wasm32 targets.

* Adjust test_db_relations_store to include drop operation

Added a 'drop(lt)' call to the 'test_db_relations_store()' function in relations.rs. The binding 'lt' (the temp-DB lifetime guard) was previously created but never used. The explicit 'drop(lt)' marks the guard as used and makes the point at which the temporary DB is released explicit.

* decrease limit for daemon_sanity_test

* use constants defining db file limits

* add fd_total_budget to mempool benchmark

* Add cleanup step to core and task runtime

This commit introduces a cleanup phase by clearing services in both core and task runtime. This is done to ensure that all services are dropped and properly disposed of after being used. For the task runtime, this cleanup happens after the async-runtime worker stops. For the core, cleanup occurs once it's shut down. This step further advances the robustness of our termination handling.

* Refactor 'get_limit' function to 'limit' in 'fd_budget'

The function 'get_limit' in 'fd_budget.rs' has been renamed to 'limit' to reflect its getter style and improve readability. References to 'get_limit' in 'mempool_benchmarks.rs' and 'main.rs' were updated accordingly. Additionally, the 'fd_total_budget' calculation in 'main.rs' was updated to subtract 32 instead of 64.
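
The updated calculation would then read roughly as follows (a sketch; the 32-descriptor reserve comes from the message above, while the "reserve for non-database use" rationale is an assumption):

    use kaspa_utils::fd_budget;

    // Keep 32 descriptors in reserve for everything that is not a database
    // (sockets, log files, ...); the rest is the DB budget.
    fn fd_total_budget() -> i32 {
        fd_budget::limit() - 32
    }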

* Refactor fd_budget test function name

The test function in fd_budget.rs was renamed from 'it_works' to 'test_acquire_and_release_guards' to better describe its purpose and operations: the new name makes clear that the function tests both the acquisition and the release of guards.
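
A sketch of what the renamed test could look like, reusing the hypothetical 'acquire_guard' from the pool sketch near the top (the exact assertions are assumptions):

    use kaspa_utils::fd_budget;

    #[test]
    fn test_acquire_and_release_guards() {
        // Acquire the entire budget; any further acquisition must fail.
        let guard = fd_budget::acquire_guard(fd_budget::limit()).unwrap();
        assert!(fd_budget::acquire_guard(1).is_err());
        // Dropping the guard returns the budget to the pool.
        drop(guard);
        assert!(fd_budget::acquire_guard(1).is_ok());
    }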

* Make fd_total_budget field private

* Updated fd_budget for unsupported OS fallback

Modified the fd_budget.rs file to return a fallback value of 512 for unsupported operating systems rather than panicking. This improvement provides better stability for the application when running in different environments.
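
A sketch of the fallback logic, assuming the 'rlimit' crate's 'getrlimit' is used on Unix-like targets (the precise cfg split is an assumption):

    pub fn limit() -> i32 {
        #[cfg(unix)]
        {
            // rlimit::getrlimit returns the (soft, hard) limits for a resource.
            if let Ok((soft, _hard)) = rlimit::getrlimit(rlimit::Resource::NOFILE) {
                return soft.min(i32::MAX as u64) as i32;
            }
        }
        // Unsupported OS (or a failed query): fall back to a conservative
        // default instead of panicking.
        512
    }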

* Modify fd_budget for better resource allocation

Adjusted fd_budget allocation within multiple files to improve resource utilization across components of the application. The changes divide fd_total_budget equally between the active and staging consensuses and assign each module's fd_budget based on the limit values it is given. These modifications ensure a more efficient and fair distribution of resources and prevent potential resource allocation issues on unsupported operating systems.

* "Refactor fd_budget import statement

The import statement for the fd_budget module was refactored across multiple files for better readability. Instead of importing the limit function directly, the whole fd_budget module is imported, and calls to the limit function are prefixed with 'fd_budget::'. This provides a clearer code structure and makes module usage easier to track.
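
The before/after of the import style, for illustration (the call site is hypothetical):

    // Before: the function was imported directly.
    //     use kaspa_utils::fd_budget::limit;
    //     let budget = limit() - 32;

    // After: the module is imported and call sites are prefixed.
    use kaspa_utils::fd_budget;

    fn total_budget() -> i32 {
        fd_budget::limit() - 32
    }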

* add positive check for fd_budget

---------

Co-authored-by: Ori Newman <[email protected]>
biryukovmaxim and someone235 authored Oct 29, 2023
1 parent 83f840a commit ae7dae3
Showing 28 changed files with 295 additions and 99 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock


1 change: 0 additions & 1 deletion Cargo.toml
@@ -118,7 +118,6 @@ kaspa-wrpc-server = { version = "0.1.7", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.1.7", path = "rpc/wrpc/wasm" }
kaspad = { version = "0.1.7", path = "kaspad" }
kaspa-perf-monitor = { path = "metrics/perf_monitor" }

# external
thiserror = "1"
faster-hex = "0.6"
2 changes: 1 addition & 1 deletion components/addressmanager/src/lib.rs
@@ -543,7 +543,7 @@ mod address_store_with_cache {
// Assert that initial distribution is skewed, and hence not uniform from the outset.
assert!(bucket_reduction_ratio >= 1.25);

let db = create_temp_db!(ConnBuilder::default());
let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let config = Config::new(SIMNET_PARAMS);
let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

29 changes: 25 additions & 4 deletions consensus/src/consensus/factory.rs
@@ -200,6 +200,7 @@ pub struct Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
fd_budget: i32,
}

impl Factory {
@@ -211,15 +212,25 @@ impl Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
fd_budget: i32,
) -> Self {
assert!(fd_budget > 0, "fd_budget has to be positive");
let mut config = config.clone();
#[cfg(feature = "devnet-prealloc")]
set_genesis_utxo_commitment_from_config(&mut config);
config.process_genesis = false;
let management_store = Arc::new(RwLock::new(MultiConsensusManagementStore::new(management_db)));
management_store.write().set_is_archival_node(config.is_archival);
let factory =
Self { management_store, config, db_root_dir, db_parallelism, notification_root, counters, tx_script_cache_counters };
let factory = Self {
management_store,
config,
db_root_dir,
db_parallelism,
notification_root,
counters,
tx_script_cache_counters,
fd_budget,
};
factory.delete_inactive_consensus_entries();
factory
}
@@ -245,7 +256,12 @@ impl ConsensusFactory for Factory {
};

let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
@@ -274,7 +290,12 @@

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
4 changes: 2 additions & 2 deletions consensus/src/consensus/test_consensus.rs
@@ -67,7 +67,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
@@ -87,7 +87,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and no notifier
pub fn new(config: &Config) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Default::default();
5 changes: 3 additions & 2 deletions consensus/src/model/stores/relations.rs
@@ -291,6 +291,7 @@ impl RelationsStore for MemoryRelationsStore {
mod tests {
use super::*;
use crate::processes::relations::RelationsStoreExtensions;
use kaspa_database::create_temp_db;

#[test]
fn test_memory_relations_store() {
@@ -299,9 +300,9 @@ mod tests {

#[test]
fn test_db_relations_store() {
let db_tempdir = kaspa_database::utils::get_kaspa_tempdir();
let db = Arc::new(DB::open_default(db_tempdir.path().to_owned().to_str().unwrap()).unwrap());
let (lt, db) = create_temp_db!(kaspa_database::prelude::ConnBuilder::default().with_files_limit(10));
test_relations_store(DbRelationsStore::new(db, 0, 2));
drop(lt)
}

fn test_relations_store<T: RelationsStore>(mut store: T) {
2 changes: 1 addition & 1 deletion consensus/src/processes/pruning_proof/mod.rs
@@ -361,7 +361,7 @@ impl PruningProofManager {
let proof_pp_header = proof[0].last().expect("checked if empty");
let proof_pp = proof_pp_header.hash;
let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
let headers_store = Arc::new(DbHeadersStore::new(db.clone(), 2 * self.pruning_proof_m)); // TODO: Think about cache size
let ghostdag_stores = (0..=self.max_block_level)
.map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, 2 * self.pruning_proof_m)))
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/inquirer.rs
@@ -385,7 +385,7 @@ mod tests {
/// Runs a DAG test-case with full verification using the staging store mechanism.
/// Note: runtime is quadratic in the number of blocks so should be used with mildly small DAGs (~50)
fn run_dag_test_case_with_staging(test: &DagTestCase) {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let reachability = RwLock::new(DbReachabilityStore::new(db.clone(), cache_size));
let relations = RwLock::new(DbRelationsStore::with_prefix(db.clone(), &[], 0));
@@ -533,7 +533,7 @@ mod tests {
run_dag_test_case(&mut relations, &mut reachability, &test);

// Run with direct DB stores
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let mut reachability = DbReachabilityStore::new(db.clone(), cache_size);
let mut relations = DbRelationsStore::new(db, 0, cache_size);
2 changes: 1 addition & 1 deletion consensus/src/processes/relations.rs
@@ -170,7 +170,7 @@ mod tests {

#[test]
fn test_delete_level_relations_zero_cache() {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = 0;
let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
relations.insert(ORIGIN, Default::default()).unwrap();
3 changes: 3 additions & 0 deletions core/src/core.rs
@@ -54,6 +54,9 @@ impl Core {
}
}

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("... core is shut down");
}
}
3 changes: 3 additions & 0 deletions core/src/task/runtime.rs
@@ -106,6 +106,9 @@ impl AsyncRuntime {
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
try_join_all(futures).await.unwrap();

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("async-runtime worker stopped");
}

1 change: 0 additions & 1 deletion database/Cargo.toml
@@ -25,4 +25,3 @@ tempfile.workspace = true

enum-primitive-derive = "0.2.2"
num-traits = "0.2.15"
rlimit = "0.10.1"
29 changes: 27 additions & 2 deletions database/src/db.rs
@@ -1,12 +1,37 @@
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;

pub use conn_builder::ConnBuilder;
use kaspa_utils::fd_budget::FDGuard;

mod conn_builder;

/// The DB type used for Kaspad stores
pub type DB = DBWithThreadMode<MultiThreaded>;
pub struct DB {
inner: DBWithThreadMode<MultiThreaded>,
_fd_guard: FDGuard,
}

impl DB {
pub fn new(inner: DBWithThreadMode<MultiThreaded>, fd_guard: FDGuard) -> Self {
Self { inner, _fd_guard: fd_guard }
}
}

impl DerefMut for DB {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl Deref for DB {
type Target = DBWithThreadMode<MultiThreaded>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

/// Deletes an existing DB if it exists
pub fn delete_db(db_dir: PathBuf) {
@@ -15,5 +40,5 @@ pub fn delete_db(db_dir: PathBuf) {
}
let options = rocksdb::Options::default();
let path = db_dir.to_str().unwrap();
DB::destroy(&options, path).expect("DB is expected to be deletable");
<DBWithThreadMode<MultiThreaded>>::destroy(&options, path).expect("DB is expected to be deletable");
}