Skip to content

Commit

Permalink
feat: implement JsonRpc transport for batch requests
Browse files Browse the repository at this point in the history
  • Loading branch information
thetheveloper committed Jul 16, 2024
1 parent a8ee4e2 commit 681d7a4
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 1 deletion.
35 changes: 35 additions & 0 deletions examples/jsonrpc_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use starknet::{
core::types::{BlockId, BlockTag},
providers::{
jsonrpc::{HttpTransport, JsonRpcClient},
Provider, Url,
},
};
use starknet_core::types::requests::{GetBlockWithTxHashesRequestRef, GetBlockWithTxsRequestRef};
use starknet_providers::jsonrpc::{JsonRpcMethod, JsonRpcRequestParams};

#[tokio::main]
async fn main() {
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse("https://starknet-sepolia.public.blastapi.io/rpc/v0_7").unwrap(),
));

let batch_mixed_results = provider.batch_requests(vec![(JsonRpcMethod::GetBlockWithTxHashes, JsonRpcRequestParams::GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
})), (JsonRpcMethod::GetBlockWithTxs, JsonRpcRequestParams::GetBlockWithTxs(GetBlockWithTxsRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
}))]).await;

match batch_mixed_results {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}

let batched_blocks = provider.get_block_with_tx_hashes_batch(vec![BlockId::Tag(BlockTag::Latest), BlockId::Tag(BlockTag::Latest)]).await;

match batched_blocks {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}

}
44 changes: 43 additions & 1 deletion starknet-providers/src/any.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -11,7 +12,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::{HttpTransport, JsonRpcClient},
jsonrpc::{HttpTransport, JsonRpcClient, JsonRpcMethod},
Provider, ProviderError, SequencerGatewayProvider,
};

Expand Down Expand Up @@ -663,4 +664,45 @@ impl Provider for AnyProvider {
}
}
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => {
<JsonRpcClient<HttpTransport> as Provider>::batch_requests(inner, requests).await
}
Self::SequencerGateway(inner) => {
<SequencerGatewayProvider as Provider>::batch_requests(inner, requests).await
}
}
}
async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => {
<JsonRpcClient<HttpTransport> as Provider>::get_block_with_tx_hashes_batch(
inner, block_ids,
)
.await
}
Self::SequencerGateway(inner) => {
<SequencerGatewayProvider as Provider>::get_block_with_tx_hashes_batch(
inner, block_ids,
)
.await
}
}
}

}
66 changes: 66 additions & 0 deletions starknet-providers/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,25 @@ pub enum JsonRpcMethod {
TraceBlockTransactions,
}

#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum JsonRpcRequestParams<'a> {
GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef<'a>),
GetBlockWithTxs(GetBlockWithTxsRequestRef<'a>),
}

#[derive(Debug, Clone)]
pub struct JsonRpcRequest {
pub id: u64,
pub data: JsonRpcRequestData,
}

#[derive(Debug, Clone)]
pub struct JsonRpcRequests {
pub requests: Vec<JsonRpcRequest>,
}

#[derive(Debug, Clone, Serialize)]
pub enum JsonRpcRequestData {
SpecVersion(SpecVersionRequest),
GetBlockWithTxHashes(GetBlockWithTxHashesRequest),
Expand Down Expand Up @@ -204,6 +216,31 @@ where
}
}
}
async fn send_requests<I, P, R>(&self, requests: I) -> Result<Vec<R>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned,
{
let responses = self
.transport
.send_requests(requests)
.await
.map_err(JsonRpcClientError::TransportError)?;

responses
.into_iter()
.map(|response| match response {
JsonRpcResponse::Success { result, .. } => Ok(result),
JsonRpcResponse::Error { error, .. } => {
Err(match TryInto::<StarknetError>::try_into(&error) {
Ok(error) => ProviderError::StarknetError(error),
Err(_) => JsonRpcClientError::<T::Error>::JsonRpcError(error).into(),
})
}
})
.collect()
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
Expand All @@ -218,6 +255,35 @@ where
.await
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
self.send_requests(requests).await
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
let requests = block_ids.iter().map(|b_id| {
(
JsonRpcMethod::GetBlockWithTxHashes,
GetBlockWithTxHashesRequestRef {
block_id: b_id.as_ref(),
},
)
});
self.send_requests(requests).await
}

/// Get block information with transaction hashes given the block id
async fn get_block_with_tx_hashes<B>(
&self,
Expand Down
42 changes: 42 additions & 0 deletions starknet-providers/src/jsonrpc/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,46 @@ impl JsonRpcTransport for HttpTransport {

Ok(parsed_response)
}

async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send,
P: Serialize + Send,
R: DeserializeOwned,
{
let batch_requests: Vec<_> = requests
.into_iter()
.enumerate()
.map(|(id, (method, params))| JsonRpcRequest {
id: id as u64 + 1,
jsonrpc: "2.0",
method,
params,
})
.collect();

let serialized_batch = serde_json::to_string(&batch_requests).map_err(Self::Error::Json)?;

let mut request = self
.client
.post(self.url.clone())
.body(serialized_batch)
.header("Content-Type", "application/json");

for (name, value) in self.headers.iter() {
request = request.header(name, value);
}

let response = request.send().await.map_err(Self::Error::Reqwest)?;

let response_body = response.text().await.map_err(Self::Error::Reqwest)?;
trace!("Response from JSON-RPC: {}", response_body);

let parsed_response = serde_json::from_str(&response_body).map_err(Self::Error::Json)?;

Ok(parsed_response)
}
}
9 changes: 9 additions & 0 deletions starknet-providers/src/jsonrpc/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@ pub trait JsonRpcTransport {
where
P: Serialize + Send + Sync,
R: DeserializeOwned;

async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned;
}
18 changes: 18 additions & 0 deletions starknet-providers/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use auto_impl::auto_impl;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -12,6 +13,8 @@ use starknet_core::types::{
};
use std::{any::Any, error::Error, fmt::Debug};

use crate::jsonrpc::JsonRpcMethod;

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[auto_impl(&, Box, Arc)]
Expand Down Expand Up @@ -298,6 +301,21 @@ pub trait Provider {
Err(ProviderError::ArrayLengthMismatch)
}
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync;

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync;
}

/// Trait for implementation-specific error type. These errors are irrelevant in most cases,
Expand Down
23 changes: 23 additions & 0 deletions starknet-providers/src/sequencer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;

use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -15,6 +16,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::JsonRpcMethod,
provider::ProviderImplError,
sequencer::{models::conversions::ConversionError, GatewayClientError},
Provider, ProviderError, SequencerGatewayProvider,
Expand All @@ -30,6 +32,27 @@ impl Provider for SequencerGatewayProvider {
Ok(String::from("0.7.1"))
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
Ok(self.batch_requests(requests).await?)
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
Ok(self.get_block_with_tx_hashes_batch(block_ids).await?)
}

async fn get_block_with_tx_hashes<B>(
&self,
block_id: B,
Expand Down

0 comments on commit 681d7a4

Please sign in to comment.