Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force client disconnects if the Geyser gRPC is lagging behind #470

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,194 changes: 1,135 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ criterion = "0.5.1"
crossbeam-channel = "0.5.8"
env_logger = "0.11.3"
futures = "0.3.24"
once_cell = "1.19.0"
git-version = "0.3.5"
hex = "0.4.3"
hostname = "0.4.0"
Expand All @@ -55,6 +56,7 @@ solana-logger = "~2.0.15"
solana-sdk = "~2.0.15"
solana-storage-proto = "~2.0.15"
solana-transaction-status = "~2.0.15"
solana-client = "~2.0.15"
smallvec = "1.13.2"
spl-token-2022 = "4.0.0"
thiserror = "1.0.63"
Expand Down
2 changes: 2 additions & 0 deletions yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ log = { workspace = true }
prometheus = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
solana-client = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
Expand All @@ -46,6 +47,7 @@ tokio-stream = { workspace = true }
tonic = { workspace = true, features = ["gzip", "zstd", "tls", "tls-roots"] }
tonic-health = { workspace = true }
yellowstone-grpc-proto = { workspace = true, features = ["convert", "plugin"] }
once_cell = {workspace = true}

[build-dependencies]
anyhow = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct Config {
/// Collect client filters, processed slot and make it available on prometheus port `/debug_clients`
#[serde(default)]
pub debug_clients_http: bool,
pub rpc_url: Option<String>,
}

impl Config {
Expand Down
19 changes: 17 additions & 2 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use {
crate::{
config::ConfigGrpc,
metrics::{self, DebugClientMessage},
metrics::{self, commitment_level_as_str, DebugClientMessage},
monitor::{self, HEALTH_CHECK_SLOT_DISTANCE},
version::GrpcVersionInfo,
},
anyhow::Context,
Expand Down Expand Up @@ -744,7 +745,7 @@ impl GrpcService {
});
info!("client #{id}: new");

let mut is_alive = true;
let mut is_alive: bool = true;
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
Expand Down Expand Up @@ -800,6 +801,20 @@ impl GrpcService {
}
}
message = messages_rx.recv() => {
let latest_slot = monitor::LATEST_SLOT.load(Ordering::SeqCst);
if latest_slot > 0 {
if let Ok(last_slot_plugin) = metrics::SLOT_STATUS_PLUGIN
.get_metric_with_label_values(&[commitment_level_as_str(CommitmentLevel::Processed)])
{
let last_updated_slot = last_slot_plugin.get();
if (last_updated_slot + HEALTH_CHECK_SLOT_DISTANCE as i64) < latest_slot as i64 {
error!("Latest slot from plugin is lagged, plugin is lagging behind disconnecting client #{id}");
stream_tx.send(Err(Status::internal("gRPC plugin is lagging behind. Disconnecting client."))).await.unwrap();
break 'outer;
}
}
}

let (commitment, messages) = match message {
Ok((commitment, messages)) => (commitment, messages),
Err(broadcast::error::RecvError::Closed) => {
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod config;
pub mod grpc;
pub mod metrics;
pub mod monitor;
pub mod plugin;
pub mod version;

Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-geyser/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ lazy_static::lazy_static! {
&["status"]
).unwrap();

static ref SLOT_STATUS_PLUGIN: IntGaugeVec = IntGaugeVec::new(
pub static ref SLOT_STATUS_PLUGIN: IntGaugeVec = IntGaugeVec::new(
Opts::new("slot_status_plugin", "Latest processed slot in the plugin to client queues"),
&["status"]
).unwrap();
Expand Down Expand Up @@ -375,7 +375,7 @@ pub fn missed_status_message_inc(status: CommitmentLevel) {
.inc()
}

const fn commitment_level_as_str(commitment: CommitmentLevel) -> &'static str {
pub const fn commitment_level_as_str(commitment: CommitmentLevel) -> &'static str {
match commitment {
CommitmentLevel::Processed => "processed",
CommitmentLevel::Confirmed => "confirmed",
Expand Down
46 changes: 46 additions & 0 deletions yellowstone-grpc-geyser/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use once_cell::sync::Lazy;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::time::{interval, sleep};

pub const HEALTH_CHECK_SLOT_DISTANCE: u64 = 100;

pub static LATEST_SLOT: Lazy<Arc<AtomicU64>> = Lazy::new(|| Arc::new(AtomicU64::new(0)));

pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
loop {
match client
.get_slot_with_commitment(CommitmentConfig::processed())
.await
{
Ok(slot) => {
return slot;
}
Err(e) => {
log::error!("Failed to fetch current slot: {}", e);
sleep(Duration::from_secs(5)).await;
}
}
}
}

pub async fn update_latest_slot(rpc_client: &RpcClient) {
let slot = fetch_current_slot_with_infinite_retry(rpc_client).await;
LATEST_SLOT.fetch_max(slot, Ordering::SeqCst);
}

pub async fn update_latest_slot_loop(rpc_client: RpcClient) {
let mut interval = interval(Duration::from_millis(100));
loop {
interval.tick().await;
update_latest_slot(&rpc_client).await;
}
}
7 changes: 7 additions & 0 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use {
config::Config,
grpc::GrpcService,
metrics::{self, PrometheusService},
monitor::update_latest_slot_loop,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
solana_client::nonblocking::rpc_client::RpcClient,
std::{
concat, env,
sync::{
Expand Down Expand Up @@ -77,6 +79,11 @@ impl GeyserPlugin for Plugin {
.build()
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;

if let Some(rpc_url) = config.rpc_url {
let rpc_client = RpcClient::new(rpc_url);
runtime.spawn(update_latest_slot_loop(rpc_client));
}

let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) =
runtime.block_on(async move {
let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
Expand Down