Skip to content

Commit

Permalink
Polish the Subscription API (#2208)
Browse files Browse the repository at this point in the history
  • Loading branch information
serban300 authored and bkontur committed May 17, 2024
1 parent 160d12c commit 1cdb84f
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 152 deletions.
53 changes: 34 additions & 19 deletions bridges/relays/client-substrate/src/client/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
//! method calls.
use crate::{
client::{Client, SharedSubscriptionFactory},
client::{Client, SubscriptionBroadcaster},
error::{Error, Result},
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, ChainWithTransactions,
HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription,
TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD,
};
use std::future::Future;

use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
Expand Down Expand Up @@ -54,8 +55,8 @@ pub struct CachingClient<C: Chain, B: Client<C>> {

/// Client data, shared by all `CachingClient` clones.
struct ClientData<C: Chain> {
grandpa_justifications: Arc<Mutex<Option<SharedSubscriptionFactory<Bytes>>>>,
beefy_justifications: Arc<Mutex<Option<SharedSubscriptionFactory<Bytes>>>>,
grandpa_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
beefy_justifications: Arc<Mutex<Option<SubscriptionBroadcaster<Bytes>>>>,
// `quick_cache::sync::Cache` has the `get_or_insert_async` method, which fits our needs,
// but it uses synchronization primitives that are not aware of async execution. They
// can block the executor threads and cause deadlocks => let's use primitives from
Expand Down Expand Up @@ -112,6 +113,26 @@ impl<C: Chain, B: Client<C>> CachingClient<C, B> {
cache.write().await.insert(key.clone(), value.clone());
Ok(value)
}

async fn subscribe_finality_justifications<'a>(
&'a self,
maybe_broadcaster: &Mutex<Option<SubscriptionBroadcaster<Bytes>>>,
do_subscribe: impl Future<Output = Result<Subscription<Bytes>>> + 'a,
) -> Result<Subscription<Bytes>> {
let mut maybe_broadcaster = maybe_broadcaster.lock().await;
let broadcaster = match maybe_broadcaster.as_ref() {
Some(justifications) => justifications,
None => {
let broadcaster = match SubscriptionBroadcaster::new(do_subscribe.await?) {
Ok(broadcaster) => broadcaster,
Err(subscription) => return Ok(subscription),
};
maybe_broadcaster.get_or_insert(broadcaster)
},
};

broadcaster.subscribe().await
}
}

impl<C: Chain, B: Client<C>> std::fmt::Debug for CachingClient<C, B> {
Expand Down Expand Up @@ -192,14 +213,11 @@ impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
where
C: ChainWithGrandpa,
{
let mut grandpa_justifications = self.data.grandpa_justifications.lock().await;
if let Some(ref grandpa_justifications) = *grandpa_justifications {
grandpa_justifications.subscribe().await
} else {
let subscription = self.backend.subscribe_grandpa_finality_justifications().await?;
*grandpa_justifications = Some(subscription.factory());
Ok(subscription)
}
self.subscribe_finality_justifications(
&self.data.grandpa_justifications,
self.backend.subscribe_grandpa_finality_justifications(),
)
.await
}

async fn generate_grandpa_key_ownership_proof(
Expand All @@ -214,14 +232,11 @@ impl<C: Chain, B: Client<C>> Client<C> for CachingClient<C, B> {
}

async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
let mut beefy_justifications = self.data.beefy_justifications.lock().await;
if let Some(ref beefy_justifications) = *beefy_justifications {
beefy_justifications.subscribe().await
} else {
let subscription = self.backend.subscribe_beefy_finality_justifications().await?;
*beefy_justifications = Some(subscription.factory());
Ok(subscription)
}
self.subscribe_finality_justifications(
&self.data.beefy_justifications,
self.backend.subscribe_beefy_finality_justifications(),
)
.await
}

async fn token_decimals(&self) -> Result<Option<u64>> {
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/client-substrate/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod rpc_api;
mod subscription;

pub use client::Client;
pub use subscription::{SharedSubscriptionFactory, Subscription, UnderlyingSubscription};
pub use subscription::{StreamDescription, Subscription, SubscriptionBroadcaster};

/// Type of RPC client with caching support.
pub type RpcWithCachingClient<C> = CachingClient<C, RpcClient<C>>;
Expand Down
65 changes: 35 additions & 30 deletions bridges/relays/client-substrate/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
SubstrateFrameSystemClient, SubstrateGrandpaClient, SubstrateStateClient,
SubstrateSystemClient,
},
subscription::{Subscription, Unwrap},
subscription::{StreamDescription, Subscription},
Client,
},
error::{Error, Result},
Expand All @@ -40,8 +40,11 @@ use async_trait::async_trait;
use bp_runtime::HeaderIdProvider;
use codec::Encode;
use frame_support::weights::Weight;
use futures::{TryFutureExt, TryStreamExt};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use futures::TryFutureExt;
use jsonrpsee::{
core::{client::Subscription as RpcSubscription, ClientError},
ws_client::{WsClient, WsClientBuilder},
};
use num_traits::Zero;
use pallet_transaction_payment::RuntimeDispatchInfo;
use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT};
Expand Down Expand Up @@ -241,6 +244,25 @@ impl<C: Chain> RpcClient<C> {
})
.await
}

