From c6f3e1bb158e8389a31c274df2f2822a084fe029 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Sat, 21 Dec 2024 12:40:23 +1100 Subject: [PATCH] rpc: improve latency by not blocking worker threads polling IO notifications (#3242) * rpc: limit the number of blocking threads in tokio runtime By default tokio allows up to 512 blocking threads. We don't want that many threads, as they'd slow down other validator threads. * rpc: make getMultipleAccounts async Make the function async and use tokio::task::spawn_blocking() to execute CPU-bound code in background. This prevents stalling the worker threads polling IO notifications and serving other non CPU-bound rpc methods. * rpc: make getAccount async * rpc: run get_filtered_program_accounts with task::spawn_blocking get_filtered_program_accounts can be used to retrieve _a list_ of accounts that match some filters. This is CPU bound and can block the calling thread for a significant amount of time when copying many/large accounts. * rpc: use our custom runtime to spawn blocking tasks Pass the custom runtime to JsonRpcRequestProcessor and use it to spawn blocking tasks from rpc methods. * Make `get_blocks()` and `get_block()` yieldy When these methods reach out to Blockstore, yield the thread * Make `get_supply()` yieldy When this method reaches out to accounts_db (through a call to `calculate_non_circulating_supply()`), yield the thread. * Make `get_first_available_block()` yieldy When this method reaches out to blockstore, yield the thread * Make `get_transaction()` yieldy When this method reaches out to blockstore, yield the thread * Make `get_token_supply()` yieldy When this method reaches out to methods on bank that do reads, yield the thread * Make the choice of `cpus / 4` as the default for `rpc_blocking_threads` * Encode blocks async * Revert "Make `get_first_available_block()` yieldy" This blockstore method doesn't actually do expensive reads. This reverts commit 3bbc57f8ebc70e712a8a0ef66bbdff20011b9b7c. 
* Revert "Make `get_blocks()` and `get_block()` yieldy" Kept the `spawn_blocking` around: * Call to `get_rooted_block` * Call to `get_complete_block` This reverts commit 710f9c69a6b256ccec235153e4826912bb654f15. * Revert "Make `get_token_supply()` yieldy" * Reverted the change to `interest_bearing_config` * Reverted moving `bank.get_account(&mint)` to the background pool This reverts commit 02f5c9473064b5e2eb9288fb61e8b10bc8df72df. * Share spawned call to `calculate_non_circulating_supply` between `get_supply` and `get_largest_accounts` * Create a shim for `get_filtered_indexed_accounts` that sends the work to the background thread internally * Send call to `get_largest_accounts` to the background pool --------- Co-authored-by: Steven Luscher --- rpc/src/rpc.rs | 673 +++++++++++++++++++++++++++-------------- rpc/src/rpc_service.rs | 52 +++- validator/src/cli.rs | 23 ++ validator/src/main.rs | 1 + 4 files changed, 507 insertions(+), 242 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index dfe856ee0b6a1a..baffa72b8e0788 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8,7 +8,11 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, bincode::{config::Options, serialize}, crossbeam_channel::{unbounded, Receiver, Sender}, - jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, + jsonrpc_core::{ + futures::future::{self, FutureExt, OptionFuture}, + types::error, + BoxFuture, Error, Metadata, Result, + }, jsonrpc_derive::rpc, solana_account_decoder::{ encode_ui_account, @@ -18,7 +22,7 @@ use { }, solana_accounts_db::{ accounts::AccountAddressFilter, - accounts_index::{AccountIndex, AccountSecondaryIndexes, IndexKey, ScanConfig}, + accounts_index::{AccountIndex, AccountSecondaryIndexes, IndexKey, ScanConfig, ScanResult}, }, solana_client::connection_cache::Protocol, solana_entry::entry::Entry, @@ -55,7 +59,7 @@ use { bank_forks::BankForks, commitment::{BlockCommitmentArray, BlockCommitmentCache}, 
installed_scheduler_pool::BankWithScheduler, - non_circulating_supply::calculate_non_circulating_supply, + non_circulating_supply::{calculate_non_circulating_supply, NonCirculatingSupply}, prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, snapshot_utils, @@ -79,6 +83,7 @@ use { self, AddressLoader, MessageHash, SanitizedTransaction, TransactionError, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS, }, + transaction_context::TransactionAccount, }, solana_send_transaction_service::send_transaction_service::TransactionInfo, solana_stake_program, @@ -112,6 +117,7 @@ use { }, time::Duration, }, + tokio::runtime::Runtime, }; #[cfg(test)] use { @@ -150,7 +156,7 @@ fn is_finalized( && (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot)) } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct JsonRpcConfig { pub enable_rpc_transaction_history: bool, pub enable_extended_tx_metadata_storage: bool, @@ -161,6 +167,7 @@ pub struct JsonRpcConfig { pub max_multiple_accounts: Option, pub account_indexes: AccountSecondaryIndexes, pub rpc_threads: usize, + pub rpc_blocking_threads: usize, pub rpc_niceness_adj: i8, pub full_api: bool, pub rpc_scan_and_fix_roots: bool, @@ -169,6 +176,28 @@ pub struct JsonRpcConfig { pub disable_health_check: bool, } +impl Default for JsonRpcConfig { + fn default() -> Self { + Self { + enable_rpc_transaction_history: Default::default(), + enable_extended_tx_metadata_storage: Default::default(), + faucet_addr: Option::default(), + health_check_slot_distance: Default::default(), + skip_preflight_health_check: bool::default(), + rpc_bigtable_config: Option::default(), + max_multiple_accounts: Option::default(), + account_indexes: AccountSecondaryIndexes::default(), + rpc_threads: 1, + rpc_blocking_threads: 1, + rpc_niceness_adj: Default::default(), + full_api: Default::default(), + rpc_scan_and_fix_roots: Default::default(), + max_request_body_size: Option::default(), + 
disable_health_check: Default::default(), + } + } +} + impl JsonRpcConfig { pub fn default_for_test() -> Self { Self { @@ -223,6 +252,7 @@ pub struct JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -253,6 +283,51 @@ impl JsonRpcRequestProcessor { Ok(bank) } + async fn calculate_non_circulating_supply( + &self, + bank: &Arc, + ) -> ScanResult { + let bank = Arc::clone(bank); + self.runtime + .spawn_blocking(move || calculate_non_circulating_supply(&bank)) + .await + .expect("Failed to spawn blocking task") + } + + pub async fn get_filtered_indexed_accounts( + &self, + bank: &Arc, + index_key: &IndexKey, + program_id: &Pubkey, + filters: Vec, + sort_results: bool, + ) -> ScanResult> { + let bank = Arc::clone(bank); + let index_key = index_key.to_owned(); + let program_id = program_id.to_owned(); + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &index_key, + |account| { + // The program-id account index checks for Account owner on inclusion. + // However, due to the current AccountsDb implementation, an account may + // remain in storage as a zero-lamport AccountSharedData::Default() after + // being wiped and reinitialized in later updates. We include the redundant + // filters here to avoid returning these accounts. 
+ account.owner().eq(&program_id) + && filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + bank.byte_limit_for_scans(), + ) + }) + .await + .expect("Failed to spawn blocking task") + } + #[allow(deprecated)] fn bank(&self, commitment: Option) -> Arc { debug!("RPC commitment_config: {:?}", commitment); @@ -329,6 +404,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, ) -> (Self, Receiver) { let (transaction_sender, transaction_receiver) = unbounded(); ( @@ -351,6 +427,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + runtime, }, transaction_receiver, ) @@ -362,6 +439,8 @@ impl JsonRpcRequestProcessor { socket_addr_space: SocketAddrSpace, connection_cache: Arc, ) -> Self { + use crate::rpc_service::service_runtime; + let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); let bank = bank_forks.read().unwrap().root_bank(); @@ -401,8 +480,15 @@ impl JsonRpcRequestProcessor { let slot = bank.slot(); let optimistically_confirmed_bank = Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank })); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. 
+ } = config; Self { - config: JsonRpcConfig::default(), + config, snapshot_config: None, bank_forks, block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( @@ -430,12 +516,13 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), max_complete_rewards_slot: Arc::new(AtomicU64::default()), prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()), + runtime: service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), } } - pub fn get_account_info( + pub async fn get_account_info( &self, - pubkey: &Pubkey, + pubkey: Pubkey, config: Option, ) -> Result>> { let RpcAccountInfoConfig { @@ -450,11 +537,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - let response = get_encoded_account(&bank, pubkey, encoding, data_slice, None)?; + let response = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?; Ok(new_response(&bank, response)) } - pub fn get_multiple_accounts( + pub async fn get_multiple_accounts( &self, pubkeys: Vec, config: Option, @@ -471,10 +565,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Base64); - let accounts = pubkeys - .into_iter() - .map(|pubkey| get_encoded_account(&bank, &pubkey, encoding, data_slice, None)) - .collect::>>()?; + let mut accounts = Vec::with_capacity(pubkeys.len()); + for pubkey in pubkeys { + let bank = Arc::clone(&bank); + accounts.push( + self.runtime + .spawn_blocking(move || { + get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?, + ); + } Ok(new_response(&bank, accounts)) } @@ -487,9 +589,9 @@ impl JsonRpcRequestProcessor { .get_minimum_balance_for_rent_exemption(data_len) } - pub fn get_program_accounts( + pub async fn 
get_program_accounts( &self, - program_id: &Pubkey, + program_id: Pubkey, config: Option, mut filters: Vec, with_context: bool, @@ -508,30 +610,38 @@ impl JsonRpcRequestProcessor { let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); optimize_filters(&mut filters); let keyed_accounts = { - if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) { + if let Some(owner) = get_spl_token_owner_filter(&program_id, &filters) { self.get_filtered_spl_token_accounts_by_owner( - &bank, + Arc::clone(&bank), program_id, - &owner, + owner, filters, sort_results, - )? - } else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) { + ) + .await? + } else if let Some(mint) = get_spl_token_mint_filter(&program_id, &filters) { self.get_filtered_spl_token_accounts_by_mint( - &bank, + Arc::clone(&bank), program_id, - &mint, + mint, filters, sort_results, - )? + ) + .await? } else { - self.get_filtered_program_accounts(&bank, program_id, filters, sort_results)? + self.get_filtered_program_accounts( + Arc::clone(&bank), + program_id, + filters, + sort_results, + ) + .await? 
} }; - let accounts = if is_known_spl_token_id(program_id) + let accounts = if is_known_spl_token_id(&program_id) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() + get_parsed_token_accounts(Arc::clone(&bank), keyed_accounts.into_iter()).collect() } else { keyed_accounts .into_iter() @@ -925,7 +1035,7 @@ impl JsonRpcRequestProcessor { largest_accounts_cache.set_largest_accounts(filter, slot, accounts) } - fn get_largest_accounts( + async fn get_largest_accounts( &self, config: Option, ) -> RpcCustomResult>> { @@ -940,11 +1050,11 @@ impl JsonRpcRequestProcessor { }) } else { let (addresses, address_filter) = if let Some(filter) = config.clone().filter { - let non_circulating_supply = - calculate_non_circulating_supply(&bank).map_err(|e| { - RpcCustomError::ScanError { - message: e.to_string(), - } + let non_circulating_supply = self + .calculate_non_circulating_supply(&bank) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), })?; let addresses = non_circulating_supply.accounts.into_iter().collect(); let address_filter = match filter { @@ -955,13 +1065,21 @@ impl JsonRpcRequestProcessor { } else { (HashSet::new(), AccountAddressFilter::Exclude) }; - let accounts = bank - .get_largest_accounts( - NUM_LARGEST_ACCOUNTS, - &addresses, - address_filter, - sort_results, - ) + let accounts = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || { + bank.get_largest_accounts( + NUM_LARGEST_ACCOUNTS, + &addresses, + address_filter, + sort_results, + ) + } + }) + .await + .expect("Failed to spawn blocking task") .map_err(|e| RpcCustomError::ScanError { message: e.to_string(), })? 
@@ -977,16 +1095,18 @@ impl JsonRpcRequestProcessor { } } - fn get_supply( + async fn get_supply( &self, config: Option, ) -> RpcCustomResult> { let config = config.unwrap_or_default(); let bank = self.bank(config.commitment); let non_circulating_supply = - calculate_non_circulating_supply(&bank).map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?; + self.calculate_non_circulating_supply(&bank) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + })?; let total_supply = bank.capitalization(); let non_circulating_accounts = if config.exclude_non_circulating_accounts_list { vec![] @@ -1191,42 +1311,65 @@ impl JsonRpcRequestProcessor { .highest_super_majority_root() { self.check_blockstore_writes_complete(slot)?; - let result = self.blockstore.get_rooted_block(slot, true); + let result = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + move || blockstore.get_rooted_block(slot, true) + }) + .await + .expect("Failed to spawn blocking task"); self.check_blockstore_root(&result, slot)?; - let encode_block = |confirmed_block: ConfirmedBlock| -> Result { - let mut encoded_block = confirmed_block - .encode_with_options(encoding, encoding_options) - .map_err(RpcCustomError::from)?; + let encode_block = |confirmed_block: ConfirmedBlock| async move { + let mut encoded_block = self + .runtime + .spawn_blocking(move || { + confirmed_block + .encode_with_options(encoding, encoding_options) + .map_err(RpcCustomError::from) + }) + .await + .expect("Failed to spawn blocking task")?; if slot == 0 { encoded_block.block_time = Some(self.genesis_creation_time()); encoded_block.block_height = Some(0); } - Ok(encoded_block) + Ok::(encoded_block) }; if result.is_err() { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { let bigtable_result = bigtable_ledger_storage.get_confirmed_block(slot).await; self.check_bigtable_result(&bigtable_result)?; - return 
bigtable_result.ok().map(encode_block).transpose(); + let encoded_block_future: OptionFuture<_> = + bigtable_result.ok().map(encode_block).into(); + return encoded_block_future.await.transpose(); } } self.check_slot_cleaned_up(&result, slot)?; - return result + let encoded_block_future: OptionFuture<_> = result .ok() .map(ConfirmedBlock::from) .map(encode_block) - .transpose(); + .into(); + return encoded_block_future.await.transpose(); } else if commitment.is_confirmed() { // Check if block is confirmed let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); if confirmed_bank.status_cache_ancestors().contains(&slot) { self.check_blockstore_writes_complete(slot)?; - let result = self.blockstore.get_complete_block(slot, true); - return result + let result = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + move || blockstore.get_complete_block(slot, true) + }) + .await + .expect("Failed to spawn blocking task"); + let encoded_block_future: OptionFuture<_> = result .ok() .map(ConfirmedBlock::from) - .map(|mut confirmed_block| -> Result { + .map(|mut confirmed_block| async move { if confirmed_block.block_time.is_none() || confirmed_block.block_height.is_none() { @@ -1241,12 +1384,20 @@ impl JsonRpcRequestProcessor { } } } - - Ok(confirmed_block - .encode_with_options(encoding, encoding_options) - .map_err(RpcCustomError::from)?) 
+ let encoded_block = self + .runtime + .spawn_blocking(move || { + confirmed_block + .encode_with_options(encoding, encoding_options) + .map_err(RpcCustomError::from) + }) + .await + .expect("Failed to spawn blocking task")?; + + Ok(encoded_block) }) - .transpose(); + .into(); + return encoded_block_future.await.transpose(); } } } else { @@ -1604,13 +1755,22 @@ impl JsonRpcRequestProcessor { if self.config.enable_rpc_transaction_history { let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); - let confirmed_transaction = if commitment.is_confirmed() { - let highest_confirmed_slot = confirmed_bank.slot(); - self.blockstore - .get_complete_transaction(signature, highest_confirmed_slot) - } else { - self.blockstore.get_rooted_transaction(signature) - }; + let confirmed_transaction = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + let confirmed_bank = Arc::clone(&confirmed_bank); + move || { + if commitment.is_confirmed() { + let highest_confirmed_slot = confirmed_bank.slot(); + blockstore.get_complete_transaction(signature, highest_confirmed_slot) + } else { + blockstore.get_rooted_transaction(signature) + } + } + }) + .await + .expect("Failed to spawn blocking task"); let encode_transaction = |confirmed_tx_with_meta: ConfirmedTransactionWithStatusMeta| -> Result { @@ -1886,13 +2046,13 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, supply)) } - pub fn get_token_largest_accounts( + pub async fn get_token_largest_accounts( &self, - mint: &Pubkey, + mint: Pubkey, commitment: Option, ) -> Result>> { let bank = self.bank(commitment); - let (mint_owner, data) = get_mint_owner_and_additional_data(&bank, mint)?; + let (mint_owner, data) = get_mint_owner_and_additional_data(&bank, &mint)?; if !is_known_spl_token_id(&mint_owner) { return Err(Error::invalid_params( "Invalid param: not a Token mint".to_string(), @@ -1901,8 +2061,15 @@ impl JsonRpcRequestProcessor { let mut token_balances = 
BinaryHeap::>::with_capacity(NUM_LARGEST_ACCOUNTS); - for (address, account) in - self.get_filtered_spl_token_accounts_by_mint(&bank, &mint_owner, mint, vec![], true)? + for (address, account) in self + .get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + mint_owner, + mint, + vec![], + true, + ) + .await? { let amount = StateWithExtensions::::unpack(account.data()) .map(|account| account.base.amount) @@ -1935,9 +2102,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, token_balances)) } - pub fn get_token_accounts_by_owner( + pub async fn get_token_accounts_by_owner( &self, - owner: &Pubkey, + owner: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, sort_results: bool, @@ -1964,13 +2131,15 @@ impl JsonRpcRequestProcessor { ))); } - let keyed_accounts = self.get_filtered_spl_token_accounts_by_owner( - &bank, - &token_program_id, - owner, - filters, - sort_results, - )?; + let keyed_accounts = self + .get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + token_program_id, + owner, + filters, + sort_results, + ) + .await?; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() } else { @@ -1987,9 +2156,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, accounts)) } - pub fn get_token_accounts_by_delegate( + pub async fn get_token_accounts_by_delegate( &self, - delegate: &Pubkey, + delegate: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, sort_results: bool, @@ -2019,16 +2188,23 @@ impl JsonRpcRequestProcessor { // Optional filter on Mint address, uses mint account index for scan let keyed_accounts = if let Some(mint) = mint { self.get_filtered_spl_token_accounts_by_mint( - &bank, - &token_program_id, - &mint, + Arc::clone(&bank), + token_program_id, + mint, filters, sort_results, - )? + ) + .await? 
} else { // Filter on Token Account state filters.push(RpcFilterType::TokenAccountState); - self.get_filtered_program_accounts(&bank, &token_program_id, filters, sort_results)? + self.get_filtered_program_accounts( + Arc::clone(&bank), + token_program_id, + filters, + sort_results, + ) + .await? }; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() @@ -2047,66 +2223,63 @@ impl JsonRpcRequestProcessor { } /// Use a set of filters to get an iterator of keyed program accounts from a bank - fn get_filtered_program_accounts( + async fn get_filtered_program_accounts( &self, - bank: &Bank, - program_id: &Pubkey, + bank: Arc, + program_id: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { optimize_filters(&mut filters); - let filter_closure = |account: &AccountSharedData| { - filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }; if self .config .account_indexes .contains(&AccountIndex::ProgramId) { - if !self.config.account_indexes.include_key(program_id) { + if !self.config.account_indexes.include_key(&program_id) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: program_id.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::ProgramId(*program_id), - |account| { - // The program-id account index checks for Account owner on inclusion. However, due - // to the current AccountsDb implementation, an account may remain in storage as a - // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later - // updates. We include the redundant filters here to avoid returning these - // accounts. - account.owner() == program_id && filter_closure(account) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) 
+ self.get_filtered_indexed_accounts( + &bank, + &IndexKey::ProgramId(program_id), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { // this path does not need to provide a mb limit because we only want to support secondary indexes - Ok(bank - .get_filtered_program_accounts( - program_id, - filter_closure, - &ScanConfig::new(!sort_results), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_program_accounts( + &program_id, + |account: &AccountSharedData| { + filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } } /// Get an iterator of spl-token accounts by owner address - fn get_filtered_spl_token_accounts_by_owner( + async fn get_filtered_spl_token_accounts_by_owner( &self, - bank: &Bank, - program_id: &Pubkey, - owner_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + owner_key: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { @@ -2128,37 +2301,34 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenOwner) { - if !self.config.account_indexes.include_key(owner_key) { + if !self.config.account_indexes.include_key(&owner_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: owner_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenOwner(*owner_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) 
+ self.get_filtered_indexed_accounts( + &bank, + &IndexKey::SplTokenOwner(owner_key), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) + .await } } /// Get an iterator of spl-token accounts by mint address - fn get_filtered_spl_token_accounts_by_mint( + async fn get_filtered_spl_token_accounts_by_mint( &self, - bank: &Bank, - program_id: &Pubkey, - mint_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + mint_key: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { @@ -2179,28 +2349,25 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenMint) { - if !self.config.account_indexes.include_key(mint_key) { + if !self.config.account_indexes.include_key(&mint_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: mint_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenMint(*mint_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) 
+ self.get_filtered_indexed_accounts( + &bank, + &IndexKey::SplTokenMint(mint_key), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) + .await } } @@ -3020,7 +3187,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getMultipleAccounts")] fn get_multiple_accounts( @@ -3028,7 +3195,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>>; + ) -> BoxFuture>>>>; #[rpc(meta, name = "getBlockCommitment")] fn get_block_commitment( @@ -3067,10 +3234,13 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_account_info rpc request received: {:?}", pubkey_str); - let pubkey = verify_pubkey(&pubkey_str)?; - meta.get_account_info(&pubkey, config) + async move { + let pubkey = verify_pubkey(&pubkey_str)?; + meta.get_account_info(pubkey, config).await + } + .boxed() } fn get_multiple_accounts( @@ -3078,26 +3248,28 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>> { + ) -> BoxFuture>>>> { debug!( "get_multiple_accounts rpc request received: {:?}", pubkey_strs.len() ); - - let max_multiple_accounts = meta - .config - .max_multiple_accounts - .unwrap_or(MAX_MULTIPLE_ACCOUNTS); - if pubkey_strs.len() > max_multiple_accounts { - return Err(Error::invalid_params(format!( - "Too many inputs provided; max {max_multiple_accounts}" - ))); + async move { + let max_multiple_accounts = meta + .config + .max_multiple_accounts + .unwrap_or(MAX_MULTIPLE_ACCOUNTS); + if pubkey_strs.len() > max_multiple_accounts { + return Err(Error::invalid_params(format!( + "Too many inputs provided; max {max_multiple_accounts}" + ))); + } + let pubkeys = pubkey_strs + .into_iter() + 
.map(|pubkey_str| verify_pubkey(&pubkey_str)) + .collect::>>()?; + meta.get_multiple_accounts(pubkeys, config).await } - let pubkeys = pubkey_strs - .into_iter() - .map(|pubkey_str| verify_pubkey(&pubkey_str)) - .collect::>>()?; - meta.get_multiple_accounts(pubkeys, config) + .boxed() } fn get_block_commitment( @@ -3151,21 +3323,21 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getLargestAccounts")] fn get_largest_accounts( &self, meta: Self::Metadata, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getSupply")] fn get_supply( &self, meta: Self::Metadata, config: Option, - ) -> Result>; + ) -> BoxFuture>>; // SPL Token-specific RPC endpoints // See https://github.com/solana-labs/solana-program-library/releases/tag/token-v2.0.0 for @@ -3177,7 +3349,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByOwner")] fn get_token_accounts_by_owner( @@ -3186,7 +3358,7 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByDelegate")] fn get_token_accounts_by_delegate( @@ -3195,7 +3367,7 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; } pub struct AccountsScanImpl; @@ -3207,49 +3379,53 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_program_accounts rpc request received: {:?}", program_id_str ); - let program_id = verify_pubkey(&program_id_str)?; - let (config, filters, with_context, sort_results) = if let Some(config) = config { - ( - Some(config.account_config), - config.filters.unwrap_or_default(), - 
config.with_context.unwrap_or_default(), - config.sort_results.unwrap_or(true), - ) - } else { - (None, vec![], false, true) - }; - if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { - return Err(Error::invalid_params(format!( - "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" - ))); - } - for filter in &filters { - verify_filter(filter)?; + async move { + let program_id = verify_pubkey(&program_id_str)?; + let (config, filters, with_context, sort_results) = if let Some(config) = config { + ( + Some(config.account_config), + config.filters.unwrap_or_default(), + config.with_context.unwrap_or_default(), + config.sort_results.unwrap_or(true), + ) + } else { + (None, vec![], false, true) + }; + if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { + return Err(Error::invalid_params(format!( + "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" + ))); + } + for filter in &filters { + verify_filter(filter)?; + } + meta.get_program_accounts(program_id, config, filters, with_context, sort_results) + .await } - meta.get_program_accounts(&program_id, config, filters, with_context, sort_results) + .boxed() } fn get_largest_accounts( &self, meta: Self::Metadata, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_largest_accounts rpc request received"); - Ok(meta.get_largest_accounts(config)?) + async move { Ok(meta.get_largest_accounts(config).await?) }.boxed() } fn get_supply( &self, meta: Self::Metadata, config: Option, - ) -> Result> { + ) -> BoxFuture>> { debug!("get_supply rpc request received"); - Ok(meta.get_supply(config)?) + async move { Ok(meta.get_supply(config).await?) 
}.boxed() } fn get_token_largest_accounts( @@ -3257,13 +3433,16 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_largest_accounts rpc request received: {:?}", mint_str ); - let mint = verify_pubkey(&mint_str)?; - meta.get_token_largest_accounts(&mint, commitment) + async move { + let mint = verify_pubkey(&mint_str)?; + meta.get_token_largest_accounts(mint, commitment).await + } + .boxed() } fn get_token_accounts_by_owner( @@ -3272,14 +3451,18 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_owner rpc request received: {:?}", owner_str ); - let owner = verify_pubkey(&owner_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_owner(&owner, token_account_filter, config, true) + async move { + let owner = verify_pubkey(&owner_str)?; + let token_account_filter = verify_token_account_filter(token_account_filter)?; + meta.get_token_accounts_by_owner(owner, token_account_filter, config, true) + .await + } + .boxed() } fn get_token_accounts_by_delegate( @@ -3288,14 +3471,18 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_delegate rpc request received: {:?}", delegate_str ); - let delegate = verify_pubkey(&delegate_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_delegate(&delegate, token_account_filter, config, true) + async move { + let delegate = verify_pubkey(&delegate_str)?; + let token_account_filter = verify_token_account_filter(token_account_filter)?; + meta.get_token_accounts_by_delegate(delegate, token_account_filter, config, true) + .await + } + .boxed() } } } @@ 
-4335,6 +4522,7 @@ pub mod tests { optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBankTracker, }, + rpc_service::service_runtime, rpc_subscriptions::RpcSubscriptions, }, bincode::deserialize, @@ -4513,6 +4701,12 @@ pub mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let meta = JsonRpcRequestProcessor::new( config, None, @@ -4531,6 +4725,7 @@ pub mod tests { max_complete_transaction_status_slot.clone(), max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ) .0; @@ -6479,8 +6674,15 @@ pub mod tests { .my_contact_info() .tpu(connection_cache.protocol()) .unwrap(); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6497,6 +6699,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let client = ConnectionCacheClient::::new( connection_cache.clone(), @@ -6751,8 +6954,15 @@ pub mod tests { .unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. 
+ } = config; let (request_processor, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6769,6 +6979,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let client = ConnectionCacheClient::::new( connection_cache.clone(), @@ -8400,8 +8611,15 @@ pub mod tests { optimistically_confirmed_bank.clone(), )); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, _receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -8418,6 +8636,7 @@ pub mod tests { max_complete_transaction_status_slot, max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let mut io = MetaIoHandler::default(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 2c582be72ea5a6..78dab869ecaee6 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -362,6 +362,7 @@ impl JsonRpcService { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); let rpc_threads = 1.max(config.rpc_threads); + let rpc_blocking_threads = 1.max(config.rpc_blocking_threads); let rpc_niceness_adj = config.rpc_niceness_adj; let health = Arc::new(RpcHealth::new( @@ -381,21 +382,7 @@ impl JsonRpcService { .tpu(connection_cache.protocol()) .map_err(|err| format!("{err}"))?; - // sadly, some parts of our current rpc implemention block the jsonrpc's - // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, - // causing no further processing of incoming requests and ultimatily innocent clients timing-out. 
- // So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1, - // so that we avoid the single-threaded event loops from being created automatically by - // jsonrpc for threads when .threads(N > 1) is given. - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(rpc_threads) - .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) - .thread_name("solRpcEl") - .enable_all() - .build() - .expect("Runtime"), - ); + let runtime = service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj); let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); @@ -473,6 +460,7 @@ impl JsonRpcService { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + Arc::clone(&runtime), ); let leader_info = @@ -586,6 +574,40 @@ impl JsonRpcService { } } +pub fn service_runtime( + rpc_threads: usize, + rpc_blocking_threads: usize, + rpc_niceness_adj: i8, +) -> Arc { + // The jsonrpc_http_server crate supports two execution models: + // + // - By default, it spawns a number of threads - configured with .threads(N) - and runs a + // single-threaded futures executor in each thread. + // - Alternatively when configured with .event_loop_executor(executor) and .threads(1), + // it executes all the tasks on the given executor, not spawning any extra internal threads. + // + // We use the latter configuration, using a multi-threaded tokio runtime as the executor. We + // do this so we can configure the number of worker threads, the number of blocking threads + // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU + // bound operations like getMultipleAccounts. This results in reduced latency, since fast + // rpc calls (the majority) are not blocked by slow CPU bound ones. + // + // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 4).
Too many + // (busy) blocking threads could compete with CPU time with other validator threads and + // negatively impact performance. + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(rpc_threads) + .max_blocking_threads(rpc_blocking_threads) + .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) + .thread_name("solRpcEl") + .enable_all() + .build() + .expect("Runtime"), + ); + runtime +} + #[cfg(test)] mod tests { use { diff --git a/validator/src/cli.rs b/validator/src/cli.rs index aed7a3bffc1d9f..1cdaf02b49dc24 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -959,6 +959,27 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .default_value(&default_args.rpc_threads) .help("Number of threads to use for servicing RPC requests"), ) + .arg( + Arg::with_name("rpc_blocking_threads") + .long("rpc-blocking-threads") + .value_name("NUMBER") + .validator(is_parsable::) + .validator(|value| { + value + .parse::() + .map_err(|err| format!("error parsing '{value}': {err}")) + .and_then(|threads| { + if threads > 0 { + Ok(()) + } else { + Err("value must be >= 1".to_string()) + } + }) + }) + .takes_value(true) + .default_value(&default_args.rpc_blocking_threads) + .help("Number of blocking threads to use for servicing CPU bound RPC requests (eg getMultipleAccounts)"), + ) .arg( Arg::with_name("rpc_niceness_adj") .long("rpc-niceness-adjustment") @@ -2270,6 +2291,7 @@ pub struct DefaultArgs { pub rpc_send_transaction_batch_size: String, pub rpc_send_transaction_retry_pool_max_size: String, pub rpc_threads: String, + pub rpc_blocking_threads: String, pub rpc_niceness_adjustment: String, pub rpc_bigtable_timeout: String, pub rpc_bigtable_instance_name: String, @@ -2362,6 +2384,7 @@ impl DefaultArgs { .retry_pool_max_size .to_string(), rpc_threads: num_cpus::get().to_string(), + rpc_blocking_threads: 1.max(num_cpus::get() / 4).to_string(), rpc_niceness_adjustment: 
"0".to_string(), rpc_bigtable_timeout: "30".to_string(), rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index a7de615b3be9ac..cecf9f873d3019 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1492,6 +1492,7 @@ pub fn main() { ), disable_health_check: false, rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), + rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize), rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8), account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),