Skip to content

Commit

Permalink
feat: implement JsonRpc transport for batch requests
Browse files Browse the repository at this point in the history
  • Loading branch information
thetheveloper authored and xJonathanLEI committed Aug 27, 2024
1 parent 660a732 commit 3e084a5
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 1 deletion.
57 changes: 57 additions & 0 deletions examples/jsonrpc_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use starknet::{
core::types::{BlockId, BlockTag},
providers::{
jsonrpc::{HttpTransport, JsonRpcClient},
Provider, Url,
},
};
use starknet_core::types::requests::{GetBlockWithTxHashesRequestRef, GetBlockWithTxsRequestRef};
use starknet_providers::jsonrpc::{JsonRpcMethod, JsonRpcRequestParams};

#[tokio::main]
async fn main() {
// Create a new JSON RPC client using HTTP transport with the specified URL
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse("https://starknet-sepolia.public.blastapi.io/rpc/v0_7").unwrap(),
));

// batch_requests allows to define a vector of requests for batch processing, ensuring each request specifies its corresponding JsonRpcMethod and JsonRpcRequestParams.
// This approach allows for a generic way to handle batch requests.
let batch_mixed_results = provider
.batch_requests(vec![
// Request 1: Retrieve block data including transaction hashes.
(
JsonRpcMethod::GetBlockWithTxHashes,
JsonRpcRequestParams::GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
}),
),
// Request 2: Retrieve block data including full transaction details.
(
JsonRpcMethod::GetBlockWithTxs,
JsonRpcRequestParams::GetBlockWithTxs(GetBlockWithTxsRequestRef {
block_id: BlockId::Tag(BlockTag::Latest).as_ref(),
}),
),
])
.await;

match batch_mixed_results {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}

// The following example demonstrates the process of sending a batch request to retrieve multiple blocks, each including transaction hashes.
// get_block_with_tx_hashes_batch utilizes a vector of BlockId parameters to construct the batch request.
let batched_blocks = provider
.get_block_with_tx_hashes_batch(vec![
BlockId::Tag(BlockTag::Latest),
BlockId::Tag(BlockTag::Latest),
])
.await;

match batched_blocks {
Ok(v) => println!("{v:#?}"),
Err(e) => println!("Error: {e:#?}"),
}
}
30 changes: 29 additions & 1 deletion starknet-providers/src/any.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -11,7 +12,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::{HttpTransport, JsonRpcClient},
jsonrpc::{HttpTransport, JsonRpcClient, JsonRpcMethod},
Provider, ProviderError, SequencerGatewayProvider,
};

Expand Down Expand Up @@ -665,4 +666,31 @@ impl Provider for AnyProvider {
}
}
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => inner.batch_requests(requests).await,
Self::SequencerGateway(inner) => inner.batch_requests(requests).await,
}
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
match self {
Self::JsonRpcHttp(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await,
Self::SequencerGateway(inner) => inner.get_block_with_tx_hashes_batch(block_ids).await,
}
}
}
72 changes: 72 additions & 0 deletions starknet-providers/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ pub enum JsonRpcMethod {
}