async fn subscribe_finality_justifications<Fut>(
&self,
gadget_name: &str,
do_subscribe: impl FnOnce(Arc<WsClient>) -> Fut + Send + 'static,
) -> Result<Subscription<Bytes>>
where
Fut: Future<Output = std::result::Result<RpcSubscription<Bytes>, ClientError>> + Send,
{
let subscription = self
.jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) })
.map_err(|e| Error::failed_to_subscribe_justification::<C>(e))
.await?;

Ok(Subscription::new_forwarded(
StreamDescription::new(format!("{} justifications", gadget_name), C::NAME.into()),
subscription,
))
}
}

impl<C: Chain> Clone for RpcClient<C> {
Expand Down Expand Up @@ -329,19 +351,9 @@ impl<C: Chain> Client<C> for RpcClient<C> {
where
C: ChainWithGrandpa,
{
Subscription::new(
C::NAME.into(),
"GRANDPA justifications".into(),
self.jsonrpsee_execute(move |client| async move {
Ok(Box::new(
SubstrateGrandpaClient::<C>::subscribe_justifications(&*client)
.await?
.map_err(Into::into),
))
})
.map_err(|e| Error::failed_to_subscribe_justification::<C>(e))
.await?,
)
self.subscribe_finality_justifications("GRANDPA", move |client| async move {
SubstrateGrandpaClient::<C>::subscribe_justifications(&*client).await
})
.await
}

Expand All @@ -360,19 +372,9 @@ impl<C: Chain> Client<C> for RpcClient<C> {
}

async fn subscribe_beefy_finality_justifications(&self) -> Result<Subscription<Bytes>> {
Subscription::new(
C::NAME.into(),
"BEEFY justifications".into(),
self.jsonrpsee_execute(move |client| async move {
Ok(Box::new(
SubstrateBeefyClient::<C>::subscribe_justifications(&*client)
.await?
.map_err(Into::into),
))
})
.map_err(|e| Error::failed_to_subscribe_justification::<C>(e))
.await?,
)
self.subscribe_finality_justifications("BEEFY", move |client| async move {
SubstrateBeefyClient::<C>::subscribe_justifications(&*client).await
})
.await
}

Expand Down Expand Up @@ -511,7 +513,10 @@ impl<C: Chain> Client<C> for RpcClient<C> {
self_clone,
stall_timeout,
tx_hash,
Box::new(Unwrap::new(C::NAME.into(), "transaction events".into(), subscription)),
Subscription::new_forwarded(
StreamDescription::new("transaction events".into(), C::NAME.into()),
subscription,
),
))
})
.await
Expand Down
Loading

0 comments on commit 1cdb84f

Please sign in to comment.