Skip to content

Commit

Permalink
validator: Add CLI args to control rocksdb threadpool sizes (#4214)
Browse files Browse the repository at this point in the history
Plumb threadpool sizes from validator CLI through to rocksdb, and set
threadpool sizes directly instead of using increase_parallelism()
  • Loading branch information
steviez authored Dec 31, 2024
1 parent ed15395 commit 225f899
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 26 deletions.
6 changes: 2 additions & 4 deletions ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use {
solana_ledger::{
bank_forks_utils::{self, BankForksUtilsError},
blockstore::{Blockstore, BlockstoreError},
blockstore_options::{
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
},
blockstore_options::{AccessType, BlockstoreOptions, BlockstoreRecoveryMode},
blockstore_processor::{
self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender,
},
Expand Down Expand Up @@ -471,7 +469,7 @@ pub fn open_blockstore(
access_type: access_type.clone(),
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile,
column_options: LedgerColumnOptions::default(),
..BlockstoreOptions::default()
},
) {
Ok(blockstore) => blockstore,
Expand Down
9 changes: 5 additions & 4 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
},
blockstore_processor::BlockstoreProcessorError,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -90,7 +90,9 @@ pub mod blockstore_purge;
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
blockstore_db::{
default_num_compaction_threads, default_num_flush_threads, BlockstoreError,
},
blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
blockstore_metrics::BlockstoreInsertionMetrics,
},
Expand Down Expand Up @@ -4961,10 +4963,9 @@ pub fn create_new_ledger(
let blockstore = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::Primary,
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: column_options.clone(),
..BlockstoreOptions::default()
},
)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
Expand Down
51 changes: 37 additions & 14 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use {
fs,
marker::PhantomData,
mem,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -415,13 +416,12 @@ pub(crate) struct Rocks {

impl Rocks {
pub(crate) fn open(path: PathBuf, options: BlockstoreOptions) -> Result<Rocks> {
let access_type = options.access_type.clone();
let recovery_mode = options.recovery_mode.clone();

fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options(&access_type);
let mut db_options = get_db_options(&options);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
Expand All @@ -430,7 +430,7 @@ impl Rocks {
let column_options = Arc::from(options.column_options);

// Open the database
let db = match access_type {
let db = match options.access_type {
AccessType::Primary | AccessType::PrimaryForMaintenance => {
DB::open_cf_descriptors(&db_options, &path, cf_descriptors)?
}
Expand All @@ -452,7 +452,7 @@ impl Rocks {
let rocks = Rocks {
db,
path,
access_type,
access_type: options.access_type,
oldest_slot,
column_options,
write_batch_perf_status: PerfSamplingStatus::default(),
Expand Down Expand Up @@ -1991,28 +1991,39 @@ fn process_cf_options_advanced<C: 'static + Column + ColumnName>(
}
}

fn get_db_options(access_type: &AccessType) -> Options {
fn get_db_options(blockstore_options: &BlockstoreOptions) -> Options {
let mut options = Options::default();

// Create missing items to support a clean start
options.create_if_missing(true);
options.create_missing_column_families(true);

// Per the docs, a good value for this is the number of cores on the machine
options.increase_parallelism(num_cpus::get() as i32);

// rocksdb builds two threadpools: low and high priority. The low priority
// pool is used for compactions whereas the high priority pool is used for
// memtable flushes. Separate pools are created so that compactions are
// unable to stall memtable flushes (which could stall memtable writes).
let mut env = rocksdb::Env::new().unwrap();
// While a compaction is ongoing, all the background threads
// could be used by the compaction. This can stall writes which
// need to flush the memtable. Add some high-priority background threads
// which can service these writes.
env.set_high_priority_background_threads(4);
env.set_low_priority_background_threads(
blockstore_options.num_rocksdb_compaction_threads.get() as i32,
);
env.set_high_priority_background_threads(
blockstore_options.num_rocksdb_flush_threads.get() as i32
);
options.set_env(&env);
// rocksdb will try to scale threadpool sizes automatically based on the
// value set for max_background_jobs. The automatic scaling can increase,
// but not decrease the number of threads in each pool. But, we already
// set desired threadpool sizes with set_low_priority_background_threads()
// and set_high_priority_background_threads(). So, set max_background_jobs
// to a small number (2) so that rocksdb will leave the previously
// configured threadpool sizes as-is. The value (2) would result in one
// low priority and one high priority thread which is the minimum for each.
options.set_max_background_jobs(2);

// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);

if should_disable_auto_compactions(access_type) {
if should_disable_auto_compactions(&blockstore_options.access_type) {
options.set_disable_auto_compactions(true);
}

Expand All @@ -2024,6 +2035,18 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

/// The default number of threads to use for rocksdb compaction in the rocksdb
/// low priority threadpool
pub fn default_num_compaction_threads() -> NonZeroUsize {
NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero")
}

/// The default number of threads to use for rocksdb memtable flushes in the
/// rocksdb high priority threadpool
pub fn default_num_flush_threads() -> NonZeroUsize {
NonZeroUsize::new((num_cpus::get() / 4).max(1)).expect("thread count is non-zero")
}

// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
Expand Down
15 changes: 11 additions & 4 deletions ledger/src/blockstore_options.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode};
use {
crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads},
rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode},
std::num::NonZeroUsize,
};

/// The subdirectory under ledger directory where the Blockstore lives
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
Expand All @@ -13,6 +17,8 @@ pub struct BlockstoreOptions {
// desired open file descriptor limit cannot be configured. Default: true.
pub enforce_ulimit_nofile: bool,
pub column_options: LedgerColumnOptions,
pub num_rocksdb_compaction_threads: NonZeroUsize,
pub num_rocksdb_flush_threads: NonZeroUsize,
}

impl Default for BlockstoreOptions {
Expand All @@ -25,17 +31,18 @@ impl Default for BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}

impl BlockstoreOptions {
pub fn default_for_tests() -> Self {
Self {
access_type: AccessType::Primary,
recovery_mode: None,
// No need to enforce the limit in tests
enforce_ulimit_nofile: false,
column_options: LedgerColumnOptions::default(),
..BlockstoreOptions::default()
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs {
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub rocksdb_compaction_threads: String,
pub rocksdb_flush_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}
Expand All @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs {
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
Expand All @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
Expand All @@ -77,6 +83,8 @@ pub struct NumThreadConfig {
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub rocksdb_compaction_threads: NonZeroUsize,
pub rocksdb_flush_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}
Expand Down Expand Up @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
rocksdb_compaction_threads: value_t_or_exit!(
matches,
RocksdbCompactionThreadsArg::NAME,
NonZeroUsize
),
rocksdb_flush_threads: value_t_or_exit!(
matches,
RocksdbFlushThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
Expand Down Expand Up @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
}
}

struct RocksdbCompactionThreadsArg;
impl ThreadArg for RocksdbCompactionThreadsArg {
const NAME: &'static str = "rocksdb_compaction_threads";
const LONG_NAME: &'static str = "rocksdb-compaction-threads";
const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";

fn default() -> usize {
solana_ledger::blockstore::default_num_compaction_threads().get()
}
}

struct RocksdbFlushThreadsArg;
impl ThreadArg for RocksdbFlushThreadsArg {
const NAME: &'static str = "rocksdb_flush_threads";
const LONG_NAME: &'static str = "rocksdb-flush-threads";
const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";

fn default() -> usize {
solana_ledger::blockstore::default_num_flush_threads().get()
}
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
Expand Down
4 changes: 4 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,8 @@ pub fn main() {
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
rocksdb_compaction_threads,
rocksdb_flush_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
Expand Down Expand Up @@ -1054,6 +1056,8 @@ pub fn main() {
enforce_ulimit_nofile: true,
// The validator needs primary (read/write)
access_type: AccessType::Primary,
num_rocksdb_compaction_threads: rocksdb_compaction_threads,
num_rocksdb_flush_threads: rocksdb_flush_threads,
};

let accounts_hash_cache_path = matches
Expand Down

0 comments on commit 225f899

Please sign in to comment.