Skip to content

Commit

Permalink
Merge pull request #49 from step-finance/dev
Browse files Browse the repository at this point in the history
Dev -> Live
  • Loading branch information
quellen-sol authored Jul 15, 2024
2 parents 46f0404 + e92221d commit c8263d7
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 78 deletions.
50 changes: 19 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
services:
geyser-plugin:
build: .
ports:
- 8899:8899
- 8900:8900
volumes:
- ./crates/plugin/compose_config.json:/geyser/compose_config.json
depends_on:
rabbitmq:
condition: service_healthy
environment:
- RUST_LOG=warn

rabbitmq:
image: rabbitmq:3-management
ports:
- 5672:5672
- 15672:15672
healthcheck:
test: rabbitmq-diagnostics -q check_port_connectivity
interval: 2s
timeout: 15s
retries: 5
32 changes: 32 additions & 0 deletions crates/plugin/local_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"libpath": "../../target/debug/libholaplex_indexer_rabbitmq_geyser.so",
"amqp": {
"network": "mainnet",
"address": "amqp://guest:guest@localhost:5672"
},
"jobs": {
"limit": 16
},
"metrics": {},
"chainProgress": {
"blockMeta": true,
"slotStatus": true
},
"accounts": {
"owners": {},
"pubkeys": {},
"startup": false
},
"instructions": {
"programs": {},
"allTokenCalls": true
},
"transactions": {
"programs": {
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA": "defi",
"11111111111111111111111111111111": "defi"
},
"pubkeys": {}
},
"datumProgramInclusions": {}
}
139 changes: 94 additions & 45 deletions crates/plugin/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, thread, time::Duration};

use indexer_rabbitmq::{
geyser::{CommittmentLevel, Message, Producer, QueueType, StartupType},
Expand Down Expand Up @@ -44,41 +44,76 @@ impl Sender {
name: impl Into<indexer_rabbitmq::lapin::types::LongString>,
startup_type: StartupType,
) -> Result<Producer, indexer_rabbitmq::Error> {
let conn = Connection::connect(
&amqp.address,
ConnectionProperties::default()
.with_connection_name(name.into())
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio),
)
.await?;

Producer::new(
&conn,
QueueType::new(
amqp.network,
startup_type,
&Suffix::ProductionUnchecked,
&Suffix::ProductionUnchecked,
CommittmentLevel::Processed,
"unused".to_string(),
)?,
)
.await
let amqp_name = name.into();
let mut tries = 0;
let producer = loop {
let delay = Self::get_retry_delay(tries);
thread::sleep(Duration::from_millis(delay));
tries += 1;

let Ok(conn) = Connection::connect(
&amqp.address,
ConnectionProperties::default()
.with_connection_name(amqp_name.clone())
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio),
)
.await
else {
continue;
};

let Ok(prod) = Producer::new(
&conn,
QueueType::new(
amqp.network,
startup_type,
&Suffix::ProductionUnchecked,
&Suffix::ProductionUnchecked,
CommittmentLevel::Processed,
"unused".to_string(),
)?,
)
.await
else {
continue;
};

break prod;
};

Ok(producer)
}

async fn connect<'a>(
&'a self,
prod: RwLockReadGuard<'a, Producer>,
) -> Result<RwLockReadGuard<'a, Producer>, indexer_rabbitmq::Error> {
// Anti-deadlock safeguard - force the current reader to hand us their
// lock so we can make sure it's destroyed.
std::mem::drop(prod);
let mut prod = self.producer.write().await;
async fn connect(&self) -> Result<RwLockReadGuard<Producer>, indexer_rabbitmq::Error> {
let mut producer = self.producer.write().await;

if producer.is_connected() {
// This thread was in line for a write,
// but another thread has already handled the reconnection.
//
// Downgrade to a read so we can retry to send our msg.
let read_lock = producer.downgrade();
return Ok(read_lock);
}

log::info!("Reconnecting to AMQP server...");

// This thread now has the write lock,
// all others should begin waiting for a read.
// Either through a `send` call, or the `downgrade` check above.

*prod = Self::create_producer(&self.amqp, self.name.as_str(), self.startup_type).await?;
// Reconnect to AMQP server
*producer =
Self::create_producer(&self.amqp, self.name.as_str(), self.startup_type).await?;

Ok(prod.downgrade())
log::info!("Reconnected to AMQP server!");

// Release the write lock, by downgrading, and handing a read lock to the original caller,
// so they can send their message
let producer = producer.downgrade();

Ok(producer)
}

pub async fn send(&self, msg: Message, route: &str) {
Expand All @@ -91,28 +126,42 @@ impl Sender {
}

let metrics = &self.metrics;
// If we're in the middle of a reconnect, we'll be waiting here,
// until we can get a read lock, which is what we want.
let prod = self.producer.read().await;

if prod
.write(&msg, Some(route))
.await
.map_err(log_err(&metrics.errs))
.is_ok()
.is_err()
{
return;
}
// Drop the read lock. This thread is going to attempt to "promote" itself,
// and try to get a write lock to reconnect.
//
// This will also help allow a writer to grab the Producer if one's waiting
std::mem::drop(prod);

metrics.reconnects.log(1);
let Ok(p) = self.connect(prod).await.map_err(log_err(&metrics.errs)) else {
return;
};
loop {
let Ok(new_prod) = self.connect().await.map_err(log_err(&metrics.errs)) else {
continue;
};

match p
.write(&msg, Some(route))
.await
.map_err(log_err(&metrics.errs))
{
Ok(()) | Err(()) => (), // Type-level assertion that we consumed the error
if new_prod
.write(&msg, Some(route))
.await
.map_err(log_err(&metrics.errs))
.is_ok()
{
// All is well. Unravel this madness
break;
}
}
}
}

// Linear backoff maxxing out at 1000ms
fn get_retry_delay(tries: usize) -> u64 {
(tries * 100).min(1000) as u64
}
}
2 changes: 1 addition & 1 deletion crates/rabbitmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ consume-json = []
[dependencies]
clap = { version = "3.1.12", default-features = false, features = ["env", "std"], optional = true }
futures-util = "0.3.19"
lapin = "2.0.3"
lapin = "2.3.4"
log = "0.4.14"
rand = "0.8.5"
rmp-serde = "1.0.0-beta.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/rabbitmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
missing_debug_implementations,
missing_copy_implementations
)]
#![warn(clippy::pedantic, clippy::cargo, missing_docs)]
#![warn(clippy::pedantic, clippy::cargo)]

pub extern crate lapin;

Expand Down
4 changes: 4 additions & 0 deletions crates/rabbitmq/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ where

Ok(())
}

pub fn is_connected(&self) -> bool {
self.chan.status().connected()
}
}

0 comments on commit c8263d7

Please sign in to comment.