diff --git a/src/config.rs b/src/config.rs
index cbc7d9a..9c019a5 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -28,37 +28,22 @@ use {
 };
 
 /// Plugin config.
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
+#[serde(deny_unknown_fields)]
 pub struct Config {
+    #[allow(dead_code)]
+    libpath: String,
+
     /// Kafka config.
     pub kafka: HashMap<String, String>,
+
     /// Graceful shutdown timeout.
     #[serde(default)]
     pub shutdown_timeout_ms: u64,
-    /// Kafka topic to send account updates to.
-    #[serde(default)]
-    pub update_account_topic: String,
-    /// Kafka topic to send slot status updates to.
-    #[serde(default)]
-    pub slot_status_topic: String,
-    /// Kafka topic to send transaction to.
-    #[serde(default)]
-    pub transaction_topic: String,
-    /// List of programs to ignore.
-    #[serde(default)]
-    pub program_ignores: Vec<String>,
-    /// List of programs to include
-    #[serde(default)]
-    pub program_filters: Vec<String>,
-    // List of accounts to include
-    #[serde(default)]
-    pub account_filters: Vec<String>,
-    /// Publish all accounts on startup.
-    #[serde(default)]
-    pub publish_all_accounts: bool,
-    /// Wrap all event message in a single message type.
-    #[serde(default)]
-    pub wrap_messages: bool,
+
+    /// Accounts, transactions filters
+    pub filters: Vec<ConfigFilter>,
+
     /// Prometheus endpoint.
     #[serde(default)]
     pub prometheus: Option<ConfigPrometheus>,
@@ -67,16 +52,10 @@ pub struct Config {
 impl Default for Config {
     fn default() -> Self {
         Self {
+            libpath: "".to_owned(),
             kafka: HashMap::new(),
             shutdown_timeout_ms: 30_000,
-            update_account_topic: "".to_owned(),
-            slot_status_topic: "".to_owned(),
-            transaction_topic: "".to_owned(),
-            program_ignores: Vec::new(),
-            program_filters: Vec::new(),
-            account_filters: Vec::new(),
-            publish_all_accounts: false,
-            wrap_messages: false,
+            filters: vec![],
             prometheus: None,
         }
     }
@@ -119,4 +98,47 @@ impl Config {
     }
 }
 
+/// Plugin config.
+#[derive(Debug, Deserialize)]
+#[serde(deny_unknown_fields, default)]
+pub struct ConfigFilter {
+    /// Kafka topic to send account updates to.
+    pub update_account_topic: String,
+    /// Kafka topic to send slot status updates to.
+    pub slot_status_topic: String,
+    /// Kafka topic to send transaction to.
+    pub transaction_topic: String,
+    /// List of programs to ignore.
+    pub program_ignores: Vec<String>,
+    /// List of programs to include
+    pub program_filters: Vec<String>,
+    // List of accounts to include
+    pub account_filters: Vec<String>,
+    /// Publish all accounts on startup.
+    pub publish_all_accounts: bool,
+    /// Publish vote transactions.
+    pub include_vote_transactions: bool,
+    /// Publish failed transactions.
+    pub include_failed_transactions: bool,
+    /// Wrap all event message in a single message type.
+    pub wrap_messages: bool,
+}
+
+impl Default for ConfigFilter {
+    fn default() -> Self {
+        Self {
+            update_account_topic: "".to_owned(),
+            slot_status_topic: "".to_owned(),
+            transaction_topic: "".to_owned(),
+            program_ignores: Vec::new(),
+            program_filters: Vec::new(),
+            account_filters: Vec::new(),
+            publish_all_accounts: false,
+            include_vote_transactions: true,
+            include_failed_transactions: true,
+            wrap_messages: false,
+        }
+    }
+}
+
 pub type Producer = ThreadedProducer<StatsContext>;
 
diff --git a/src/filter.rs b/src/filter.rs
index 11d43bf..c843a74 100644
--- a/src/filter.rs
+++ b/src/filter.rs
@@ -13,20 +13,30 @@
 // limitations under the License.
 use {
-    crate::Config,
+    crate::ConfigFilter,
     solana_program::pubkey::Pubkey,
     std::{collections::HashSet, str::FromStr},
 };
 
 pub struct Filter {
-    program_ignores: HashSet<[u8; 32]>,
-    program_filters: HashSet<[u8; 32]>,
-    account_filters: HashSet<[u8; 32]>,
+    pub publish_all_accounts: bool,
+    pub program_ignores: HashSet<[u8; 32]>,
+    pub program_filters: HashSet<[u8; 32]>,
+    pub account_filters: HashSet<[u8; 32]>,
+    pub include_vote_transactions: bool,
+    pub include_failed_transactions: bool,
+
+    pub update_account_topic: String,
+    pub slot_status_topic: String,
+    pub transaction_topic: String,
+
+    pub wrap_messages: bool,
 }
 
 impl Filter {
-    pub fn new(config: &Config) -> Self {
+    pub fn new(config: &ConfigFilter) -> Self {
         Self {
+            publish_all_accounts: config.publish_all_accounts,
             program_ignores: config
                 .program_ignores
                 .iter()
@@ -42,6 +52,14 @@ impl Filter {
                 .iter()
                 .flat_map(|p| Pubkey::from_str(p).ok().map(|p| p.to_bytes()))
                 .collect(),
+            include_vote_transactions: config.include_vote_transactions,
+            include_failed_transactions: config.include_failed_transactions,
+
+            update_account_topic: config.update_account_topic.clone(),
+            slot_status_topic: config.slot_status_topic.clone(),
+            transaction_topic: config.transaction_topic.clone(),
+
+            wrap_messages: config.wrap_messages,
         }
     }
 
@@ -61,25 +79,33 @@ impl Filter {
             Err(_error) => true,
         }
     }
+
+    pub fn wants_vote_tx(&self) -> bool {
+        self.include_vote_transactions
+    }
+
+    pub fn wants_failed_tx(&self) -> bool {
+        self.include_failed_transactions
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use {
-        crate::{Config, Filter},
+        crate::{ConfigFilter, Filter},
         solana_program::pubkey::Pubkey,
         std::str::FromStr,
    };
 
     #[test]
     fn test_filter() {
-        let config = Config {
+        let config = ConfigFilter {
             program_ignores: vec![
                 "Sysvar1111111111111111111111111111111111111".to_owned(),
                 "Vote111111111111111111111111111111111111111".to_owned(),
             ],
             program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
-            ..Config::default()
+            ..Default::default()
         };
 
         let filter = Filter::new(&config);
@@ -99,13 +125,13 @@ mod tests {
 
     #[test]
     fn test_owner_filter() {
-        let config = Config {
+        let config = ConfigFilter {
             program_ignores: vec![
                 "Sysvar1111111111111111111111111111111111111".to_owned(),
                 "Vote111111111111111111111111111111111111111".to_owned(),
             ],
             program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
-            ..Config::default()
+            ..Default::default()
         };
 
         let filter = Filter::new(&config);
@@ -131,10 +157,10 @@ mod tests {
 
     #[test]
     fn test_account_filter() {
-        let config = Config {
+        let config = ConfigFilter {
             program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
             account_filters: vec!["5KKsLVU6TcbVDK4BS6K1DGDxnh4Q9xjYJ8XaDCG5t8ht".to_owned()],
-            ..Config::default()
+            ..Default::default()
         };
 
         let filter = Filter::new(&config);
diff --git a/src/lib.rs b/src/lib.rs
index 551e5c9..d5c1223 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,7 +23,7 @@ mod publisher;
 mod version;
 
 pub use {
-    config::{Config, Producer},
+    config::{Config, ConfigFilter, Producer},
     event::*,
     filter::Filter,
     plugin::KafkaPlugin,
diff --git a/src/plugin.rs b/src/plugin.rs
index 58ef6b0..f8dd696 100644
--- a/src/plugin.rs
+++ b/src/plugin.rs
@@ -35,8 +35,7 @@ use {
 #[derive(Default)]
 pub struct KafkaPlugin {
     publisher: Option<Publisher>,
-    filter: Option<Filter>,
-    publish_all_accounts: bool,
+    filter: Option<Vec<Filter>>,
     prometheus: Option<PrometheusService>,
 }
 
@@ -63,7 +62,6 @@ impl GeyserPlugin for KafkaPlugin {
             config_file
         );
         let config = Config::read_from(config_file)?;
-        self.publish_all_accounts = config.publish_all_accounts;
 
         let (version_n, version_s) = get_rdkafka_version();
         info!("rd_kafka_version: {:#08x}, {}", version_n, version_s);
@@ -79,7 +77,7 @@ impl GeyserPlugin for KafkaPlugin {
             .create_prometheus()
             .map_err(|error| PluginError::Custom(Box::new(error)))?;
         self.publisher = Some(publisher);
-        self.filter = Some(Filter::new(&config));
+        self.filter = Some(config.filters.iter().map(Filter::new).collect());
         self.prometheus = prometheus;
         info!("Spawned producer");
 
@@ -100,33 +98,39 @@ impl GeyserPlugin for KafkaPlugin {
         slot: u64,
         is_startup: bool,
     ) -> PluginResult<()> {
-        if is_startup && !self.publish_all_accounts {
+        let filters = self.unwrap_filters();
+        if is_startup && filters.iter().all(|filter| !filter.publish_all_accounts) {
             return Ok(());
         }
 
         let info = Self::unwrap_update_account(account);
-        let filter = self.unwrap_filter();
-        if !filter.wants_program(info.owner) && !filter.wants_account(info.pubkey) {
-            Self::log_ignore_account_update(info);
-            return Ok(());
+        let publisher = self.unwrap_publisher();
+        for filter in filters {
+            if !filter.update_account_topic.is_empty() {
+                if !filter.wants_program(info.owner) && !filter.wants_account(info.pubkey) {
+                    Self::log_ignore_account_update(info);
+                    continue;
+                }
+
+                let event = UpdateAccountEvent {
+                    slot,
+                    pubkey: info.pubkey.to_vec(),
+                    lamports: info.lamports,
+                    owner: info.owner.to_vec(),
+                    executable: info.executable,
+                    rent_epoch: info.rent_epoch,
+                    data: info.data.to_vec(),
+                    write_version: info.write_version,
+                    txn_signature: info.txn.map(|v| v.signature().as_ref().to_owned()),
+                };
+
+                publisher
+                    .update_account(event, filter.wrap_messages, &filter.update_account_topic)
+                    .map_err(|e| PluginError::AccountsUpdateError { msg: e.to_string() })?;
+            }
         }
 
-        let event = UpdateAccountEvent {
-            slot,
-            pubkey: info.pubkey.to_vec(),
-            lamports: info.lamports,
-            owner: info.owner.to_vec(),
-            executable: info.executable,
-            rent_epoch: info.rent_epoch,
-            data: info.data.to_vec(),
-            write_version: info.write_version,
-            txn_signature: info.txn.map(|v| v.signature().as_ref().to_owned()),
-        };
-
-        let publisher = self.unwrap_publisher();
-        publisher
-            .update_account(event)
-            .map_err(|e| PluginError::AccountsUpdateError { msg: e.to_string() })
+        Ok(())
     }
 
     fn update_slot_status(
@@ -136,19 +140,21 @@ impl GeyserPlugin for KafkaPlugin {
         status: PluginSlotStatus,
     ) -> PluginResult<()> {
         let publisher = self.unwrap_publisher();
-        if !publisher.wants_slot_status() {
-            return Ok(());
+        for filter in self.unwrap_filters() {
+            if !filter.slot_status_topic.is_empty() {
+                let event = SlotStatusEvent {
+                    slot,
+                    parent: parent.unwrap_or(0),
+                    status: SlotStatus::from(status).into(),
+                };
+
+                publisher
+                    .update_slot_status(event, filter.wrap_messages, &filter.slot_status_topic)
+                    .map_err(|e| PluginError::AccountsUpdateError { msg: e.to_string() })?;
+            }
         }
 
-        let event = SlotStatusEvent {
-            slot,
-            parent: parent.unwrap_or(0),
-            status: SlotStatus::from(status).into(),
-        };
-
-        publisher
-            .update_slot_status(event)
-            .map_err(|e| PluginError::SlotStatusUpdateError { msg: e.to_string() })
+        Ok(())
     }
 
     fn notify_transaction(
@@ -156,43 +162,54 @@ impl GeyserPlugin for KafkaPlugin {
         transaction: ReplicaTransactionInfoVersions,
         slot: u64,
     ) -> PluginResult<()> {
-        let publisher = self.unwrap_publisher();
-        if !publisher.wants_transaction() {
-            return Ok(());
-        }
-
-        let filter = self.unwrap_filter();
         let info = Self::unwrap_transaction(transaction);
-
-        let maybe_ignored = info
-            .transaction
-            .message()
-            .account_keys()
-            .iter()
-            .find(|pubkey| {
-                !(filter.wants_program(pubkey.as_ref()) || filter.wants_account(pubkey.as_ref()))
-            });
-        if let Some(ignored) = maybe_ignored {
-            debug!(
-                "Ignoring transaction {:?} due to account key: {:?}",
-                info.signature, ignored
-            );
-            return Ok(());
+        let publisher = self.unwrap_publisher();
+        for filter in self.unwrap_filters() {
+            if !filter.transaction_topic.is_empty() {
+                let is_failed = info.transaction_status_meta.status.is_err();
+                if (!filter.wants_vote_tx() && info.is_vote)
+                    || (!filter.wants_failed_tx() && is_failed)
+                {
+                    debug!("Ignoring vote/failed transaction");
+                    continue;
+                }
+
+                if !info
+                    .transaction
+                    .message()
+                    .account_keys()
+                    .iter()
+                    .any(|pubkey| {
+                        filter.wants_program(pubkey.as_ref())
+                            || filter.wants_account(pubkey.as_ref())
+                    })
+                {
+                    debug!("Ignoring transaction {:?}", info.signature);
+                    continue;
+                }
+
+                let event = Self::build_transaction_event(slot, info);
+                publisher
+                    .update_transaction(event, filter.wrap_messages, &filter.transaction_topic)
+                    .map_err(|e| PluginError::TransactionUpdateError { msg: e.to_string() })?;
+            }
         }
 
-        let event = Self::build_transaction_event(slot, info);
-
-        publisher
-            .update_transaction(event)
-            .map_err(|e| PluginError::TransactionUpdateError { msg: e.to_string() })
+        Ok(())
     }
 
     fn account_data_notifications_enabled(&self) -> bool {
-        self.unwrap_publisher().wants_update_account()
+        let filters = self.unwrap_filters();
+        filters
+            .iter()
+            .any(|filter| !filter.update_account_topic.is_empty())
     }
 
     fn transaction_notifications_enabled(&self) -> bool {
-        self.unwrap_publisher().wants_transaction()
+        let filters = self.unwrap_filters();
+        filters
+            .iter()
+            .any(|filter| !filter.transaction_topic.is_empty())
     }
 }
 
@@ -205,7 +222,7 @@ impl KafkaPlugin {
         self.publisher.as_ref().expect("publisher is unavailable")
     }
 
-    fn unwrap_filter(&self) -> &Filter {
+    fn unwrap_filters(&self) -> &Vec<Filter> {
         self.filter.as_ref().expect("filter is unavailable")
     }
 
diff --git a/src/publisher.rs b/src/publisher.rs
index 6cf26e9..41b587c 100644
--- a/src/publisher.rs
+++ b/src/publisher.rs
@@ -32,12 +32,6 @@ use {
 pub struct Publisher {
     producer: ThreadedProducer<StatsContext>,
     shutdown_timeout: Duration,
-
-    update_account_topic: String,
-    slot_status_topic: String,
-    transaction_topic: String,
-
-    wrap_messages: bool,
 }
 
 impl Publisher {
@@ -45,24 +39,23 @@ impl Publisher {
         Self {
             producer,
             shutdown_timeout: Duration::from_millis(config.shutdown_timeout_ms),
-            update_account_topic: config.update_account_topic.clone(),
-            slot_status_topic: config.slot_status_topic.clone(),
-            transaction_topic: config.transaction_topic.clone(),
-            wrap_messages: config.wrap_messages,
         }
     }
 
-    pub fn update_account(&self, ev: UpdateAccountEvent) -> Result<(), KafkaError> {
+    pub fn update_account(
+        &self,
+        ev: UpdateAccountEvent,
+        wrap_messages: bool,
+        topic: &str,
+    ) -> Result<(), KafkaError> {
         let temp_key;
-        let (key, buf) = if self.wrap_messages {
+        let (key, buf) = if wrap_messages {
             temp_key = self.copy_and_prepend(ev.pubkey.as_slice(), 65u8);
             (&temp_key, Self::encode_with_wrapper(Account(Box::new(ev))))
         } else {
             (&ev.pubkey, ev.encode_to_vec())
         };
-        let record = BaseRecord::<Vec<u8>, _>::to(&self.update_account_topic)
-            .key(key)
-            .payload(&buf);
+        let record = BaseRecord::<Vec<u8>, _>::to(topic).key(key).payload(&buf);
         let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e);
         UPLOAD_ACCOUNTS_TOTAL
             .with_label_values(&[if result.is_ok() { "success" } else { "failed" }])
             .inc();
         result
     }
 
-    pub fn update_slot_status(&self, ev: SlotStatusEvent) -> Result<(), KafkaError> {
+    pub fn update_slot_status(
+        &self,
+        ev: SlotStatusEvent,
+        wrap_messages: bool,
+        topic: &str,
+    ) -> Result<(), KafkaError> {
         let temp_key;
-        let (key, buf) = if self.wrap_messages {
+        let (key, buf) = if wrap_messages {
             temp_key = self.copy_and_prepend(&ev.slot.to_le_bytes(), 83u8);
             (&temp_key, Self::encode_with_wrapper(Slot(Box::new(ev))))
         } else {
             temp_key = ev.slot.to_le_bytes().to_vec();
             (&temp_key, ev.encode_to_vec())
         };
-        let record = BaseRecord::<Vec<u8>, _>::to(&self.slot_status_topic)
-            .key(key)
-            .payload(&buf);
+        let record = BaseRecord::<Vec<u8>, _>::to(topic).key(key).payload(&buf);
         let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e);
         UPLOAD_SLOTS_TOTAL
             .with_label_values(&[if result.is_ok() { "success" } else { "failed" }])
             .inc();
         result
     }
 
-    pub fn update_transaction(&self, ev: TransactionEvent) -> Result<(), KafkaError> {
+    pub fn update_transaction(
+        &self,
+        ev: TransactionEvent,
+        wrap_messages: bool,
+        topic: &str,
+    ) -> Result<(), KafkaError> {
         let temp_key;
-        let (key, buf) = if self.wrap_messages {
+        let (key, buf) = if wrap_messages {
             temp_key = self.copy_and_prepend(ev.signature.as_slice(), 84u8);
             (
                 &temp_key,
                 Self::encode_with_wrapper(Transaction(Box::new(ev))),
             )
         } else {
             (&ev.signature, ev.encode_to_vec())
         };
-        let record = BaseRecord::<Vec<u8>, _>::to(&self.transaction_topic)
-            .key(key)
-            .payload(&buf);
+        let record = BaseRecord::<Vec<u8>, _>::to(topic).key(key).payload(&buf);
         let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e);
         UPLOAD_TRANSACTIONS_TOTAL
             .with_label_values(&[if result.is_ok() { "success" } else { "failed" }])
             .inc();
         result
     }
 
-    pub fn wants_update_account(&self) -> bool {
-        !self.update_account_topic.is_empty()
-    }
-
-    pub fn wants_slot_status(&self) -> bool {
-        !self.slot_status_topic.is_empty()
-    }
-
-    pub fn wants_transaction(&self) -> bool {
-        !self.transaction_topic.is_empty()
-    }
-
     fn encode_with_wrapper(message: EventMessage) -> Vec<u8> {
         MessageWrapper {
             event_message: Some(message),