diff --git a/Cargo.lock b/Cargo.lock index a727fae2c8b0ee..9a61e27a12c5e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7547,6 +7547,7 @@ dependencies = [ "solana-svm", "solana-test-validator", "solana-tpu-client", + "solana-unified-scheduler-pool", "solana-version", "solana-vote-program", "spl-token-2022", diff --git a/core/src/validator.rs b/core/src/validator.rs index b71c11cd967d34..97ef0a01ef87ad 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -262,6 +262,7 @@ pub struct ValidatorConfig { pub generator_config: Option, pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, + pub unified_scheduler_handler_threads: Option, } impl Default for ValidatorConfig { @@ -329,6 +330,7 @@ impl Default for ValidatorConfig { generator_config: None, use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, + unified_scheduler_handler_threads: None, } } } @@ -813,9 +815,16 @@ impl Validator { match &config.block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { info!("no scheduler pool is installed for block verification..."); + if let Some(count) = config.unified_scheduler_handler_threads { + warn!( + "--unified-scheduler-handler-threads={count} is ignored because unified \ + scheduler isn't enabled" + ); + } } BlockVerificationMethod::UnifiedScheduler => { let scheduler_pool = DefaultSchedulerPool::new_dyn( + config.unified_scheduler_handler_threads, config.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), Some(replay_vote_sender.clone()), diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 2663a205fb5f37..116b21527ae4d8 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -291,9 +291,17 @@ pub fn load_and_process_ledger( "Using: block-verification-method: {}", block_verification_method, ); + let unified_scheduler_handler_threads = + 
value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok(); match block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { info!("no scheduler pool is installed for block verification..."); + if let Some(count) = unified_scheduler_handler_threads { + warn!( + "--unified-scheduler-handler-threads={count} is ignored because unified \ + scheduler isn't enabled" + ); + } } BlockVerificationMethod::UnifiedScheduler => { let no_transaction_status_sender = None; @@ -303,6 +311,7 @@ pub fn load_and_process_ledger( .write() .unwrap() .install_scheduler_pool(DefaultSchedulerPool::new_dyn( + unified_scheduler_handler_threads, process_options.runtime_config.log_messages_bytes_limit, no_transaction_status_sender, no_replay_vote_sender, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index d6218fad6915f0..9b299cfadcbcf2 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -28,7 +28,7 @@ use { input_parsers::{cluster_type_of, pubkey_of, pubkeys_of}, input_validators::{ is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage, - validate_maximum_full_snapshot_archives_to_retain, + is_within_range, validate_maximum_full_snapshot_archives_to_retain, validate_maximum_incremental_snapshot_archives_to_retain, }, }, @@ -72,6 +72,7 @@ use { transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader}, }, solana_stake_program::stake_state::{self, PointValue}, + solana_unified_scheduler_pool::DefaultSchedulerPool, solana_vote_program::{ self, vote_state::{self, VoteState}, @@ -852,6 +853,16 @@ fn main() { .hidden(hidden_unless_forced()) .help(BlockVerificationMethod::cli_message()), ) + .arg( + Arg::with_name("unified_scheduler_handler_threads") + .long("unified-scheduler-handler-threads") + .value_name("COUNT") + .takes_value(true) + .validator(|s| is_within_range(s, 1..)) + .global(true) + .hidden(hidden_unless_forced()) + .help(DefaultSchedulerPool::cli_message()), + ) .arg( 
Arg::with_name("output_format") .long("output") diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 537dd6495f32e1..33883bb02c1d77 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { generator_config: config.generator_config.clone(), use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), + unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 6ab36567f1a744..1b8d422d42ba7c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6546,6 +6546,7 @@ dependencies = [ "solana-svm", "solana-test-validator", "solana-tpu-client", + "solana-unified-scheduler-pool", "solana-version", "solana-vote-program", "symlink", diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index deae3697807705..09ded82ee88e7d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -34,7 +34,7 @@ use { marker::PhantomData, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, - Arc, Mutex, Weak, + Arc, Mutex, OnceLock, Weak, }, thread::{self, JoinHandle}, }, @@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, + handler_count: usize, handler_context: HandlerContext, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to // Arc from &Self, because SchedulerPool is used as in the form of Arc @@ -83,13 +84,20 @@ where // Some internal impl and test code want an actual concrete type, NOT the // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`. 
fn new( + handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, ) -> Arc { + let handler_count = handler_count.unwrap_or(1); + // we're hard-coding the number of handler threads to 1, meaning this impl is currently + // single-threaded still. + assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later + Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), + handler_count, handler_context: HandlerContext { log_messages_bytes_limit, transaction_status_sender, @@ -105,12 +113,14 @@ where // This apparently-meaningless wrapper is handy, because some callers explicitly want // `dyn InstalledSchedulerPool` to be returned for type inference convenience. pub fn new_dyn( + handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, ) -> InstalledSchedulerPoolArc { Self::new( + handler_count, log_messages_bytes_limit, transaction_status_sender, replay_vote_sender, @@ -145,6 +155,37 @@ where S::spawn(self.self_arc(), context) } } + + pub fn default_handler_count() -> usize { + Self::calculate_default_handler_count( + thread::available_parallelism() + .ok() + .map(|non_zero| non_zero.get()), + ) + } + + pub fn calculate_default_handler_count(detected_cpu_core_count: Option) -> usize { + // Divide by 4 just not to consume all available CPUs just with handler threads, sparing for + // other active forks and other subsystems. + // Also, if available_parallelism fails (which should be very rare), use 4 threads, + // as a relatively conservative assumption of modern multi-core systems ranging from + // engineers' laptops to production servers. 
+ detected_cpu_core_count + .map(|core_count| (core_count / 4).max(1)) + .unwrap_or(4) + } + + pub fn cli_message() -> &'static str { + static MESSAGE: OnceLock = OnceLock::new(); + + MESSAGE.get_or_init(|| { + format!( + "Change the number of the unified scheduler's transaction execution threads \ + dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]", + Self::default_handler_count() + ) + }) + } } impl InstalledSchedulerPool for SchedulerPool @@ -372,7 +413,6 @@ pub struct PooledSchedulerInner, TH: TaskHandler> { struct ThreadManager, TH: TaskHandler> { scheduler_id: SchedulerId, pool: Arc>, - handler_count: usize, new_task_sender: Sender, new_task_receiver: Receiver, session_result_sender: Sender>, @@ -384,13 +424,9 @@ struct ThreadManager, TH: TaskHandler> { impl PooledScheduler { fn do_spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { - // we're hard-coding the number of handler thread to 1, meaning this impl is currently - // single-threaded still. - let handler_count = 1; - Self::from_inner( PooledSchedulerInner:: { - thread_manager: ThreadManager::new(pool, handler_count), + thread_manager: ThreadManager::new(pool), }, initial_context, ) @@ -398,14 +434,14 @@ impl PooledScheduler { } impl, TH: TaskHandler> ThreadManager { - fn new(pool: Arc>, handler_count: usize) -> Self { + fn new(pool: Arc>) -> Self { let (new_task_sender, new_task_receiver) = unbounded(); let (session_result_sender, session_result_receiver) = unbounded(); + let handler_count = pool.handler_count; Self { scheduler_id: pool.new_scheduler_id(), pool, - handler_count, new_task_sender, new_task_receiver, session_result_sender, @@ -477,7 +513,7 @@ impl, TH: TaskHandler> ThreadManager { // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. 
let scheduler_main_loop = || { - let handler_count = self.handler_count; + let handler_count = self.pool.handler_count; let session_result_sender = self.session_result_sender.clone(); let new_task_receiver = self.new_task_receiver.clone(); @@ -613,7 +649,7 @@ impl, TH: TaskHandler> ThreadManager { .unwrap(), ); - self.handler_threads = (0..self.handler_count) + self.handler_threads = (0..self.pool.handler_count) .map({ |thx| { thread::Builder::new() @@ -760,7 +796,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); // this indirectly proves that there should be circular link because there's only one Arc // at this moment now @@ -775,7 +811,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = SchedulingContext::new(bank); let scheduler = pool.take_scheduler(context); @@ -789,7 +825,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); @@ -817,7 +854,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + 
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); let mut scheduler = pool.do_take_scheduler(context.clone()); @@ -835,7 +873,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let old_bank = &Arc::new(Bank::default_for_tests()); let new_bank = &Arc::new(Bank::default_for_tests()); assert!(!Arc::ptr_eq(old_bank, new_bank)); @@ -861,7 +900,7 @@ mod tests { let mut bank_forks = bank_forks.write().unwrap(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); bank_forks.install_scheduler_pool(pool); } @@ -875,7 +914,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); @@ -928,7 +967,7 @@ mod tests { let bank = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let context = SchedulingContext::new(bank.clone()); assert_eq!(bank.transaction_count(), 0); @@ -953,7 +992,7 @@ mod tests { let 
ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let context = SchedulingContext::new(bank.clone()); let mut scheduler = pool.take_scheduler(context); @@ -1159,6 +1198,7 @@ mod tests { None, None, None, + None, ignored_prioritization_fee_cache, ); let scheduler = pool.take_scheduler(context); @@ -1193,4 +1233,18 @@ mod tests { fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() { do_test_scheduler_schedule_execution_recent_blockhash_edge_case::(); } + + #[test] + fn test_default_handler_count() { + for (detected, expected) in [(32, 8), (4, 1), (2, 1)] { + assert_eq!( + DefaultSchedulerPool::calculate_default_handler_count(Some(detected)), + expected + ); + } + assert_eq!( + DefaultSchedulerPool::calculate_default_handler_count(None), + 4 + ); + } } diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 4028221cd7ce68..5cc76a810116b3 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -61,6 +61,7 @@ solana-streamer = { workspace = true } solana-svm = { workspace = true } solana-test-validator = { workspace = true } solana-tpu-client = { workspace = true } +solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } solana-vote-program = { workspace = true } symlink = { workspace = true } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 84f63d3503a3c2..8424d7973f0705 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -47,6 +47,7 @@ use { self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{path::PathBuf, str::FromStr}, }; @@ -1530,6 +1531,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 
'a> { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()), ) + .arg( + Arg::with_name("unified_scheduler_handler_threads") + .long("unified-scheduler-handler-threads") + .hidden(hidden_unless_forced()) + .value_name("COUNT") + .takes_value(true) + .validator(|s| is_within_range(s, 1..)) + .help(DefaultSchedulerPool::cli_message()), + ) .arg( Arg::with_name("wen_restart") .long("wen-restart") diff --git a/validator/src/main.rs b/validator/src/main.rs index 3c27fec0199bcb..ec70796130e7d2 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1671,6 +1671,8 @@ pub fn main() { BlockProductionMethod ) .unwrap_or_default(); + validator_config.unified_scheduler_handler_threads = + value_t!(matches, "unified_scheduler_handler_threads", usize).ok(); validator_config.ledger_column_options = LedgerColumnOptions { compression_type: match matches.value_of("rocksdb_ledger_compression") {