diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs
index a0a96c6e99cc..2edcc4ef6f32 100644
--- a/safekeeper/src/receive_wal.rs
+++ b/safekeeper/src/receive_wal.rs
@@ -562,6 +562,9 @@ impl WalAcceptor {
             // Don't flush the WAL on every append, only periodically via flush_ticker.
             // This batches multiple appends per fsync. If the channel is empty after
             // sending the reply, we'll schedule an immediate flush.
+            //
+            // Note that a flush can still happen on segment bounds, which will result
+            // in an AppendResponse.
             if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
                 msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
                 dirty = true;
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index f4983d44d0fe..6eb69f0b7ce2 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -947,6 +947,7 @@ where
         // while first connection still gets some packets later. It might be
        // better to not log this as error! above.
         let write_lsn = self.wal_store.write_lsn();
+        let flush_lsn = self.wal_store.flush_lsn();
         if write_lsn > msg.h.begin_lsn {
             bail!(
                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
@@ -1004,7 +1005,9 @@ where
         );
 
         // If flush_lsn hasn't updated, AppendResponse is not very useful.
-        if !require_flush {
+        // This is the common case for !require_flush, but a flush can still
+        // happen on segment bounds.
+        if !require_flush && flush_lsn == self.flush_lsn() {
             return Ok(None);
         }
 
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index c3bb6cd12c26..e338d7073105 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -113,6 +113,13 @@ pub struct PhysicalStorage {
     /// non-aligned chunks of data.
     write_record_lsn: Lsn,
 
+    /// The last LSN flushed to disk. May be in the middle of a record.
+    ///
+    /// NB: when the rest of the system refers to `flush_lsn`, it usually
+    /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
+    /// and should be resolved.
+    flush_lsn: Lsn,
+
     /// The LSN of the last WAL record flushed to disk.
     flush_record_lsn: Lsn,
 
@@ -211,6 +218,7 @@ impl PhysicalStorage {
             system_id: state.server.system_id,
             write_lsn,
             write_record_lsn: write_lsn,
+            flush_lsn,
             flush_record_lsn: flush_lsn,
             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
             file: None,
@@ -295,8 +303,9 @@ impl PhysicalStorage {
         }
     }
 
-    /// Write WAL bytes, which are known to be located in a single WAL segment.
-    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
+    /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
+    /// segment was completed, closed, and flushed to disk.
+    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
         let mut file = if let Some(file) = self.file.take() {
             file
         } else {
@@ -320,20 +329,24 @@ impl PhysicalStorage {
             let (wal_file_path, wal_file_partial_path) =
                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
             fs::rename(wal_file_partial_path, wal_file_path).await?;
+            Ok(true)
         } else {
             // otherwise, file can be reused later
             self.file = Some(file);
+            Ok(false)
         }
-
-        Ok(())
     }
 
     /// Writes WAL to the segment files, until everything is writed. If some segments
     /// are fully written, they are flushed to disk. The last (partial) segment can
     /// be flushed separately later.
     ///
-    /// Updates `write_lsn`.
+    /// Updates `write_lsn` and `flush_lsn`.
     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
+        // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
+        // Rename this method to `append_exact`, and make it append-only, removing
+        // the `pos` parameter and this check. For this reason, we don't update
+        // `flush_lsn` here.
         if self.write_lsn != pos {
             // need to flush the file before discarding it
             if let Some(file) = self.file.take() {
@@ -355,9 +368,13 @@
                 buf.len()
             };
 
-            self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
+            let flushed = self
+                .write_in_segment(segno, xlogoff, &buf[..bytes_write])
                 .await?;
             self.write_lsn += bytes_write as u64;
+            if flushed {
+                self.flush_lsn = self.write_lsn;
+            }
             buf = &buf[bytes_write..];
         }
 
@@ -371,6 +388,9 @@ impl Storage for PhysicalStorage {
         self.write_lsn
     }
     /// flush_lsn returns LSN of last durably stored WAL record.
+    ///
+    /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
+    #[allow(clippy::misnamed_getters)]
     fn flush_lsn(&self) -> Lsn {
         self.flush_record_lsn
     }
@@ -424,8 +444,9 @@ impl Storage for PhysicalStorage {
         self.metrics.observe_write_seconds(write_seconds);
         self.metrics.observe_write_bytes(buf.len());
 
-        // figure out last record's end lsn for reporting (if we got the
-        // whole record)
+        // Figure out the last record's end LSN and update `write_record_lsn`
+        // (if we got a whole record). The write may also have closed and
+        // flushed a segment, so update `flush_record_lsn` as well.
         if self.decoder.available() != startpos {
             info!(
                 "restart decoder from {} to {}",
@@ -436,12 +457,15 @@
             self.decoder = WalStreamDecoder::new(startpos, pg_version);
         }
         self.decoder.feed_bytes(buf);
-        loop {
-            match self.decoder.poll_decode()? {
-                None => break, // no full record yet
-                Some((lsn, _rec)) => {
-                    self.write_record_lsn = lsn;
-                }
+
+        if self.write_record_lsn <= self.flush_lsn {
+            // We may have flushed a previously written record.
+            self.flush_record_lsn = self.write_record_lsn;
+        }
+        while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
+            self.write_record_lsn = lsn;
+            if lsn <= self.flush_lsn {
+                self.flush_record_lsn = lsn;
             }
         }
 
@@ -458,19 +482,17 @@
             self.fdatasync_file(&unflushed_file).await?;
             self.file = Some(unflushed_file);
         } else {
-            // We have unflushed data (write_lsn != flush_lsn), but no file.
-            // This should only happen if last file was fully written and flushed,
-            // but haven't updated flush_lsn yet.
-            if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
-                bail!(
-                    "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
-                    self.write_lsn,
-                    self.flush_record_lsn
-                );
-            }
+            // We have unflushed data (write_lsn != flush_lsn), but no file. This
+            // shouldn't happen, since the segment is flushed on close.
+            bail!(
+                "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
+                self.write_lsn,
+                self.flush_record_lsn
+            );
         }
 
         // everything is flushed now, let's update flush_lsn
+        self.flush_lsn = self.write_lsn;
         self.flush_record_lsn = self.write_record_lsn;
         Ok(())
     }
@@ -517,6 +539,7 @@
         self.pending_wal_truncation = true;
 
         self.write_lsn = end_pos;
+        self.flush_lsn = end_pos;
         self.write_record_lsn = end_pos;
         self.flush_record_lsn = end_pos;
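
The sketch below (not part of the patch) models the invariant the change enforces: `flush_lsn` tracks the last durable byte and may sit mid-record, while `flush_record_lsn` only advances to record boundaries that are already durable. All names here (`LsnTracker`, its methods, the plain u64 standing in for `Lsn`) are hypothetical illustrations, and record boundaries are fed in directly instead of being decoded from WAL bytes.

    // Hypothetical model of PhysicalStorage's LSN cursors, for illustration only.
    #[derive(Default)]
    struct LsnTracker {
        write_lsn: u64,        // last byte written (may be mid-record)
        flush_lsn: u64,        // last byte flushed (may be mid-record)
        write_record_lsn: u64, // end of last fully written record
        flush_record_lsn: u64, // end of last fully flushed record
    }

    impl LsnTracker {
        // A write of `len` bytes; `segment_flushed` mirrors the new return value
        // of write_in_segment: a completed segment is fsynced and closed
        // immediately, so flush_lsn advances without an explicit flush call.
        fn on_write(&mut self, len: u64, segment_flushed: bool) {
            self.write_lsn += len;
            if segment_flushed {
                self.flush_lsn = self.write_lsn;
            }
        }

        // A WAL record ends at `lsn`. As in the patched decode loop, the
        // record-level flush cursor only advances to boundaries at or below
        // the durable byte position.
        fn on_record_boundary(&mut self, lsn: u64) {
            self.write_record_lsn = lsn;
            if lsn <= self.flush_lsn {
                self.flush_record_lsn = lsn;
            }
        }

        // An explicit fsync of the open partial segment: everything written
        // is now durable, so both flush cursors catch up.
        fn on_flush(&mut self) {
            self.flush_lsn = self.write_lsn;
            self.flush_record_lsn = self.write_record_lsn;
        }
    }

    fn main() {
        let mut t = LsnTracker::default();
        // A record straddles a segment boundary: the first write completes and
        // closes a segment, which flushes it as a side effect.
        t.on_write(100, true);
        t.on_write(20, false); // record tail lands in the next, unflushed segment
        t.on_record_boundary(120);
        // Bytes up to 100 are durable, but the record ending at 120 is not yet.
        assert_eq!((t.flush_lsn, t.flush_record_lsn), (100, 0));
        t.on_flush();
        assert_eq!((t.flush_lsn, t.flush_record_lsn), (120, 120));
    }

This walks the segment-bound case the patch targets: the close-time fsync advances the byte-level cursor, so the proposer can be sent an AppendResponse, but the record-level cursor must wait for the next flush of the partial segment.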