diff --git a/rust/monovertex/src/forwarder.rs b/rust/monovertex/src/forwarder.rs index 1ba928123c..0b875dff6f 100644 --- a/rust/monovertex/src/forwarder.rs +++ b/rust/monovertex/src/forwarder.rs @@ -1,5 +1,12 @@ use std::collections::HashMap; +use chrono::Utc; +use tokio::task::JoinSet; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use tracing::log::warn; +use tracing::{debug, info}; + use crate::config::{config, OnFailureStrategy}; use crate::error::{Error, Result}; use crate::message::{Message, Offset}; @@ -8,12 +15,6 @@ use crate::metrics::forward_metrics; use crate::sink::{proto, SinkClient}; use crate::source::SourceClient; use crate::transformer::TransformerClient; -use chrono::Utc; -use tokio::task::JoinSet; -use tokio::time::sleep; -use tokio_util::sync::CancellationToken; -use tracing::log::warn; -use tracing::{debug, info}; /// Forwarder is responsible for reading messages from the source, applying transformation if /// transformer is present, writing the messages to the sink, and then acknowledging the messages @@ -223,50 +224,52 @@ impl Forwarder { // we will overwrite this vec with failed messages and will keep retrying. let mut messages_to_send = messages; + // check what is the failure strategy in the config + let strategy = config().sink_retry_on_fail_strategy.clone(); + // only breaks out of this loop based on the retry strategy unless all the messages have been written to sink // successfully. - loop { - while attempts < config().sink_max_retry_attempts { - let status = self - .write_to_sink_once(&mut error_map, &mut fallback_msgs, &mut messages_to_send) - .await; - match status { - Ok(true) => break, - Ok(false) => { - attempts += 1; - warn!( - "Retry attempt {} due to retryable error. Errors: {:?}", - attempts, error_map - ); - } - Err(e) => Err(e)?, - } - } - - // If after the retries we still have messages to process, handle the post retry failures - let need_retry = self.handle_sink_post_retry( - &mut attempts, - &mut error_map, - &mut fallback_msgs, - &mut messages_to_send, - ); - match need_retry { - // if we are done with the messages, break the loop - Ok(false) => break, - // if we need to retry, reset the attempts and error_map - Ok(true) => { - attempts = 0; - error_map.clear(); + while attempts < config().sink_max_retry_attempts || strategy == OnFailureStrategy::Retry { + let status = self + .write_to_sink_once(&mut error_map, &mut fallback_msgs, &mut messages_to_send) + .await; + match status { + Ok(true) => break, + Ok(false) => { + attempts += 1; + warn!( + "Retry attempt {} due to retryable error. Errors: {:?}", + attempts, error_map + ); } Err(e) => Err(e)?, } } + // If after the retries we still have messages to process, handle the post retry failures + let need_retry = handle_sink_post_retry( + &mut attempts, + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + strategy, + self.common_labels.clone(), + ); + + // if we need to retry, return with error as we have exhausted the retries + if need_retry? { + return Err(Error::SinkError(format!( + "Failed to write messages to sink after {} attempts. Errors: {:?}", + attempts, error_map + ))); + } + // If there are fallback messages, write them to the fallback sink if !fallback_msgs.is_empty() { self.handle_fallback_messages(fallback_msgs).await?; } + // update the metric for the end to end time taken to write to the sink forward_metrics() .sink_time .get_or_create(&self.common_labels) @@ -281,58 +284,6 @@ impl Forwarder { Ok(()) } - /// Handles the post retry failures based on the configured strategy, - /// returns true if we need to retry, else false. - fn handle_sink_post_retry( - &mut self, - attempts: &mut u16, - error_map: &mut HashMap, - fallback_msgs: &mut Vec, - messages_to_send: &mut Vec, - ) -> Result { - // if we are done with the messages, break the loop - if messages_to_send.is_empty() { - return Ok(false); - } - // check what is the failure strategy in the config - let strategy = config().sink_retry_on_fail_strategy.clone(); - match strategy { - // if we need to retry, return true - OnFailureStrategy::Retry => { - warn!( - "Using onFailure Retry, Retry attempts {} completed", - attempts - ); - return Ok(true); - } - // if we need to drop the messages, log and return false - OnFailureStrategy::Drop => { - // log that we are dropping the messages as requested - warn!( - "Dropping messages after {} attempts. Errors: {:?}", - attempts, error_map - ); - // update the metrics - forward_metrics() - .dropped_total - .get_or_create(&self.common_labels) - .inc_by(messages_to_send.len() as u64); - } - // if we need to move the messages to the fallback, return false - OnFailureStrategy::Fallback => { - // log that we are moving the messages to the fallback as requested - warn!( - "Moving messages to fallback after {} attempts. Errors: {:?}", - attempts, error_map - ); - // move the messages to the fallback messages - fallback_msgs.append(messages_to_send); - } - } - // if we are done with the messages, break the loop - Ok(false) - } - /// Writes to sink once and will return true if successful, else false. Please note that it /// mutates is incoming fields. async fn write_to_sink_once( @@ -514,6 +465,57 @@ impl Forwarder { } } +/// Handles the post retry failures based on the configured strategy, +/// returns true if we need to retry, else false. +fn handle_sink_post_retry( + attempts: &mut u16, + error_map: &mut HashMap, + fallback_msgs: &mut Vec, + messages_to_send: &mut Vec, + strategy: OnFailureStrategy, + common_labels: Vec<(String, String)>, +) -> Result { + // if we are done with the messages, break the loop + if messages_to_send.is_empty() { + return Ok(false); + } + match strategy { + // if we need to retry, return true + OnFailureStrategy::Retry => { + warn!( + "Using onFailure Retry, Retry attempts {} completed", + attempts + ); + return Ok(true); + } + // if we need to drop the messages, log and return false + OnFailureStrategy::Drop => { + // log that we are dropping the messages as requested + warn!( + "Dropping messages after {} attempts. Errors: {:?}", + attempts, error_map + ); + // update the metrics + forward_metrics() + .dropped_total + .get_or_create(&common_labels) + .inc_by(messages_to_send.len() as u64); + } + // if we need to move the messages to the fallback, return false + OnFailureStrategy::Fallback => { + // log that we are moving the messages to the fallback as requested + warn!( + "Moving messages to fallback after {} attempts. Errors: {:?}", + attempts, error_map + ); + // move the messages to the fallback messages + fallback_msgs.append(messages_to_send); + } + } + // if we are done with the messages, break the loop + Ok(false) +} + #[cfg(test)] mod tests { use std::collections::HashSet; @@ -1038,3 +1040,114 @@ mod tests { .expect("failed to join fb sink server task"); } } + +#[cfg(test)] +mod tests_retry { + use super::*; + + impl Message { + // dummy method to test the retry logic + pub(crate) fn dummy() -> Self { + Self { + keys: vec![], + value: vec![], + offset: Offset { + offset: "".to_string(), + partition_id: 0, + }, + event_time: Utc::now(), + id: "".to_string(), + headers: HashMap::new(), + } + } + } + + #[test] + fn test_handle_sink_post_retry_empty_messages() { + let mut attempts = 0; + let mut error_map = HashMap::new(); + let mut fallback_msgs = vec![]; + let mut messages_to_send = vec![]; + let common_labels = vec![]; + + let result = handle_sink_post_retry( + &mut attempts, + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + OnFailureStrategy::Retry, + common_labels, + ) + .unwrap(); + assert_eq!(result, false); + } + + #[test] + fn test_handle_sink_post_retry_retry() { + let mut attempts = 0; + let mut error_map = HashMap::new(); + let mut fallback_msgs = vec![]; + let mut messages_to_send = vec![Message::dummy()]; + + let result = handle_sink_post_retry( + &mut attempts, + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + OnFailureStrategy::Retry, + vec![], + ) + .unwrap(); + assert_eq!(result, true); + } + + #[test] + fn test_handle_sink_post_retry_drop() { + let mut attempts = 0; + let mut error_map = HashMap::new(); + let mut fallback_msgs = vec![]; + let mut messages_to_send = vec![Message::dummy()]; + + // check the metric before the drop + let val = forward_metrics().dropped_total.get_or_create(&vec![]).get(); + assert_eq!(val, 0); + let result = handle_sink_post_retry( + &mut attempts, + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + OnFailureStrategy::Drop, + vec![], + ) + .unwrap(); + assert_eq!(result, false); + // check if the metric is updated + let val = forward_metrics().dropped_total.get_or_create(&vec![]).get(); + assert_eq!(val, 1) + } + + #[test] + fn test_handle_sink_post_retry_fallback() { + let mut attempts = 0; + let mut error_map = HashMap::new(); + let mut fallback_msgs = vec![]; + let mut messages_to_send = vec![Message::dummy()]; + + // check if the fallback messages are updated + assert_eq!(fallback_msgs.len(), 0); + + let result = handle_sink_post_retry( + &mut attempts, + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + OnFailureStrategy::Fallback, + vec![], + ) + .unwrap(); + assert_eq!(result, false); + assert!(messages_to_send.is_empty()); + // check if the fallback messages are updated + assert_eq!(fallback_msgs.len(), 1); + } +}