diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 486954c7b9f4..dbe0034de29d 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -875,6 +875,29 @@ where return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } + // Disallow any non-sequential writes, which can result in gaps or + // overwrites. If we need to move the pointer, ProposerElected message + // should have truncated WAL first accordingly. Note that the first + // condition (WAL rewrite) is quite expected in real world; it happens + // when walproposer reconnects to safekeeper and writes some more data + // while first connection still gets some packets later. It might be + // better to not log this as error! above. + let write_lsn = self.wal_store.write_lsn(); + if write_lsn > msg.h.begin_lsn { + bail!( + "append request rewrites WAL written before, write_lsn={}, msg lsn={}", + write_lsn, + msg.h.begin_lsn + ); + } + if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) { + bail!( + "append request creates gap in written WAL, write_lsn={}, msg lsn={}", + write_lsn, + msg.h.begin_lsn, + ); + } + // Now we know that we are in the same term as the proposer, // processing the message. @@ -960,10 +983,7 @@ mod tests { use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE}; use super::*; - use crate::{ - state::{EvictionState, PersistedPeers, TimelinePersistentState}, - wal_storage::Storage, - }; + use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState}; use std::{ops::Deref, str::FromStr, time::Instant}; // fake storage for tests @@ -1003,6 +1023,10 @@ mod tests { } impl wal_storage::Storage for DummyWalStore { + fn write_lsn(&self) -> Lsn { + self.lsn + } + fn flush_lsn(&self) -> Lsn { self.lsn } @@ -1076,7 +1100,7 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); let mut ar_hdr = AppendRequestHeader { - term: 1, + term: 2, term_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), @@ -1090,24 +1114,29 @@ mod tests { }; let pem = ProposerElected { - term: 1, - start_streaming_at: Lsn(3), - term_history: TermHistory(vec![TermLsn { - term: 1, - lsn: Lsn(3), - }]), - timeline_start_lsn: Lsn(0), + term: 2, + start_streaming_at: Lsn(1), + term_history: TermHistory(vec![ + TermLsn { + term: 1, + lsn: Lsn(1), + }, + TermLsn { + term: 2, + lsn: Lsn(3), + }, + ]), + timeline_start_lsn: Lsn(1), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await .unwrap(); // check that AppendRequest before term_start_lsn doesn't switch last_log_term. - let resp = sk - .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) - .await; - assert!(resp.is_ok()); - assert_eq!(sk.get_last_log_term(), 0); + sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await + .unwrap(); + assert_eq!(sk.get_last_log_term(), 1); // but record at term_start_lsn does the switch ar_hdr.begin_lsn = Lsn(2); @@ -1116,12 +1145,63 @@ mod tests { h: ar_hdr, wal_data: Bytes::from_static(b"b"), }; - let resp = sk - .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) - .await; - assert!(resp.is_ok()); - sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %) - assert_eq!(sk.get_last_log_term(), 1); + sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await + .unwrap(); + assert_eq!(sk.get_last_log_term(), 2); + } + + #[tokio::test] + async fn test_non_consecutive_write() { + let storage = InMemoryState { + persisted_state: test_sk_state(), + }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; + + let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); + + let pem = ProposerElected { + term: 1, + start_streaming_at: Lsn(1), + term_history: TermHistory(vec![TermLsn { + term: 1, + lsn: Lsn(1), + }]), + timeline_start_lsn: Lsn(1), + }; + sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .await + .unwrap(); + + let ar_hdr = AppendRequestHeader { + term: 1, + term_start_lsn: Lsn(3), + begin_lsn: Lsn(1), + end_lsn: Lsn(2), + commit_lsn: Lsn(0), + truncate_lsn: Lsn(0), + proposer_uuid: [0; 16], + }; + let append_request = AppendRequest { + h: ar_hdr.clone(), + wal_data: Bytes::from_static(b"b"), + }; + + // do write ending at 2, it should be ok + sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await + .unwrap(); + let mut ar_hrd2 = ar_hdr.clone(); + ar_hrd2.begin_lsn = Lsn(4); + ar_hrd2.end_lsn = Lsn(5); + let append_request = AppendRequest { + h: ar_hdr, + wal_data: Bytes::from_static(b"b"), + }; + // and now starting at 4, it must fail + sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await + .unwrap_err(); } #[test] diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 6fd7c91a6857..89c2e98a946b 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -37,6 +37,8 @@ use pq_proto::SystemId; use utils::{id::TenantTimelineId, lsn::Lsn}; pub trait Storage { + // Last written LSN. + fn write_lsn(&self) -> Lsn; /// LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn; @@ -329,6 +331,10 @@ impl PhysicalStorage { } impl Storage for PhysicalStorage { + // Last written LSN. + fn write_lsn(&self) -> Lsn { + self.write_lsn + } /// flush_lsn returns LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn { self.flush_record_lsn diff --git a/safekeeper/tests/walproposer_sim/safekeeper_disk.rs b/safekeeper/tests/walproposer_sim/safekeeper_disk.rs index 6b31edb1f2e0..b854754ecfb3 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper_disk.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper_disk.rs @@ -175,6 +175,10 @@ impl DiskWALStorage { } impl wal_storage::Storage for DiskWALStorage { + // Last written LSN. + fn write_lsn(&self) -> Lsn { + self.write_lsn + } /// LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn { self.flush_record_lsn