From 691696000e205f87ac7fdde100694831458789f5 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 27 Apr 2023 11:57:06 -0300 Subject: [PATCH] bigtable: add counters --- storage-bigtable/src/bigtable.rs | 53 ++++++++++ storage-bigtable/src/lib.rs | 174 +++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index b4bfe040a3..13bba2e726 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -9,6 +9,7 @@ use { backoff::{future::retry, Error as BackoffError, ExponentialBackoff}, log::*, std::{ + collections::HashMap, str::FromStr, time::{Duration, Instant}, }, @@ -781,6 +782,31 @@ impl) -> InterceptedRequestResult> BigTable { .collect()) } + pub async fn get_bincode_cells2( + &mut self, + table: &str, + keys: &[RowKey], + ) -> Result<(HashMap>, usize)> + where + T: serde::de::DeserializeOwned, + { + let mut size = 0; + let rows = self + .get_multi_row_data(table, keys) + .await? + .into_iter() + .map(|(key, row_data)| { + size += row_data.len(); + let key_str = key.to_string(); + ( + key, + deserialize_bincode_cell_data(&row_data, table, key_str), + ) + }) + .collect(); + Ok((rows, size)) + } + pub async fn get_protobuf_cell

(&mut self, table: &str, key: RowKey) -> Result

where P: prost::Message + Default, @@ -827,6 +853,33 @@ impl) -> InterceptedRequestResult> BigTable { })) } + pub async fn get_protobuf_or_bincode_cells2<'a, B, P>( + &mut self, + table: &'a str, + row_keys: impl IntoIterator, + ) -> Result, usize)> + 'a> + where + B: serde::de::DeserializeOwned, + P: prost::Message + Default, + { + Ok(self + .get_multi_row_data( + table, + row_keys.into_iter().collect::>().as_slice(), + ) + .await? + .into_iter() + .map(|(key, row_data)| { + let size = row_data.iter().fold(0, |acc, row| acc + row.1.len()); + let key_str = key.to_string(); + ( + key, + deserialize_protobuf_or_bincode_cell_data(&row_data, table, key_str).unwrap(), + size, + ) + })) + } + pub async fn put_bincode_cells( &mut self, table: &str, diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 3af928a626..e2701caf44 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -80,6 +80,16 @@ impl std::convert::From for Error { } } +impl Error { + pub fn is_rpc_unauthenticated(&self) -> bool { + if let Error::BigTableError(bigtable::Error::Rpc(status)) = self { + status.code() == tonic::Code::Unauthenticated + } else { + false + } + } +} + pub type Result = std::result::Result; // Convert a slot to its bucket representation whereby lower slots are always lexically ordered @@ -764,6 +774,170 @@ impl LedgerStorage { } } + // Fetches and gets a vector of confirmed blocks via a multirow fetch + pub async fn get_confirmed_blocks_with_data2<'a>( + &self, + slots: &'a [Slot], + ) -> Result, usize)> + 'a> { + debug!( + "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}", + slots + ); + inc_new_counter_debug!("storage-bigtable-query", 1); + let mut bigtable = self.connection.client(); + let row_keys = slots.iter().copied().map(slot_to_blocks_key); + Ok(bigtable + .get_protobuf_or_bincode_cells2("blocks", row_keys) + .await? + .map( + |(row_key, block_cell_data, size): ( + RowKey, + bigtable::CellData, + usize, + )| { + let block = match block_cell_data { + bigtable::CellData::Bincode(block) => block.into(), + bigtable::CellData::Protobuf(block) => match block.try_into() { + Ok(block) => block, + Err(_) => return (None, size), + }, + }; + (Some((key_to_slot(&row_key).unwrap(), block)), size) + }, + )) + } + + /// Fetch blocks and transactions + pub async fn get_confirmed_blocks_transactions( + &self, + blocks: &[Slot], + transactions: &[String], + transactions_status: &[String], + ) -> Result<( + Vec<(Slot, ConfirmedBlock)>, + Vec, + HashMap, + usize, + )> { + let mut bigtable = self.connection.client(); + + let mut blocks_resp = Vec::with_capacity(blocks.len()); + let mut transactions_resp = Vec::with_capacity(transactions.len()); + let mut transactions_status_resp = HashMap::new(); + let mut size = 0; + + // Collect slots for request + let mut blocks_map: HashMap> = HashMap::new(); + for block in blocks { + blocks_map.entry(*block).or_default(); + } + + // Fetch transactions info and collect slots + if !transactions.is_empty() || !transactions_status.is_empty() { + let mut keys = Vec::with_capacity(transactions.len() + transactions_status.len()); + keys.extend(transactions.iter().cloned()); + keys.extend(transactions_status.iter().cloned()); + + let (mut cells, bt_size) = bigtable + .get_bincode_cells2::("tx", keys.as_slice()) + .await?; + size += bt_size; + + for signature in transactions_status { + if let Some(Ok(info)) = cells.get(signature) { + transactions_status_resp.insert( + signature.clone(), + TransactionStatus { + slot: info.slot, + confirmations: None, + status: match &info.err { + Some(err) => Err(err.clone()), + None => Ok(()), + }, + err: info.err.clone(), + confirmation_status: Some(TransactionConfirmationStatus::Finalized), + }, + ); + } + } + for signature in transactions { + if let Some((signature, Ok(TransactionInfo { slot, index, .. }))) = + cells.remove_entry(signature) + { + blocks_map.entry(slot).or_default().push((index, signature)); + } + } + } + + // Fetch blocks + if !blocks_map.is_empty() { + let keys = blocks_map.keys().copied().collect::>(); + let cells = self.get_confirmed_blocks_with_data2(&keys).await?; + for (maybe_slot_block, row_size) in cells { + size += row_size; + if let Some((slot, block)) = maybe_slot_block { + if let Some(entries) = blocks_map.get(&slot) { + for (index, signature) in entries.iter() { + if let Some(tx_with_meta) = block.transactions.get(*index as usize) { + if tx_with_meta.transaction_signature().to_string() != *signature { + warn!( + "Transaction info or confirmed block for {} is corrupt", + signature + ); + } else { + transactions_resp.push(ConfirmedTransactionWithStatusMeta { + slot, + tx_with_meta: tx_with_meta.clone(), + block_time: block.block_time, + }); + } + } + } + blocks_resp.push((slot, block)); + } + } + } + } + + Ok(( + blocks_resp, + transactions_resp, + transactions_status_resp, + size, + )) + } + + /// Fetch TX index for transactions + pub async fn get_txindex( + &self, + transactions: &[String], + ) -> Result<(Vec>, usize)> { + let mut bigtable = self.connection.client(); + + let mut response = Vec::with_capacity(transactions.len()); + let mut size = 0; + + // Fetch transactions info and collect slots + if transactions.is_empty() { + response.resize(transactions.len(), None); + } else { + let (cells, bt_size) = bigtable + .get_bincode_cells2::("tx", transactions) + .await?; + size += bt_size; + + for signature in transactions { + if let Some(Ok(TransactionInfo { slot, index, .. })) = cells.get(signature) { + response.push(Some((*slot, *index))); + } else { + response.push(None); + } + } + } + + Ok((response, size)) + } + /// Get confirmed signatures for the provided address, in descending ledger order /// /// address: address to search for