Skip to content

Commit

Permalink
safekeeper: check for non-consecutive writes in safekeeper.rs
Browse files Browse the repository at this point in the history
wal_storage.rs already checks this, but since this is quite a legitimate scenario,
check it at safekeeper.rs (consensus level) as well.

ref #8212

This is take 2; the previous PR #8640 was reverted because its interplay
with another change broke test_last_log_term_switch.
  • Loading branch information
arssher committed Sep 3, 2024
1 parent 6d8572d commit 3427fd1
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 23 deletions.
126 changes: 103 additions & 23 deletions safekeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,29 @@ where
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}

// Disallow any non-sequential writes, which can result in gaps or
// overwrites. If we need to move the pointer, ProposerElected message
// should have truncated WAL first accordingly. Note that the first
// condition (WAL rewrite) is quite expected in real world; it happens
// when walproposer reconnects to safekeeper and writes some more data
// while first connection still gets some packets later. It might be
// better to not log this as error! above.
let write_lsn = self.wal_store.write_lsn();
if write_lsn > msg.h.begin_lsn {
bail!(
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
write_lsn,
msg.h.begin_lsn
);
}
if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
bail!(
"append request creates gap in written WAL, write_lsn={}, msg lsn={}",
write_lsn,
msg.h.begin_lsn,
);
}

// Now we know that we are in the same term as the proposer,
// processing the message.

Expand Down Expand Up @@ -960,10 +983,7 @@ mod tests {
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};

use super::*;
use crate::{
state::{EvictionState, PersistedPeers, TimelinePersistentState},
wal_storage::Storage,
};
use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
use std::{ops::Deref, str::FromStr, time::Instant};

// fake storage for tests
Expand Down Expand Up @@ -1005,6 +1025,10 @@ mod tests {

#[async_trait::async_trait]
impl wal_storage::Storage for DummyWalStore {
// The dummy store keeps a single `lsn` position, so it doubles as the
// write LSN.
fn write_lsn(&self) -> Lsn {
self.lsn
}

// Reports the store's single `lsn` position: the dummy store does not
// track flushed WAL separately from written WAL.
fn flush_lsn(&self) -> Lsn {
self.lsn
}
Expand Down Expand Up @@ -1078,7 +1102,7 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();

let mut ar_hdr = AppendRequestHeader {
term: 1,
term: 2,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
Expand All @@ -1092,24 +1116,29 @@ mod tests {
};

let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(3),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(3),
}]),
timeline_start_lsn: Lsn(0),
term: 2,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![
TermLsn {
term: 1,
lsn: Lsn(1),
},
TermLsn {
term: 2,
lsn: Lsn(3),
},
]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();

// check that AppendRequest before term_start_lsn doesn't switch last_log_term.
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok());
assert_eq!(sk.get_last_log_term(), 0);
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
assert_eq!(sk.get_last_log_term(), 1);

// but record at term_start_lsn does the switch
ar_hdr.begin_lsn = Lsn(2);
Expand All @@ -1118,12 +1147,63 @@ mod tests {
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok());
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_last_log_term(), 1);
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
assert_eq!(sk.get_last_log_term(), 2);
}

/// Checks that an append request whose `begin_lsn` lies beyond the WAL
/// written so far (i.e. one that would leave a gap) is rejected, while a
/// request that continues the log is accepted.
#[tokio::test]
async fn test_non_consecutive_write() {
    let storage = InMemoryState {
        persisted_state: test_sk_state(),
    };
    let wal_store = DummyWalStore { lsn: Lsn(0) };

    let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();

    // The append requests below use the same term (1) as this election message.
    let pem = ProposerElected {
        term: 1,
        start_streaming_at: Lsn(1),
        term_history: TermHistory(vec![TermLsn {
            term: 1,
            lsn: Lsn(1),
        }]),
        timeline_start_lsn: Lsn(1),
    };
    sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
        .await
        .unwrap();

    let ar_hdr = AppendRequestHeader {
        term: 1,
        term_start_lsn: Lsn(3),
        begin_lsn: Lsn(1),
        end_lsn: Lsn(2),
        commit_lsn: Lsn(0),
        truncate_lsn: Lsn(0),
        proposer_uuid: [0; 16],
    };
    let append_request = AppendRequest {
        h: ar_hdr.clone(),
        wal_data: Bytes::from_static(b"b"),
    };

    // do write ending at 2, it should be ok
    sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
        .await
        .unwrap();

    // Same header, but shifted to start at 4. It is essential that *this*
    // header is the one sent: re-sending `ar_hdr` would fail too, but as a
    // rewrite of already written WAL rather than as a gap.
    let gap_hdr = AppendRequestHeader {
        begin_lsn: Lsn(4),
        end_lsn: Lsn(5),
        ..ar_hdr
    };
    let append_request = AppendRequest {
        h: gap_hdr,
        wal_data: Bytes::from_static(b"b"),
    };
    // and now starting at 4, it must fail
    sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
        .await
        .unwrap_err();
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use utils::{id::TenantTimelineId, lsn::Lsn};

#[async_trait::async_trait]
pub trait Storage {
/// Last written LSN (compare `flush_lsn`, which reports the durably stored
/// position).
fn write_lsn(&self) -> Lsn;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;

Expand Down Expand Up @@ -327,6 +329,10 @@ impl PhysicalStorage {

#[async_trait::async_trait]
impl Storage for PhysicalStorage {
/// write_lsn returns the last written LSN.
fn write_lsn(&self) -> Lsn {
self.write_lsn
}
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
Expand Down
4 changes: 4 additions & 0 deletions safekeeper/tests/walproposer_sim/safekeeper_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ impl DiskWALStorage {

#[async_trait::async_trait]
impl wal_storage::Storage for DiskWALStorage {
/// Last written LSN.
fn write_lsn(&self) -> Lsn {
self.write_lsn
}
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
Expand Down

0 comments on commit 3427fd1

Please sign in to comment.