From 66fa176cc8ff78cfc1464d1affababaa604edecf Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 15 Sep 2023 17:47:29 +0300 Subject: [PATCH] Handle update of VM in XLOG_HEAP_LOCK/XLOG_HEAP2_LOCK_UPDATED WAL records (#4896) ## Problem VM should be updated if the XLH_LOCK_ALL_FROZEN_CLEARED flag is set in XLOG_HEAP_LOCK, XLOG_HEAP2_LOCK_UPDATED WAL records ## Summary of changes Add handling of these records in walingest.rs ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik --- libs/postgres_ffi/src/pg_constants.rs | 3 + pageserver/src/walingest.rs | 60 +++++++++++++--- pageserver/src/walrecord.rs | 94 +++++++++++++++++++++++- test_runner/regress/test_vm_bits.py | 100 ++++++++++++++++++++++++++ 4 files changed, 244 insertions(+), 13 deletions(-) diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 1d196c3fe764..9690dc0eb662 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -137,9 +137,12 @@ pub const XLOG_HEAP_INSERT: u8 = 0x00; pub const XLOG_HEAP_DELETE: u8 = 0x10; pub const XLOG_HEAP_UPDATE: u8 = 0x20; pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40; +pub const XLOG_HEAP_LOCK: u8 = 0x60; pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80; pub const XLOG_HEAP2_VISIBLE: u8 = 0x40; pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50; +pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60; +pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01; pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8; pub const 
XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index d5ef1f2a24b1..e293fcc81b6e 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -444,6 +444,7 @@ impl<'a> WalIngest<'a> { // need to clear the corresponding bits in the visibility map. let mut new_heap_blkno: Option = None; let mut old_heap_blkno: Option = None; + let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; match self.timeline.pg_version { 14 => { @@ -479,6 +480,12 @@ impl<'a> WalIngest<'a> { // set. new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v14::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -497,6 +504,12 @@ impl<'a> WalIngest<'a> { if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v14::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -535,6 +548,12 @@ impl<'a> WalIngest<'a> { // set. 
new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v15::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -553,6 +572,12 @@ impl<'a> WalIngest<'a> { if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v15::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -591,6 +616,12 @@ impl<'a> WalIngest<'a> { // set. 
new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v16::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -609,6 +640,12 @@ impl<'a> WalIngest<'a> { if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v16::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -616,7 +653,6 @@ impl<'a> WalIngest<'a> { } _ => {} } - // FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED? // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { @@ -660,7 +696,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) @@ -676,7 +712,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno: None, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) @@ -690,7 +726,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno: None, old_heap_blkno, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) @@ -717,6 +753,8 @@ impl<'a> WalIngest<'a> { // need to clear the corresponding bits in the visibility map. 
let mut new_heap_blkno: Option = None; let mut old_heap_blkno: Option = None; + let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; + assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); match self.timeline.pg_version { @@ -772,7 +810,11 @@ impl<'a> WalIngest<'a> { } } pg_constants::XLOG_NEON_HEAP_LOCK => { - /* XLOG_NEON_HEAP_LOCK doesn't need special care */ + let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } } info => bail!("Unknown WAL record type for Neon RMGR: {}", info), } @@ -783,8 +825,6 @@ impl<'a> WalIngest<'a> { ), } - // FIXME: What about XLOG_NEON_HEAP_LOCK? - // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { @@ -827,7 +867,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) @@ -843,7 +883,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno: None, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) @@ -857,7 +897,7 @@ impl<'a> WalIngest<'a> { NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno: None, old_heap_blkno, - flags: pg_constants::VISIBILITYMAP_VALID_BITS, + flags, }, ctx, ) diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 27d73fb46d68..9c2e522f1765 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -219,20 +219,66 @@ pub mod v14 { old_offnum: buf.get_u16_le(), old_infobits_set: buf.get_u8(), flags: buf.get_u8(), - t_cid: buf.get_u32(), + t_cid: buf.get_u32_le(), new_xmax: buf.get_u32_le(), new_offnum: buf.get_u16_le(), } } } + + #[repr(C)] + #[derive(Debug)] + pub struct XlHeapLock { + pub locking_xid: TransactionId, + pub offnum: OffsetNumber, + pub 
_padding: u16, + pub t_cid: u32, + pub infobits_set: u8, + pub flags: u8, + } + + impl XlHeapLock { + pub fn decode(buf: &mut Bytes) -> XlHeapLock { + XlHeapLock { + locking_xid: buf.get_u32_le(), + offnum: buf.get_u16_le(), + _padding: buf.get_u16_le(), + t_cid: buf.get_u32_le(), + infobits_set: buf.get_u8(), + flags: buf.get_u8(), + } + } + } + + #[repr(C)] + #[derive(Debug)] + pub struct XlHeapLockUpdated { + pub xmax: TransactionId, + pub offnum: OffsetNumber, + pub infobits_set: u8, + pub flags: u8, + } + + impl XlHeapLockUpdated { + pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated { + XlHeapLockUpdated { + xmax: buf.get_u32_le(), + offnum: buf.get_u16_le(), + infobits_set: buf.get_u8(), + flags: buf.get_u8(), + } + } + } } pub mod v15 { - pub use super::v14::{XlHeapDelete, XlHeapInsert, XlHeapMultiInsert, XlHeapUpdate}; + pub use super::v14::{ + XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate, + }; } pub mod v16 { - pub use super::v14::{XlHeapInsert, XlHeapMultiInsert}; + pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert}; use bytes::{Buf, Bytes}; use postgres_ffi::{OffsetNumber, TransactionId}; @@ -278,6 +324,26 @@ pub mod v16 { } } + #[repr(C)] + #[derive(Debug)] + pub struct XlHeapLock { + pub locking_xid: TransactionId, + pub offnum: OffsetNumber, + pub infobits_set: u8, + pub flags: u8, + } + + impl XlHeapLock { + pub fn decode(buf: &mut Bytes) -> XlHeapLock { + XlHeapLock { + locking_xid: buf.get_u32_le(), + offnum: buf.get_u16_le(), + infobits_set: buf.get_u8(), + flags: buf.get_u8(), + } + } + } + /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. 
*/ pub mod rm_neon { use bytes::{Buf, Bytes}; @@ -366,6 +432,28 @@ pub mod v16 { } } } + + #[repr(C)] + #[derive(Debug)] + pub struct XlNeonHeapLock { + pub locking_xid: TransactionId, + pub t_cid: u32, + pub offnum: OffsetNumber, + pub infobits_set: u8, + pub flags: u8, + } + + impl XlNeonHeapLock { + pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock { + XlNeonHeapLock { + locking_xid: buf.get_u32_le(), + t_cid: buf.get_u32_le(), + offnum: buf.get_u16_le(), + infobits_set: buf.get_u8(), + flags: buf.get_u8(), + } + } + } } } diff --git a/test_runner/regress/test_vm_bits.py b/test_runner/regress/test_vm_bits.py index 8e20a444075e..bc810ceb09c2 100644 --- a/test_runner/regress/test_vm_bits.py +++ b/test_runner/regress/test_vm_bits.py @@ -111,3 +111,103 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv): assert cur_new.fetchall() == [] cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1") assert cur_new.fetchall() == [] + + +# +# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK +# record. +# +def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv): + env = neon_simple_env + + env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty") + endpoint = env.endpoints.create_start( + "test_vm_bit_clear_on_heap_lock", + config_lines=[ + "log_autovacuum_min_duration = 0", + # Perform anti-wraparound vacuuming aggressively + "autovacuum_naptime='1 s'", + "autovacuum_freeze_max_age = 1000000", + ], + ) + + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + # Install extension containing function needed for test + cur.execute("CREATE EXTENSION neon_test_utils") + + cur.execute("SELECT pg_switch_wal()") + + # Create a test table and freeze it to set the all-frozen VM bit on all pages. + cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)") + cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g") + cur.execute("VACUUM FREEZE vmtest_lock") + + # Lock a row. 
This clears the all-frozen VM bit for that page. + cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE") + + # Remember the XID. We will use it later to verify that we have consumed a lot of + # XIDs after this. + cur.execute("select pg_current_xact_id()") + locking_xid = cur.fetchall()[0][0] + + # Stop and restart postgres, to clear the buffer cache. + # + # NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages + # in a "clean" way. Our neon extension will write a full-page image of the VM + # page, and we want to avoid that. + endpoint.stop() + endpoint.start() + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ") + tup = cur.fetchall() + xmax_before = tup[0][1] + + # Consume a lot of XIDs, so that anti-wraparound autovacuum kicks + # in and the clog gets truncated. We set autovacuum_freeze_max_age to a very + # low value, so it doesn't take all that many XIDs for autovacuum to kick in. 
+ for i in range(1000): + cur.execute( + """ + CREATE TEMP TABLE othertable (i int) ON COMMIT DROP; + do $$ + begin + for i in 1..100000 loop + -- Use a begin-exception block to generate a new subtransaction on each iteration + begin + insert into othertable values (i); + exception when others then + raise 'not expected %', sqlerrm; + end; + end loop; + end; + $$; + """ + ) + cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ") + tup = cur.fetchall() + log.info(f"tuple = {tup}") + xmax = tup[0][1] + assert xmax == xmax_before + + if i % 50 == 0: + cur.execute("select datfrozenxid from pg_database where datname='postgres'") + datfrozenxid = cur.fetchall()[0][0] + if datfrozenxid > locking_xid: + break + + cur.execute("select pg_current_xact_id()") + curr_xid = cur.fetchall()[0][0] + assert int(curr_xid) - int(locking_xid) >= 100000 + + # Now, if the VM all-frozen bit was not correctly cleared on + # replay, we will try to fetch the status of the XID that was + # already truncated away. + # + # ERROR: could not access status of transaction 1027 + cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update") + tup = cur.fetchall() + log.info(f"tuple = {tup}")