safekeeper: log slow WalAcceptor sends (#9564)
## Problem

We don't have any observability into full WalAcceptor queues per
timeline.

## Summary of changes

Logs a message when a WalAcceptor send has blocked for 5 seconds, and
another message when the send completes. This implies that the log
frequency is at most once every 5 seconds per timeline, so we don't need
further throttling.
erikgrinaker authored Nov 1, 2024
1 parent 8b3bcf7 commit 123816e
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions safekeeper/src/receive_wal.rs
@@ -26,10 +26,11 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
+use tokio::sync::mpsc::error::SendTimeoutError;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::task;
 use tokio::task::JoinHandle;
-use tokio::time::{Duration, MissedTickBehavior};
+use tokio::time::{Duration, Instant, MissedTickBehavior};
 use tracing::*;
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
@@ -384,9 +385,29 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     msg_tx: Sender<ProposerAcceptorMessage>,
     mut next_msg: ProposerAcceptorMessage,
 ) -> Result<(), CopyStreamHandlerEnd> {
+    /// Threshold for logging slow WalAcceptor sends.
+    const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
+
     loop {
-        if msg_tx.send(next_msg).await.is_err() {
-            return Ok(()); // chan closed, WalAcceptor terminated
+        let started = Instant::now();
+        match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
+            Ok(()) => {}
+            // Slow send, log a message and keep trying. Log context has timeline ID.
+            Err(SendTimeoutError::Timeout(next_msg)) => {
+                warn!(
+                    "slow WalAcceptor send blocked for {:.3}s",
+                    Instant::now().duration_since(started).as_secs_f64()
+                );
+                if msg_tx.send(next_msg).await.is_err() {
+                    return Ok(()); // WalAcceptor terminated
+                }
+                warn!(
+                    "slow WalAcceptor send completed after {:.3}s",
+                    Instant::now().duration_since(started).as_secs_f64()
+                )
+            }
+            // WalAcceptor terminated.
+            Err(SendTimeoutError::Closed(_)) => return Ok(()),
         }
         next_msg = read_message(pgb_reader).await?;
     }
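For readers who want to try the "warn once when blocked, warn again on completion" pattern outside the safekeeper, here is a minimal standalone sketch. It is not the code from this commit: it assumes a plain `std::sync::mpsc::sync_channel` instead of the tokio channel, and because the standard library's `SyncSender` has no stable timed send, it approximates `send_timeout` by polling `try_send` until the threshold passes. The names `send_logging_slow` and `SendOutcome` are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical result type so callers (and tests) can see which path was taken.
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Fast,   // sent before the threshold elapsed, nothing logged
    Slow,   // blocked past the threshold; two messages were logged
    Closed, // receiver dropped
}

/// Hypothetical helper: send `msg`, logging once if the send has been blocked
/// for `threshold`, and once more when it finally completes.
fn send_logging_slow<T>(tx: &SyncSender<T>, mut msg: T, threshold: Duration) -> SendOutcome {
    let started = Instant::now();
    loop {
        match tx.try_send(msg) {
            Ok(()) => return SendOutcome::Fast,
            Err(TrySendError::Disconnected(_)) => return SendOutcome::Closed,
            Err(TrySendError::Full(returned)) => {
                if started.elapsed() >= threshold {
                    // Threshold crossed: log, then fall back to a plain blocking send.
                    eprintln!("slow send blocked for {:.3}s", started.elapsed().as_secs_f64());
                    if tx.send(returned).is_err() {
                        return SendOutcome::Closed;
                    }
                    eprintln!("slow send completed after {:.3}s", started.elapsed().as_secs_f64());
                    return SendOutcome::Slow;
                }
                // Queue still full but under the threshold: take the message back and retry.
                msg = returned;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}

fn main() {
    let threshold = Duration::from_millis(50); // the commit uses 5 seconds
    let (tx, rx) = sync_channel::<u32>(1);

    // The queue has room, so the first send is fast and silent.
    println!("{:?}", send_logging_slow(&tx, 1, threshold));

    // A deliberately slow consumer keeps the queue full past the threshold.
    let consumer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        (rx.recv().unwrap(), rx.recv().unwrap())
    });
    println!("{:?}", send_logging_slow(&tx, 2, threshold));
    println!("{:?}", consumer.join().unwrap());
}
```

As in the diff above, each slow send emits at most two messages and the first one cannot appear before the threshold has elapsed, which is why no extra rate limiting is added.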

1 comment on commit 123816e

@github-actions
5416 tests run: 5184 passed, 0 failed, 232 skipped (full report)


Flaky tests (1)

Postgres 17

Code coverage* (full report)

  • functions: 31.5% (7771 of 24690 functions)
  • lines: 48.9% (61030 of 124683 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
123816e at 2024-11-01T14:27:55.617Z :recycle:
