From 6e81724c9ec713f14ba5a95a898a663057e06343 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Fri, 31 May 2024 16:44:12 +0200 Subject: [PATCH 1/3] feat(transport): use `getBlockchainConfig` for jrpc and proto --- nekoton-proto/src/protos/rpc.proto | 3 +- nekoton-proto/src/protos/rpc.rs | 2 ++ src/transport/gql/mod.rs | 22 +++++++++--- src/transport/jrpc/mod.rs | 20 +++++++++-- src/transport/jrpc/models.rs | 9 +++++ src/transport/proto/mod.rs | 30 ++++++++++++++-- src/transport/utils.rs | 58 ++++++++++++++---------------- 7 files changed, 104 insertions(+), 40 deletions(-) diff --git a/nekoton-proto/src/protos/rpc.proto b/nekoton-proto/src/protos/rpc.proto index ee75dfeb2..4b64ef298 100644 --- a/nekoton-proto/src/protos/rpc.proto +++ b/nekoton-proto/src/protos/rpc.proto @@ -83,7 +83,8 @@ message Response { message GetBlockchainConfig { int32 global_id = 1; - bytes config = 2; + bytes config = 2; + uint32 seqno = 3; } message GetAccountsByCodeHash { diff --git a/nekoton-proto/src/protos/rpc.rs b/nekoton-proto/src/protos/rpc.rs index 1851b1e0d..bcf308c06 100644 --- a/nekoton-proto/src/protos/rpc.rs +++ b/nekoton-proto/src/protos/rpc.rs @@ -141,6 +141,8 @@ pub mod response { pub global_id: i32, #[prost(bytes = "bytes", tag = "2")] pub config: ::prost::bytes::Bytes, + #[prost(uint32, tag = "3")] + pub seqno: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/transport/gql/mod.rs b/src/transport/gql/mod.rs index 17790133f..d849d802e 100644 --- a/src/transport/gql/mod.rs +++ b/src/transport/gql/mod.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Deserialize; use ton_block::{Account, Deserializable, Message, MsgAddressInt, Serializable}; @@ -14,7 +14,7 @@ use crate::external::{GqlConnection, GqlRequest}; use self::queries::*; use super::models::*; -use super::utils::ConfigCache; +use super::utils::{ConfigCache, ConfigResponse}; use super::{Transport, TransportInfo}; mod queries; @@ -173,6 +173,20 @@ impl GqlTransport { Ok(block_id) } + + async fn fetch_config(&self) -> Result { + let block = self.get_latest_key_block().await?; + let seqno = block.info.read_struct()?.seq_no(); + let extra = block.read_extra()?; + let master = extra.read_custom()?.context("invalid key block")?; + let config = master.config().context("invalid key block")?.clone(); + + Ok(ConfigResponse { + global_id: block.global_id, + seqno, + config, + }) + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -356,7 +370,7 @@ impl Transport for GqlTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -368,7 +382,7 @@ impl Transport for GqlTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/jrpc/mod.rs b/src/transport/jrpc/mod.rs index 1e86bce5a..c152d82cd 100644 --- a/src/transport/jrpc/mod.rs +++ b/src/transport/jrpc/mod.rs @@ -31,6 +31,22 @@ impl JrpcTransport { accounts_cache: AccountsCache::new(), } } + + async fn fetch_config(&self) -> Result { + let req = external::JrpcRequest { + data: make_jrpc_request("getBlockchainConfig", &()), + requires_db: true, + }; + self.connection + .post(req) + .await + .map(|data| tiny_jsonrpc::parse_response(&data))? + .map(|block: GetBlockchainConfigResponse| ConfigResponse { + global_id: block.global_id, + seqno: block.seqno, + config: block.config, + }) + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -198,7 +214,7 @@ impl Transport for JrpcTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -210,7 +226,7 @@ impl Transport for JrpcTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/jrpc/models.rs b/src/transport/jrpc/models.rs index a6b05d758..35e02ce02 100644 --- a/src/transport/jrpc/models.rs +++ b/src/transport/jrpc/models.rs @@ -60,3 +60,12 @@ pub struct GetBlockResponse { #[serde(with = "serde_ton_block")] pub block: ton_block::Block, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetBlockchainConfigResponse { + pub global_id: i32, + pub seqno: u32, + #[serde(with = "serde_ton_block")] + pub config: ton_block::ConfigParams, +} diff --git a/src/transport/proto/mod.rs b/src/transport/proto/mod.rs index c38d3ca86..a4494292c 100644 --- a/src/transport/proto/mod.rs +++ b/src/transport/proto/mod.rs @@ -31,6 +31,32 @@ impl ProtoTransport { accounts_cache: AccountsCache::new(), } } + + async fn fetch_config(&self) -> Result { + let data = rpc::Request { + call: Some(rpc::request::Call::GetBlockchainConfig(())), + }; + + let req = external::ProtoRequest { + data: data.encode_to_vec(), + requires_db: true, + }; + + let data = self.connection.post(req).await?; + let response = rpc::Response::decode(Bytes::from(data))?; + + match response.result { + Some(rpc::response::Result::GetBlockchainConfig(res)) => { + let params = ton_block::ConfigParams::construct_from_bytes(res.config.as_ref())?; + Ok(ConfigResponse { + global_id: res.global_id, + seqno: res.seqno, + config: params, + }) + } + _ => Err(ProtoClientError::InvalidResponse.into()), + } + } } #[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)] @@ -289,7 +315,7 @@ impl Transport for ProtoTransport { async fn get_capabilities(&self, clock: &dyn Clock) -> Result { let (capabilities, _) = self .config_cache - .get_blockchain_config(self, clock, false) + .get_blockchain_config(clock, false, || self.fetch_config()) .await?; Ok(capabilities) } @@ -301,7 +327,7 @@ impl Transport for ProtoTransport { ) -> Result { let (_, config) = self .config_cache - .get_blockchain_config(self, clock, force) + .get_blockchain_config(clock, force, || self.fetch_config()) .await?; Ok(config) } diff --git a/src/transport/utils.rs b/src/transport/utils.rs index f6f3fd1f4..d0dc1dabd 100644 --- a/src/transport/utils.rs +++ b/src/transport/utils.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use anyhow::Result; +use futures_util::Future; use quick_cache::sync::Cache as QuickCache; use tokio::sync::Mutex; use nekoton_utils::*; use super::models::RawContractState; -use super::Transport; use crate::core::models::NetworkCapabilities; #[allow(unused)] @@ -79,19 +79,23 @@ impl ConfigCache { } } - pub async fn get_blockchain_config( + pub async fn get_blockchain_config( &self, - transport: &dyn Transport, clock: &dyn Clock, force: bool, - ) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> { + f: F, + ) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> + where + F: FnOnce() -> Fut, + Fut: Future>, + { let mut cache = self.state.lock().await; let now = clock.now_sec_u64() as u32; Ok(match &*cache { None => { - let (capabilities, config, key_block_seqno) = fetch_config(transport).await?; + let (capabilities, config, key_block_seqno) = fetch_config(f).await?; let phase = compute_next_phase(now, &config, None, key_block_seqno)?; *cache = Some(ConfigCacheState { capabilities, @@ -102,7 +106,7 @@ impl ConfigCache { (capabilities, config) } Some(a) if force && !self.use_default_config || cache_expired(now, a.phase) => { - let (capabilities, config, key_block_seqno) = fetch_config(transport).await?; + let (capabilities, config, key_block_seqno) = fetch_config(f).await?; let phase = compute_next_phase( now, &config, @@ -122,36 +126,30 @@ impl ConfigCache { } } -async fn fetch_config( - transport: &dyn Transport, -) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> { - let block = transport.get_latest_key_block().await?; +async fn fetch_config( + f: F, +) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> +where + F: FnOnce() -> Fut, + Fut: Future>, +{ + let res = f().await?; - let info = block.info.read_struct()?; - - let extra = block - .read_extra() - .map_err(|_| QueryConfigError::InvalidBlock)?; - - let master = extra - .read_custom() - .map_err(|_| QueryConfigError::InvalidBlock)? - .ok_or(QueryConfigError::InvalidBlock)?; - - let params = master - .config() - .ok_or(QueryConfigError::InvalidBlock)? - .clone(); - - let config = ton_executor::BlockchainConfig::with_config(params, block.global_id) + let config = ton_executor::BlockchainConfig::with_config(res.config, res.global_id) .map_err(|_| QueryConfigError::InvalidConfig)?; let capabilities = NetworkCapabilities { - global_id: block.global_id, + global_id: res.global_id, raw: config.capabilites(), }; - Ok((capabilities, config, info.seq_no())) + Ok((capabilities, config, res.seqno)) +} + +pub struct ConfigResponse { + pub global_id: i32, + pub seqno: u32, + pub config: ton_block::ConfigParams, } fn compute_next_phase( @@ -203,8 +201,6 @@ enum ConfigCachePhase { #[derive(thiserror::Error, Debug)] enum QueryConfigError { - #[error("Invalid block")] - InvalidBlock, #[error("Invalid config")] InvalidConfig, } From 244418f18afa13f72236cfa034116db6e6c9c694 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Tue, 11 Jun 2024 23:00:22 +0200 Subject: [PATCH 2/3] feat(transport): add `http2_prior_knowledge` to client --- nekoton-transport/src/jrpc.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/nekoton-transport/src/jrpc.rs b/nekoton-transport/src/jrpc.rs index 5d089a445..ea3b853de 100644 --- a/nekoton-transport/src/jrpc.rs +++ b/nekoton-transport/src/jrpc.rs @@ -20,6 +20,7 @@ impl JrpcClient { ); let client = reqwest::ClientBuilder::new() + .http2_prior_knowledge() .default_headers(headers) .build() .context("failed to build http client")?; From 04664ade15ebe4c59ed889f6043a37c1a07f3f69 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Thu, 27 Jun 2024 18:12:23 +0200 Subject: [PATCH 3/3] feat(core): force config cache for 1m --- src/transport/utils.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/transport/utils.rs b/src/transport/utils.rs index d0dc1dabd..1d0762fdb 100644 --- a/src/transport/utils.rs +++ b/src/transport/utils.rs @@ -56,13 +56,18 @@ impl AccountsCache { pub struct ConfigCache { use_default_config: bool, + min_cache_for: Option, state: Mutex>, } impl ConfigCache { pub fn new(use_default_config: bool) -> Self { + // TODO: Move to params or connection config + const MIN_CACHE_FOR: u32 = 60; + Self { use_default_config, + min_cache_for: Some(MIN_CACHE_FOR), state: Mutex::new(if use_default_config { Some(ConfigCacheState { capabilities: NetworkCapabilities { @@ -71,6 +76,7 @@ impl ConfigCache { }, config: ton_executor::BlockchainConfig::default(), last_key_block_seqno: 0, + updated_at: 0, phase: ConfigCachePhase::WainingNextValidatorsSet { deadline: u32::MAX }, }) } else { @@ -101,11 +107,12 @@ impl ConfigCache { capabilities, config: config.clone(), last_key_block_seqno: key_block_seqno, + updated_at: now, phase, }); (capabilities, config) } - Some(a) if force && !self.use_default_config || cache_expired(now, a.phase) => { + Some(a) if force && !self.use_default_config || self.cache_expired(now, a) => { let (capabilities, config, key_block_seqno) = fetch_config(f).await?; let phase = compute_next_phase( now, @@ -117,6 +124,7 @@ impl ConfigCache { capabilities, config: config.clone(), last_key_block_seqno: key_block_seqno, + updated_at: now, phase, }); (capabilities, config) @@ -124,6 +132,20 @@ impl ConfigCache { Some(a) => (a.capabilities, a.config.clone()), }) } + + fn cache_expired(&self, now: u32, state: &ConfigCacheState) -> bool { + if let Some(min_cache_for) = self.min_cache_for { + if now <= state.updated_at.saturating_add(min_cache_for) { + return false; + } + } + + match state.phase { + ConfigCachePhase::WaitingKeyBlock => true, + ConfigCachePhase::WaitingElectionsEnd { deadline } + | ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline, + } + } } async fn fetch_config( @@ -177,18 +199,11 @@ fn compute_next_phase( } } -fn cache_expired(now: u32, phase: ConfigCachePhase) -> bool { - match phase { - ConfigCachePhase::WaitingKeyBlock => true, - ConfigCachePhase::WaitingElectionsEnd { deadline } - | ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline, - } -} - struct ConfigCacheState { capabilities: NetworkCapabilities, config: ton_executor::BlockchainConfig, last_key_block_seqno: u32, + updated_at: u32, phase: ConfigCachePhase, }