Skip to content

Commit

Permalink
Merge branch 'feature/get-blockchain-config'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Sep 6, 2024
2 parents 63e4dfc + 04664ad commit 084d353
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 48 deletions.
3 changes: 2 additions & 1 deletion nekoton-proto/src/protos/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ message Response {

message GetBlockchainConfig {
int32 global_id = 1;
bytes config = 2;
bytes config = 2;
uint32 seqno = 3;
}

message GetAccountsByCodeHash {
Expand Down
2 changes: 2 additions & 0 deletions nekoton-proto/src/protos/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ pub mod response {
pub global_id: i32,
#[prost(bytes = "bytes", tag = "2")]
pub config: ::prost::bytes::Bytes,
#[prost(uint32, tag = "3")]
pub seqno: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
1 change: 1 addition & 0 deletions nekoton-transport/src/jrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl JrpcClient {
);

let client = reqwest::ClientBuilder::new()
.http2_prior_knowledge()
.default_headers(headers)
.build()
.context("failed to build http client")?;
Expand Down
22 changes: 18 additions & 4 deletions src/transport/gql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use anyhow::{Context, Result};
use serde::Deserialize;
use ton_block::{Account, Deserializable, Message, MsgAddressInt, Serializable};

Expand All @@ -14,7 +14,7 @@ use crate::external::{GqlConnection, GqlRequest};

use self::queries::*;
use super::models::*;
use super::utils::ConfigCache;
use super::utils::{ConfigCache, ConfigResponse};
use super::{Transport, TransportInfo};

mod queries;
Expand Down Expand Up @@ -173,6 +173,20 @@ impl GqlTransport {

Ok(block_id)
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let block = self.get_latest_key_block().await?;
let seqno = block.info.read_struct()?.seq_no();
let extra = block.read_extra()?;
let master = extra.read_custom()?.context("invalid key block")?;
let config = master.config().context("invalid key block")?.clone();

Ok(ConfigResponse {
global_id: block.global_id,
seqno,
config,
})
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -356,7 +370,7 @@ impl Transport for GqlTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -368,7 +382,7 @@ impl Transport for GqlTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
20 changes: 18 additions & 2 deletions src/transport/jrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ impl JrpcTransport {
accounts_cache: AccountsCache::new(),
}
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let req = external::JrpcRequest {
data: make_jrpc_request("getBlockchainConfig", &()),
requires_db: true,
};
self.connection
.post(req)
.await
.map(|data| tiny_jsonrpc::parse_response(&data))?
.map(|block: GetBlockchainConfigResponse| ConfigResponse {
global_id: block.global_id,
seqno: block.seqno,
config: block.config,
})
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -198,7 +214,7 @@ impl Transport for JrpcTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -210,7 +226,7 @@ impl Transport for JrpcTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
10 changes: 10 additions & 0 deletions src/transport/jrpc/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,13 @@ pub struct GetBlockResponse {
#[serde(with = "serde_ton_block")]
pub block: ton_block::Block,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetBlockchainConfigResponse {
pub global_id: i32,
#[serde(default)]
pub seqno: u32,
#[serde(with = "serde_ton_block")]
pub config: ton_block::ConfigParams,
}
30 changes: 28 additions & 2 deletions src/transport/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ impl ProtoTransport {
accounts_cache: AccountsCache::new(),
}
}

async fn fetch_config(&self) -> Result<ConfigResponse> {
let data = rpc::Request {
call: Some(rpc::request::Call::GetBlockchainConfig(())),
};

let req = external::ProtoRequest {
data: data.encode_to_vec(),
requires_db: true,
};

let data = self.connection.post(req).await?;
let response = rpc::Response::decode(Bytes::from(data))?;

match response.result {
Some(rpc::response::Result::GetBlockchainConfig(res)) => {
let params = ton_block::ConfigParams::construct_from_bytes(res.config.as_ref())?;
Ok(ConfigResponse {
global_id: res.global_id,
seqno: res.seqno,
config: params,
})
}
_ => Err(ProtoClientError::InvalidResponse.into()),
}
}
}

#[cfg_attr(not(feature = "non_threadsafe"), async_trait::async_trait)]
Expand Down Expand Up @@ -289,7 +315,7 @@ impl Transport for ProtoTransport {
async fn get_capabilities(&self, clock: &dyn Clock) -> Result<NetworkCapabilities> {
let (capabilities, _) = self
.config_cache
.get_blockchain_config(self, clock, false)
.get_blockchain_config(clock, false, || self.fetch_config())
.await?;
Ok(capabilities)
}
Expand All @@ -301,7 +327,7 @@ impl Transport for ProtoTransport {
) -> Result<ton_executor::BlockchainConfig> {
let (_, config) = self
.config_cache
.get_blockchain_config(self, clock, force)
.get_blockchain_config(clock, force, || self.fetch_config())
.await?;
Ok(config)
}
Expand Down
89 changes: 50 additions & 39 deletions src/transport/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use anyhow::Result;
use futures_util::Future;
use quick_cache::sync::Cache as QuickCache;
use tokio::sync::Mutex;

use nekoton_utils::*;

use super::models::RawContractState;
use super::Transport;
use crate::core::models::NetworkCapabilities;

#[allow(unused)]
Expand Down Expand Up @@ -56,13 +56,18 @@ impl AccountsCache {

pub struct ConfigCache {
use_default_config: bool,
min_cache_for: Option<u32>,
state: Mutex<Option<ConfigCacheState>>,
}

impl ConfigCache {
pub fn new(use_default_config: bool) -> Self {
// TODO: Move to params or connection config
const MIN_CACHE_FOR: u32 = 60;

Self {
use_default_config,
min_cache_for: Some(MIN_CACHE_FOR),
state: Mutex::new(if use_default_config {
Some(ConfigCacheState {
capabilities: NetworkCapabilities {
Expand All @@ -71,6 +76,7 @@ impl ConfigCache {
},
config: ton_executor::BlockchainConfig::default(),
last_key_block_seqno: 0,
updated_at: 0,
phase: ConfigCachePhase::WainingNextValidatorsSet { deadline: u32::MAX },
})
} else {
Expand All @@ -79,30 +85,35 @@ impl ConfigCache {
}
}

pub async fn get_blockchain_config(
pub async fn get_blockchain_config<F, Fut>(
&self,
transport: &dyn Transport,
clock: &dyn Clock,
force: bool,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)> {
f: F,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig)>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ConfigResponse>>,
{
let mut cache = self.state.lock().await;

let now = clock.now_sec_u64() as u32;

Ok(match &*cache {
None => {
let (capabilities, config, key_block_seqno) = fetch_config(transport).await?;
let (capabilities, config, key_block_seqno) = fetch_config(f).await?;
let phase = compute_next_phase(now, &config, None, key_block_seqno)?;
*cache = Some(ConfigCacheState {
capabilities,
config: config.clone(),
last_key_block_seqno: key_block_seqno,
updated_at: now,
phase,
});
(capabilities, config)
}
Some(a) if force && !self.use_default_config || cache_expired(now, a.phase) => {
let (capabilities, config, key_block_seqno) = fetch_config(transport).await?;
Some(a) if force && !self.use_default_config || self.cache_expired(now, a) => {
let (capabilities, config, key_block_seqno) = fetch_config(f).await?;
let phase = compute_next_phase(
now,
&config,
Expand All @@ -113,45 +124,54 @@ impl ConfigCache {
capabilities,
config: config.clone(),
last_key_block_seqno: key_block_seqno,
updated_at: now,
phase,
});
(capabilities, config)
}
Some(a) => (a.capabilities, a.config.clone()),
})
}
}

async fn fetch_config(
transport: &dyn Transport,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)> {
let block = transport.get_latest_key_block().await?;

let info = block.info.read_struct()?;

let extra = block
.read_extra()
.map_err(|_| QueryConfigError::InvalidBlock)?;
fn cache_expired(&self, now: u32, state: &ConfigCacheState) -> bool {
if let Some(min_cache_for) = self.min_cache_for {
if now <= state.updated_at.saturating_add(min_cache_for) {
return false;
}
}

let master = extra
.read_custom()
.map_err(|_| QueryConfigError::InvalidBlock)?
.ok_or(QueryConfigError::InvalidBlock)?;
match state.phase {
ConfigCachePhase::WaitingKeyBlock => true,
ConfigCachePhase::WaitingElectionsEnd { deadline }
| ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline,
}
}
}

let params = master
.config()
.ok_or(QueryConfigError::InvalidBlock)?
.clone();
async fn fetch_config<F, Fut>(
f: F,
) -> Result<(NetworkCapabilities, ton_executor::BlockchainConfig, u32)>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ConfigResponse>>,
{
let res = f().await?;

let config = ton_executor::BlockchainConfig::with_config(params, block.global_id)
let config = ton_executor::BlockchainConfig::with_config(res.config, res.global_id)
.map_err(|_| QueryConfigError::InvalidConfig)?;

let capabilities = NetworkCapabilities {
global_id: block.global_id,
global_id: res.global_id,
raw: config.capabilites(),
};

Ok((capabilities, config, info.seq_no()))
Ok((capabilities, config, res.seqno))
}

pub struct ConfigResponse {
pub global_id: i32,
pub seqno: u32,
pub config: ton_block::ConfigParams,
}

fn compute_next_phase(
Expand Down Expand Up @@ -179,18 +199,11 @@ fn compute_next_phase(
}
}

fn cache_expired(now: u32, phase: ConfigCachePhase) -> bool {
match phase {
ConfigCachePhase::WaitingKeyBlock => true,
ConfigCachePhase::WaitingElectionsEnd { deadline }
| ConfigCachePhase::WainingNextValidatorsSet { deadline } => now > deadline,
}
}

struct ConfigCacheState {
capabilities: NetworkCapabilities,
config: ton_executor::BlockchainConfig,
last_key_block_seqno: u32,
updated_at: u32,
phase: ConfigCachePhase,
}

Expand All @@ -203,8 +216,6 @@ enum ConfigCachePhase {

#[derive(thiserror::Error, Debug)]
enum QueryConfigError {
#[error("Invalid block")]
InvalidBlock,
#[error("Invalid config")]
InvalidConfig,
}

0 comments on commit 084d353

Please sign in to comment.