From 225f899fa5248e62174b7a2405d3eafd75260b70 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 31 Dec 2024 12:02:51 -0600 Subject: [PATCH] validator: Add CLI args to control rocksdb threadpool sizes (#4214) Plumb threadpool sizes from validator CLI through to rocksdb, and set threadpool sizes directly instead of using increase_parallelism() --- ledger-tool/src/ledger_utils.rs | 6 ++-- ledger/src/blockstore.rs | 9 +++--- ledger/src/blockstore_db.rs | 51 +++++++++++++++++++++++--------- ledger/src/blockstore_options.rs | 15 +++++++--- validator/src/cli/thread_args.rs | 40 +++++++++++++++++++++++++ validator/src/main.rs | 4 +++ 6 files changed, 99 insertions(+), 26 deletions(-) diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 04b5ee314895a5..6382b35bfc88c0 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -17,9 +17,7 @@ use { solana_ledger::{ bank_forks_utils::{self, BankForksUtilsError}, blockstore::{Blockstore, BlockstoreError}, - blockstore_options::{ - AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions, - }, + blockstore_options::{AccessType, BlockstoreOptions, BlockstoreRecoveryMode}, blockstore_processor::{ self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender, }, @@ -471,7 +469,7 @@ pub fn open_blockstore( access_type: access_type.clone(), recovery_mode: wal_recovery_mode.clone(), enforce_ulimit_nofile, - column_options: LedgerColumnOptions::default(), + ..BlockstoreOptions::default() }, ) { Ok(blockstore) => blockstore, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 16a1a3cc566492..bf30910f9b39bc 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -12,7 +12,7 @@ use { blockstore_meta::*, blockstore_metrics::BlockstoreRpcApiMetrics, blockstore_options::{ - AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, + BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, }, blockstore_processor::BlockstoreProcessorError, leader_schedule_cache::LeaderScheduleCache, @@ -90,7 +90,9 @@ pub mod blockstore_purge; use static_assertions::const_assert_eq; pub use { crate::{ - blockstore_db::BlockstoreError, + blockstore_db::{ + default_num_compaction_threads, default_num_flush_threads, BlockstoreError, + }, blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta}, blockstore_metrics::BlockstoreInsertionMetrics, }, @@ -4961,10 +4963,9 @@ pub fn create_new_ledger( let blockstore = Blockstore::open_with_options( ledger_path, BlockstoreOptions { - access_type: AccessType::Primary, - recovery_mode: None, enforce_ulimit_nofile: false, column_options: column_options.clone(), + ..BlockstoreOptions::default() }, )?; let ticks_per_slot = genesis_config.ticks_per_slot; diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 8a674b2a3f5be1..d31a0fc0ebdcd5 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -37,6 +37,7 @@ use { fs, marker::PhantomData, mem, + num::NonZeroUsize, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -415,13 +416,12 @@ pub(crate) struct Rocks { impl Rocks { pub(crate) fn open(path: PathBuf, options: BlockstoreOptions) -> Result { - let access_type = options.access_type.clone(); let recovery_mode = options.recovery_mode.clone(); fs::create_dir_all(&path)?; // Use default database options - let mut db_options = get_db_options(&access_type); + let mut db_options = get_db_options(&options); if let Some(recovery_mode) = recovery_mode { db_options.set_wal_recovery_mode(recovery_mode.into()); } @@ -430,7 +430,7 @@ impl Rocks { let column_options = Arc::from(options.column_options); // Open the database - let db = match access_type { + let db = match options.access_type { AccessType::Primary | AccessType::PrimaryForMaintenance => { DB::open_cf_descriptors(&db_options, &path, cf_descriptors)? } @@ -452,7 +452,7 @@ impl Rocks { let rocks = Rocks { db, path, - access_type, + access_type: options.access_type, oldest_slot, column_options, write_batch_perf_status: PerfSamplingStatus::default(), @@ -1991,28 +1991,39 @@ fn process_cf_options_advanced( } } -fn get_db_options(access_type: &AccessType) -> Options { +fn get_db_options(blockstore_options: &BlockstoreOptions) -> Options { let mut options = Options::default(); // Create missing items to support a clean start options.create_if_missing(true); options.create_missing_column_families(true); - // Per the docs, a good value for this is the number of cores on the machine - options.increase_parallelism(num_cpus::get() as i32); - + // rocksdb builds two threadpools: low and high priority. The low priority + // pool is used for compactions whereas the high priority pool is used for + // memtable flushes. Separate pools are created so that compactions are + // unable to stall memtable flushes (which could stall memtable writes). let mut env = rocksdb::Env::new().unwrap(); - // While a compaction is ongoing, all the background threads - // could be used by the compaction. This can stall writes which - // need to flush the memtable. Add some high-priority background threads - // which can service these writes. - env.set_high_priority_background_threads(4); + env.set_low_priority_background_threads( + blockstore_options.num_rocksdb_compaction_threads.get() as i32, + ); + env.set_high_priority_background_threads( + blockstore_options.num_rocksdb_flush_threads.get() as i32 + ); options.set_env(&env); + // rocksdb will try to scale threadpool sizes automatically based on the + // value set for max_background_jobs. The automatic scaling can increase, + // but not decrease the number of threads in each pool. But, we already + // set desired threadpool sizes with set_low_priority_background_threads() + // and set_high_priority_background_threads(). So, set max_background_jobs + // to a small number (2) so that rocksdb will leave the previously + // configured threadpool sizes as-is. The value (2) would result in one + // low priority and one high priority thread which is the minimum for each. + options.set_max_background_jobs(2); // Set max total wal size to 4G. options.set_max_total_wal_size(4 * 1024 * 1024 * 1024); - if should_disable_auto_compactions(access_type) { + if should_disable_auto_compactions(&blockstore_options.access_type) { options.set_disable_auto_compactions(true); } @@ -2024,6 +2035,18 @@ fn get_db_options(access_type: &AccessType) -> Options { options } +/// The default number of threads to use for rocksdb compaction in the rocksdb +/// low priority threadpool +pub fn default_num_compaction_threads() -> NonZeroUsize { + NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero") +} + +/// The default number of threads to use for rocksdb memtable flushes in the +/// rocksdb high priority threadpool +pub fn default_num_flush_threads() -> NonZeroUsize { + NonZeroUsize::new((num_cpus::get() / 4).max(1)).expect("thread count is non-zero") +} + // Returns whether automatic compactions should be disabled for the entire // database based upon the given access type. fn should_disable_auto_compactions(access_type: &AccessType) -> bool { diff --git a/ledger/src/blockstore_options.rs b/ledger/src/blockstore_options.rs index 977323fe9c3ffc..15a9ff1041ed40 100644 --- a/ledger/src/blockstore_options.rs +++ b/ledger/src/blockstore_options.rs @@ -1,4 +1,8 @@ -use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode}; +use { + crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads}, + rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode}, + std::num::NonZeroUsize, +}; /// The subdirectory under ledger directory where the Blockstore lives pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb"; @@ -13,6 +17,8 @@ pub struct BlockstoreOptions { // desired open file descriptor limit cannot be configured. Default: true. pub enforce_ulimit_nofile: bool, pub column_options: LedgerColumnOptions, + pub num_rocksdb_compaction_threads: NonZeroUsize, + pub num_rocksdb_flush_threads: NonZeroUsize, } impl Default for BlockstoreOptions { @@ -25,6 +31,8 @@ impl Default for BlockstoreOptions { recovery_mode: None, enforce_ulimit_nofile: true, column_options: LedgerColumnOptions::default(), + num_rocksdb_compaction_threads: default_num_compaction_threads(), + num_rocksdb_flush_threads: default_num_flush_threads(), } } } @@ -32,10 +40,9 @@ impl Default for BlockstoreOptions { impl BlockstoreOptions { pub fn default_for_tests() -> Self { Self { - access_type: AccessType::Primary, - recovery_mode: None, + // No need to enforce the limit in tests enforce_ulimit_nofile: false, - column_options: LedgerColumnOptions::default(), + ..BlockstoreOptions::default() } } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 589fa7edf598ae..ddf819347c47aa 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs { pub rayon_global_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, + pub rocksdb_compaction_threads: String, + pub rocksdb_flush_threads: String, pub tvu_receive_threads: String, pub tvu_sigverify_threads: String, } @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs { replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() .to_string(), + rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(), + rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(), tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(), tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(), } @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.rayon_global_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), + new_thread_arg::(&defaults.rocksdb_compaction_threads), + new_thread_arg::(&defaults.rocksdb_flush_threads), new_thread_arg::(&defaults.tvu_receive_threads), new_thread_arg::(&defaults.tvu_sigverify_threads), ] @@ -77,6 +83,8 @@ pub struct NumThreadConfig { pub rayon_global_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub rocksdb_compaction_threads: NonZeroUsize, + pub rocksdb_flush_threads: NonZeroUsize, pub tvu_receive_threads: NonZeroUsize, pub tvu_sigverify_threads: NonZeroUsize, } @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { ReplayTransactionsThreadsArg::NAME, NonZeroUsize ), + rocksdb_compaction_threads: value_t_or_exit!( + matches, + RocksdbCompactionThreadsArg::NAME, + NonZeroUsize + ), + rocksdb_flush_threads: value_t_or_exit!( + matches, + RocksdbFlushThreadsArg::NAME, + NonZeroUsize + ), tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize), tvu_sigverify_threads: value_t_or_exit!( matches, @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg { } } +struct RocksdbCompactionThreadsArg; +impl ThreadArg for RocksdbCompactionThreadsArg { + const NAME: &'static str = "rocksdb_compaction_threads"; + const LONG_NAME: &'static str = "rocksdb-compaction-threads"; + const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions"; + + fn default() -> usize { + solana_ledger::blockstore::default_num_compaction_threads().get() + } +} + +struct RocksdbFlushThreadsArg; +impl ThreadArg for RocksdbFlushThreadsArg { + const NAME: &'static str = "rocksdb_flush_threads"; + const LONG_NAME: &'static str = "rocksdb-flush-threads"; + const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes"; + + fn default() -> usize { + solana_ledger::blockstore::default_num_flush_threads().get() + } +} + struct TvuReceiveThreadsArg; impl ThreadArg for TvuReceiveThreadsArg { const NAME: &'static str = "tvu_receive_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index d1ef0ba4821568..4a0be757008720 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -906,6 +906,8 @@ pub fn main() { rayon_global_threads, replay_forks_threads, replay_transactions_threads, + rocksdb_compaction_threads, + rocksdb_flush_threads, tvu_receive_threads, tvu_sigverify_threads, } = cli::thread_args::parse_num_threads_args(&matches); @@ -1054,6 +1056,8 @@ pub fn main() { enforce_ulimit_nofile: true, // The validator needs primary (read/write) access_type: AccessType::Primary, + num_rocksdb_compaction_threads: rocksdb_compaction_threads, + num_rocksdb_flush_threads: rocksdb_flush_threads, }; let accounts_hash_cache_path = matches