/// JSON-RPC request.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum JsonRpcRequestParams<'a> {
/// Parameters for getting a block with transaction hashes.
GetBlockWithTxHashes(GetBlockWithTxHashesRequestRef<'a>),
/// Parameters for getting a block with full transactions.
GetBlockWithTxs(GetBlockWithTxsRequestRef<'a>),
}

/// Represents a JSON-RPC request with a unique identifier.
#[derive(Debug, Clone)]
pub struct JsonRpcRequest {
/// ID of the request. Useful for identifying responses in certain transports like `WebSocket`.
Expand All @@ -136,6 +146,13 @@ pub struct JsonRpcRequest {

/// Typed request data for Starknet JSON-RPC requests.
#[derive(Debug, Clone)]
pub struct JsonRpcRequests {
/// A list of JSON-RPC requests.
pub requests: Vec<JsonRpcRequest>,
}

/// Represents the data for various JSON-RPC requests
#[derive(Debug, Clone, Serialize)]
pub enum JsonRpcRequestData {
/// Request data for `starknet_specVersion`.
SpecVersion(SpecVersionRequest),
Expand Down Expand Up @@ -303,6 +320,32 @@ where
}
}
}

async fn send_requests<I, P, R>(&self, requests: I) -> Result<Vec<R>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned,
{
let responses = self
.transport
.send_requests(requests)
.await
.map_err(JsonRpcClientError::TransportError)?;

responses
.into_iter()
.map(|response| match response {
JsonRpcResponse::Success { result, .. } => Ok(result),
JsonRpcResponse::Error { error, .. } => {
Err(match TryInto::<StarknetError>::try_into(&error) {
Ok(error) => ProviderError::StarknetError(error),
Err(_) => JsonRpcClientError::<T::Error>::JsonRpcError(error).into(),
})
}
})
.collect()
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
Expand All @@ -317,6 +360,35 @@ where
.await
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
self.send_requests(requests).await
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
let requests = block_ids.iter().map(|block_id| {
(
JsonRpcMethod::GetBlockWithTxHashes,
GetBlockWithTxHashesRequestRef {
block_id: block_id.as_ref(),
},
)
});
self.send_requests(requests).await
}

/// Get block information with transaction hashes given the block id
async fn get_block_with_tx_hashes<B>(
&self,
Expand Down
42 changes: 42 additions & 0 deletions starknet-providers/src/jsonrpc/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,46 @@ impl JsonRpcTransport for HttpTransport {

Ok(parsed_response)
}

async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send,
P: Serialize + Send,
R: DeserializeOwned,
{
let batch_requests: Vec<_> = requests
.into_iter()
.enumerate()
.map(|(id, (method, params))| JsonRpcRequest {
id: id as u64 + 1,
jsonrpc: "2.0",
method,
params,
})
.collect();

let serialized_batch = serde_json::to_string(&batch_requests).map_err(Self::Error::Json)?;

let mut request = self
.client
.post(self.url.clone())
.body(serialized_batch)
.header("Content-Type", "application/json");

for (name, value) in &self.headers {
request = request.header(name, value);
}

let response = request.send().await.map_err(Self::Error::Reqwest)?;

let response_body = response.text().await.map_err(Self::Error::Reqwest)?;
trace!("Response from JSON-RPC: {}", response_body);

let parsed_response = serde_json::from_str(&response_body).map_err(Self::Error::Json)?;

Ok(parsed_response)
}
}
10 changes: 10 additions & 0 deletions starknet-providers/src/jsonrpc/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,14 @@ pub trait JsonRpcTransport {
where
P: Serialize + Send + Sync,
R: DeserializeOwned;

/// Sends multiple JSON-RPC requests and retrieves their responses.
async fn send_requests<I, P, R>(
&self,
requests: I,
) -> Result<Vec<JsonRpcResponse<R>>, Self::Error>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
R: DeserializeOwned;
}
32 changes: 32 additions & 0 deletions starknet-providers/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use auto_impl::auto_impl;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -22,6 +23,9 @@ use std::{any::Any, error::Error, fmt::Debug};
/// The legacy [`SequencerGatewayProvider`](crate::sequencer::SequencerGatewayProvider) still
/// implements this trait for backward compatibility reasons, but most of its methods no longer work
/// in practice, as public sequencer servers have generally block access to most methods.
use crate::jsonrpc::JsonRpcMethod;

/// Represents a provider interface for interacting with the Starknet network.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[auto_impl(&, Box, Arc)]
Expand Down Expand Up @@ -311,6 +315,34 @@ pub trait Provider {
Err(ProviderError::ArrayLengthMismatch)
}
}

/// Executes a batch of JSON-RPC requests concurrently.
///
/// This method takes an iterator of requests, where each request is a tuple consisting of a
/// JSON-RPC method and its parameters. It returns a vector of results, each encoded as a
/// `serde_json::Value`.
///
/// # Type Parameters
/// - `I`: An iterator type where each item is a tuple containing a `JsonRpcMethod` and parameters `P`.
/// - `P`: The type of the parameters to be serialized for the JSON-RPC request.
///
/// # Errors
/// Returns `ProviderError` if any of the requests fail.
async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync;

/// Retrieves blocks information with transaction hashes for a batch of block IDs.
async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync;
}

/// Trait for implementation-specific error type. These errors are irrelevant in most cases,
Expand Down
23 changes: 23 additions & 0 deletions starknet-providers/src/sequencer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;

use async_trait::async_trait;
use serde::Serialize;
use starknet_core::types::{
BlockHashAndNumber, BlockId, BroadcastedDeclareTransaction,
BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, BroadcastedTransaction,
Expand All @@ -15,6 +16,7 @@ use starknet_core::types::{
};

use crate::{
jsonrpc::JsonRpcMethod,
provider::ProviderImplError,
sequencer::{models::conversions::ConversionError, GatewayClientError},
Provider, ProviderError, SequencerGatewayProvider,
Expand All @@ -30,6 +32,27 @@ impl Provider for SequencerGatewayProvider {
Ok(String::from("0.7.1"))
}

async fn batch_requests<I, P>(
&self,
requests: I,
) -> Result<Vec<serde_json::Value>, ProviderError>
where
I: IntoIterator<Item = (JsonRpcMethod, P)> + Send + Sync,
P: Serialize + Send + Sync,
{
Ok(self.batch_requests(requests).await?)
}

async fn get_block_with_tx_hashes_batch<B>(
&self,
block_ids: Vec<B>,
) -> Result<Vec<MaybePendingBlockWithTxHashes>, ProviderError>
where
B: AsRef<BlockId> + Send + Sync,
{
Ok(self.get_block_with_tx_hashes_batch(block_ids).await?)
}

async fn get_block_with_tx_hashes<B>(
&self,
block_id: B,
Expand Down

0 comments on commit 3e084a5

Please sign in to comment.