-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add --unified-scheduler-handler-threads #35195
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ use { | |
input_parsers::{cluster_type_of, pubkey_of, pubkeys_of}, | ||
input_validators::{ | ||
is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage, | ||
validate_maximum_full_snapshot_archives_to_retain, | ||
is_within_range, validate_maximum_full_snapshot_archives_to_retain, | ||
validate_maximum_incremental_snapshot_archives_to_retain, | ||
}, | ||
}, | ||
|
@@ -72,6 +72,7 @@ use { | |
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader}, | ||
}, | ||
solana_stake_program::stake_state::{self, PointValue}, | ||
solana_unified_scheduler_pool::DefaultSchedulerPool, | ||
solana_vote_program::{ | ||
self, | ||
vote_state::{self, VoteState}, | ||
|
@@ -847,6 +848,16 @@ fn main() { | |
.hidden(hidden_unless_forced()) | ||
.help(BlockVerificationMethod::cli_message()), | ||
) | ||
.arg( | ||
Arg::with_name("unified_scheduler_handler_threads") | ||
.long("unified-scheduler-handler-threads") | ||
.value_name("COUNT") | ||
.takes_value(true) | ||
.validator(|s| is_within_range(s, 1..)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i thought about introducing some sensible upper bound like 1024, but opted not to do so because this flag doesn't need to be friendly that much and that restriction would hamper some pathological stress testing (again, this isn't strong point, as this is veerry niche & hypothetical use case). |
||
.global(true) | ||
.hidden(hidden_unless_forced()) | ||
.help(DefaultSchedulerPool::cli_message()), | ||
Comment on lines
+852
to
+859
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the actual builder method invocation order and lack of |
||
) | ||
.arg( | ||
Arg::with_name("output_format") | ||
.long("output") | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ use { | |
marker::PhantomData, | ||
sync::{ | ||
atomic::{AtomicU64, Ordering::Relaxed}, | ||
Arc, Mutex, Weak, | ||
Arc, Mutex, OnceLock, Weak, | ||
}, | ||
thread::{self, JoinHandle}, | ||
}, | ||
|
@@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64; | |
#[derive(Debug)] | ||
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> { | ||
scheduler_inners: Mutex<Vec<S::Inner>>, | ||
handler_count: usize, | ||
handler_context: HandlerContext, | ||
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to | ||
// Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool> | ||
|
@@ -83,13 +84,20 @@ where | |
// Some internal impl and test code want an actual concrete type, NOT the | ||
// `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`. | ||
fn new( | ||
handler_count: Option<usize>, | ||
log_messages_bytes_limit: Option<usize>, | ||
transaction_status_sender: Option<TransactionStatusSender>, | ||
replay_vote_sender: Option<ReplayVoteSender>, | ||
prioritization_fee_cache: Arc<PrioritizationFeeCache>, | ||
) -> Arc<Self> { | ||
let handler_count = handler_count.unwrap_or(1); | ||
// we're hard-coding the number of handler thread to 1, meaning this impl is currently | ||
// single-threaded still. | ||
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later | ||
|
||
Comment on lines
+93
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, (diff would be a lot clearer if i had more foresight at #34676..) |
||
Arc::new_cyclic(|weak_self| Self { | ||
scheduler_inners: Mutex::default(), | ||
handler_count, | ||
handler_context: HandlerContext { | ||
log_messages_bytes_limit, | ||
transaction_status_sender, | ||
|
@@ -105,12 +113,14 @@ where | |
// This apparently-meaningless wrapper is handy, because some callers explicitly want | ||
// `dyn InstalledSchedulerPool` to be returned for type inference convenience. | ||
pub fn new_dyn( | ||
handler_count: Option<usize>, | ||
log_messages_bytes_limit: Option<usize>, | ||
transaction_status_sender: Option<TransactionStatusSender>, | ||
replay_vote_sender: Option<ReplayVoteSender>, | ||
prioritization_fee_cache: Arc<PrioritizationFeeCache>, | ||
) -> InstalledSchedulerPoolArc { | ||
Self::new( | ||
handler_count, | ||
log_messages_bytes_limit, | ||
transaction_status_sender, | ||
replay_vote_sender, | ||
|
@@ -145,6 +155,37 @@ where | |
S::spawn(self.self_arc(), context) | ||
} | ||
} | ||
|
||
pub fn default_handler_count() -> usize { | ||
Self::calculate_default_handler_count( | ||
thread::available_parallelism() | ||
.ok() | ||
.map(|non_zero| non_zero.get()), | ||
) | ||
} | ||
|
||
pub fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize { | ||
// Divide by 4 just not to consume all available CPUs just with handler threads, sparing for | ||
// other active forks and other subsystems. | ||
// Also, if available_parallelism fails (which should be very rare), use 4 threads, | ||
// as a relatively conservatism assumption of modern multi-core systems ranging from | ||
// engineers' laptops to production servers. | ||
detected_cpu_core_count | ||
.map(|core_count| (core_count / 4).max(1)) | ||
.unwrap_or(4) | ||
} | ||
|
||
pub fn cli_message() -> &'static str { | ||
static MESSAGE: OnceLock<String> = OnceLock::new(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. back into the very beginnings of this looong unified scheduler journey, |
||
|
||
MESSAGE.get_or_init(|| { | ||
format!( | ||
"Change the number of the unified scheduler's transaction execution threads \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while i'm leaving impl jargon ( |
||
dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
also, this whole message is specifically worded to put |
||
Self::default_handler_count() | ||
) | ||
}) | ||
} | ||
} | ||
|
||
impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH> | ||
|
@@ -372,7 +413,6 @@ pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> { | |
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> { | ||
scheduler_id: SchedulerId, | ||
pool: Arc<SchedulerPool<S, TH>>, | ||
handler_count: usize, | ||
new_task_sender: Sender<NewTaskPayload>, | ||
new_task_receiver: Receiver<NewTaskPayload>, | ||
session_result_sender: Sender<Option<ResultWithTimings>>, | ||
|
@@ -384,28 +424,24 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> { | |
|
||
impl<TH: TaskHandler> PooledScheduler<TH> { | ||
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self { | ||
// we're hard-coding the number of handler thread to 1, meaning this impl is currently | ||
// single-threaded still. | ||
let handler_count = 1; | ||
|
||
Self::from_inner( | ||
PooledSchedulerInner::<Self, TH> { | ||
thread_manager: ThreadManager::new(pool, handler_count), | ||
thread_manager: ThreadManager::new(pool), | ||
}, | ||
initial_context, | ||
) | ||
} | ||
} | ||
|
||
impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> { | ||
fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self { | ||
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self { | ||
let (new_task_sender, new_task_receiver) = unbounded(); | ||
let (session_result_sender, session_result_receiver) = unbounded(); | ||
let handler_count = pool.handler_count; | ||
|
||
Self { | ||
scheduler_id: pool.new_scheduler_id(), | ||
pool, | ||
handler_count, | ||
new_task_sender, | ||
new_task_receiver, | ||
session_result_sender, | ||
|
@@ -477,7 +513,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> { | |
// 5. the handler thread reply back to the scheduler thread as an executed task. | ||
// 6. the scheduler thread post-processes the executed task. | ||
let scheduler_main_loop = || { | ||
let handler_count = self.handler_count; | ||
let handler_count = self.pool.handler_count; | ||
let session_result_sender = self.session_result_sender.clone(); | ||
let new_task_receiver = self.new_task_receiver.clone(); | ||
|
||
|
@@ -613,7 +649,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> { | |
.unwrap(), | ||
); | ||
|
||
self.handler_threads = (0..self.handler_count) | ||
self.handler_threads = (0..self.pool.handler_count) | ||
.map({ | ||
|thx| { | ||
thread::Builder::new() | ||
|
@@ -760,7 +796,7 @@ mod tests { | |
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
|
||
// this indirectly proves that there should be circular link because there's only one Arc | ||
// at this moment now | ||
|
@@ -775,7 +811,7 @@ mod tests { | |
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
let bank = Arc::new(Bank::default_for_tests()); | ||
let context = SchedulingContext::new(bank); | ||
let scheduler = pool.take_scheduler(context); | ||
|
@@ -789,7 +825,8 @@ mod tests { | |
solana_logger::setup(); | ||
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); | ||
let pool = | ||
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); | ||
let bank = Arc::new(Bank::default_for_tests()); | ||
let context = &SchedulingContext::new(bank); | ||
|
||
|
@@ -817,7 +854,8 @@ mod tests { | |
solana_logger::setup(); | ||
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); | ||
let pool = | ||
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); | ||
let bank = Arc::new(Bank::default_for_tests()); | ||
let context = &SchedulingContext::new(bank); | ||
let mut scheduler = pool.do_take_scheduler(context.clone()); | ||
|
@@ -835,7 +873,8 @@ mod tests { | |
solana_logger::setup(); | ||
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); | ||
let pool = | ||
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); | ||
let old_bank = &Arc::new(Bank::default_for_tests()); | ||
let new_bank = &Arc::new(Bank::default_for_tests()); | ||
assert!(!Arc::ptr_eq(old_bank, new_bank)); | ||
|
@@ -861,7 +900,7 @@ mod tests { | |
let mut bank_forks = bank_forks.write().unwrap(); | ||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
bank_forks.install_scheduler_pool(pool); | ||
} | ||
|
||
|
@@ -875,7 +914,7 @@ mod tests { | |
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
|
||
let bank = Bank::default_for_tests(); | ||
let bank_forks = BankForks::new_rw_arc(bank); | ||
|
@@ -928,7 +967,7 @@ mod tests { | |
let bank = setup_dummy_fork_graph(bank); | ||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
let context = SchedulingContext::new(bank.clone()); | ||
|
||
assert_eq!(bank.transaction_count(), 0); | ||
|
@@ -953,7 +992,7 @@ mod tests { | |
|
||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); | ||
let pool = | ||
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); | ||
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); | ||
let context = SchedulingContext::new(bank.clone()); | ||
let mut scheduler = pool.take_scheduler(context); | ||
|
||
|
@@ -1159,6 +1198,7 @@ mod tests { | |
None, | ||
None, | ||
None, | ||
None, | ||
ignored_prioritization_fee_cache, | ||
); | ||
let scheduler = pool.take_scheduler(context); | ||
|
@@ -1193,4 +1233,18 @@ mod tests { | |
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() { | ||
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>(); | ||
} | ||
|
||
#[test] | ||
fn test_default_handler_count() { | ||
for (detected, expected) in [(32, 8), (4, 1), (2, 1)] { | ||
assert_eq!( | ||
DefaultSchedulerPool::calculate_default_handler_count(Some(detected)), | ||
expected | ||
); | ||
} | ||
assert_eq!( | ||
DefaultSchedulerPool::calculate_default_handler_count(None), | ||
4 | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outside
unified-scheduler-pool
code, i thinkhandler thread count
is too verbose and impl-oriented naming. so, I usedhandler threads
intentionally. Also note that there's bunch of--*-threads
cli flags insolana-validator
as precedents. so, ending at-treads
should be no-brainer.on the other hand,
handler
is left for technical rigidness, because there's additional threadscScheduler
, which are created for each scheduler. and the thread doesn't count towards tounified-scheduler-handler-threads
.