diff --git a/Cargo.lock b/Cargo.lock index 0002ec1..ce3b198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,11 +1343,19 @@ name = "flume" version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "spin 0.9.8", +] + +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" dependencies = [ "futures-core", "futures-sink", - "pin-project", - "spin 0.9.4", + "spin 0.9.8", ] [[package]] @@ -1915,16 +1923,16 @@ checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" [[package]] name = "lapin" -version = "2.1.1" +version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd03ea5831b44775e296239a64851e2fd14a80a363d202ba147009ffc994ff0f" +checksum = "fae02c316a8a5922ce7518afa6b6c00e9a099f8e59587567e3331efdd11b8ceb" dependencies = [ "amq-protocol", "async-global-executor-trait", "async-reactor-trait", "async-trait", "executor-trait", - "flume", + "flume 0.11.0", "futures-core", "futures-io", "parking_lot", @@ -2300,7 +2308,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 1.2.1", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.66", @@ -2454,26 +2462,6 @@ dependencies = [ "num", ] -[[package]] -name = "pin-project" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.102", -] - [[package]] name = "pin-project-lite" version = "0.2.9" @@ -2493,7 +2481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" dependencies = [ "doc-comment", - "flume", + "flume 0.10.14", "parking_lot", "tracing", ] @@ -3603,9 +3591,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "spin" -version = "0.9.4" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" dependencies = [ "lock_api", ] diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..34cab28 --- /dev/null +++ b/compose.yml @@ -0,0 +1,24 @@ +services: + geyser-plugin: + build: . + ports: + - 8899:8899 + - 8900:8900 + volumes: + - ./crates/plugin/compose_config.json:/geyser/compose_config.json + depends_on: + rabbitmq: + condition: service_healthy + environment: + - RUST_LOG=warn + + rabbitmq: + image: rabbitmq:3-management + ports: + - 5672:5672 + - 15672:15672 + healthcheck: + test: rabbitmq-diagnostics -q check_port_connectivity + interval: 2s + timeout: 15s + retries: 5 \ No newline at end of file diff --git a/crates/plugin/local_config.json b/crates/plugin/local_config.json new file mode 100644 index 0000000..4fd53df --- /dev/null +++ b/crates/plugin/local_config.json @@ -0,0 +1,32 @@ +{ + "libpath": "../../target/debug/libholaplex_indexer_rabbitmq_geyser.so", + "amqp": { + "network": "mainnet", + "address": "amqp://guest:guest@localhost:5672" + }, + "jobs": { + "limit": 16 + }, + "metrics": {}, + "chainProgress": { + "blockMeta": true, + "slotStatus": true + }, + "accounts": { + "owners": {}, + "pubkeys": {}, + "startup": false + }, + "instructions": { + "programs": {}, + "allTokenCalls": true + }, + "transactions": { + "programs": { + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA": "defi", + "11111111111111111111111111111111": "defi" + }, + "pubkeys": {} + }, + "datumProgramInclusions": {} +} \ No newline at end of file diff --git a/crates/plugin/src/sender.rs b/crates/plugin/src/sender.rs index e959f1f..4736bd2 100644 --- a/crates/plugin/src/sender.rs +++ b/crates/plugin/src/sender.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, thread, time::Duration}; use indexer_rabbitmq::{ geyser::{CommittmentLevel, Message, Producer, QueueType, StartupType}, @@ -44,41 +44,76 @@ impl Sender { name: impl Into, startup_type: StartupType, ) -> Result { - let conn = Connection::connect( - &amqp.address, - ConnectionProperties::default() - .with_connection_name(name.into()) - .with_executor(tokio_executor_trait::Tokio::current()) - .with_reactor(tokio_reactor_trait::Tokio), - ) - .await?; - - Producer::new( - &conn, - QueueType::new( - amqp.network, - startup_type, - &Suffix::ProductionUnchecked, - &Suffix::ProductionUnchecked, - CommittmentLevel::Processed, - "unused".to_string(), - )?, - ) - .await + let amqp_name = name.into(); + let mut tries = 0; + let producer = loop { + let delay = Self::get_retry_delay(tries); + thread::sleep(Duration::from_millis(delay)); + tries += 1; + + let Ok(conn) = Connection::connect( + &amqp.address, + ConnectionProperties::default() + .with_connection_name(amqp_name.clone()) + .with_executor(tokio_executor_trait::Tokio::current()) + .with_reactor(tokio_reactor_trait::Tokio), + ) + .await + else { + continue; + }; + + let Ok(prod) = Producer::new( + &conn, + QueueType::new( + amqp.network, + startup_type, + &Suffix::ProductionUnchecked, + &Suffix::ProductionUnchecked, + CommittmentLevel::Processed, + "unused".to_string(), + )?, + ) + .await + else { + continue; + }; + + break prod; + }; + + Ok(producer) } - async fn connect<'a>( - &'a self, - prod: RwLockReadGuard<'a, Producer>, - ) -> Result, indexer_rabbitmq::Error> { - // Anti-deadlock safeguard - force the current reader to hand us their - // lock so we can make sure it's destroyed. - std::mem::drop(prod); - let mut prod = self.producer.write().await; + async fn connect(&self) -> Result, indexer_rabbitmq::Error> { + let mut producer = self.producer.write().await; + + if producer.is_connected() { + // This thread was in line for a write, + // but another thread has already handled the reconnection. + // + // Downgrade to a read so we can retry to send our msg. + let read_lock = producer.downgrade(); + return Ok(read_lock); + } + + log::info!("Reconnecting to AMQP server..."); + + // This thread now has the write lock, + // all others should begin waiting for a read. + // Either through a `send` call, or the `downgrade` check above. - *prod = Self::create_producer(&self.amqp, self.name.as_str(), self.startup_type).await?; + // Reconnect to AMQP server + *producer = + Self::create_producer(&self.amqp, self.name.as_str(), self.startup_type).await?; - Ok(prod.downgrade()) + log::info!("Reconnected to AMQP server!"); + + // Release the write lock, by downgrading, and handing a read lock to the original caller, + // so they can send their message + let producer = producer.downgrade(); + + Ok(producer) } pub async fn send(&self, msg: Message, route: &str) { @@ -91,28 +126,42 @@ impl Sender { } let metrics = &self.metrics; + // If we're in the middle of a reconnect, we'll be waiting here, + // until we can get a read lock, which is what we want. let prod = self.producer.read().await; if prod .write(&msg, Some(route)) .await .map_err(log_err(&metrics.errs)) - .is_ok() + .is_err() { - return; - } + // Drop the read lock. This thread is going to attempt to "promote" itself, + // and try to get a write lock to reconnect. + // + // This will also help allow a writer to grab the Producer if one's waiting + std::mem::drop(prod); - metrics.reconnects.log(1); - let Ok(p) = self.connect(prod).await.map_err(log_err(&metrics.errs)) else { - return; - }; + loop { + let Ok(new_prod) = self.connect().await.map_err(log_err(&metrics.errs)) else { + continue; + }; - match p - .write(&msg, Some(route)) - .await - .map_err(log_err(&metrics.errs)) - { - Ok(()) | Err(()) => (), // Type-level assertion that we consumed the error + if new_prod + .write(&msg, Some(route)) + .await + .map_err(log_err(&metrics.errs)) + .is_ok() + { + // All is well. Unravel this madness + break; + } + } } } + + // Linear backoff maxxing out at 1000ms + fn get_retry_delay(tries: usize) -> u64 { + (tries * 100).min(1000) as u64 + } } diff --git a/crates/rabbitmq/Cargo.toml b/crates/rabbitmq/Cargo.toml index 886553e..d437fab 100644 --- a/crates/rabbitmq/Cargo.toml +++ b/crates/rabbitmq/Cargo.toml @@ -28,7 +28,7 @@ consume-json = [] [dependencies] clap = { version = "3.1.12", default-features = false, features = ["env", "std"], optional = true } futures-util = "0.3.19" -lapin = "2.0.3" +lapin = "2.3.4" log = "0.4.14" rand = "0.8.5" rmp-serde = "1.0.0-beta.2" diff --git a/crates/rabbitmq/src/lib.rs b/crates/rabbitmq/src/lib.rs index 3e5fc0f..8b39e68 100644 --- a/crates/rabbitmq/src/lib.rs +++ b/crates/rabbitmq/src/lib.rs @@ -7,7 +7,7 @@ missing_debug_implementations, missing_copy_implementations )] -#![warn(clippy::pedantic, clippy::cargo, missing_docs)] +#![warn(clippy::pedantic, clippy::cargo)] pub extern crate lapin; diff --git a/crates/rabbitmq/src/producer.rs b/crates/rabbitmq/src/producer.rs index b19a854..cc46798 100644 --- a/crates/rabbitmq/src/producer.rs +++ b/crates/rabbitmq/src/producer.rs @@ -74,4 +74,8 @@ where Ok(()) } + + pub fn is_connected(&self) -> bool { + self.chan.status().connected() + } }