diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index f97e127a1724..2410e22f450f 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -26,10 +26,11 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::task; use tokio::task::JoinHandle; -use tokio::time::{Duration, MissedTickBehavior}; +use tokio::time::{Duration, Instant, MissedTickBehavior}; use tracing::*; use utils::id::TenantTimelineId; use utils::lsn::Lsn; @@ -384,9 +385,29 @@ async fn read_network_loop( msg_tx: Sender, mut next_msg: ProposerAcceptorMessage, ) -> Result<(), CopyStreamHandlerEnd> { + /// Threshold for logging slow WalAcceptor sends. + const SLOW_THRESHOLD: Duration = Duration::from_secs(5); + loop { - if msg_tx.send(next_msg).await.is_err() { - return Ok(()); // chan closed, WalAcceptor terminated + let started = Instant::now(); + match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await { + Ok(()) => {} + // Slow send, log a message and keep trying. Log context has timeline ID. + Err(SendTimeoutError::Timeout(next_msg)) => { + warn!( + "slow WalAcceptor send blocked for {:.3}s", + Instant::now().duration_since(started).as_secs_f64() + ); + if msg_tx.send(next_msg).await.is_err() { + return Ok(()); // WalAcceptor terminated + } + warn!( + "slow WalAcceptor send completed after {:.3}s", + Instant::now().duration_since(started).as_secs_f64() + ) + } + // WalAcceptor terminated. + Err(SendTimeoutError::Closed(_)) => return Ok(()), } next_msg = read_message(pgb_reader).await?; }