From cf9ce82e30c697185d364e07271dc80cd489f286 Mon Sep 17 00:00:00 2001 From: MMeent Date: Thu, 13 Apr 2023 22:42:25 +0200 Subject: [PATCH] [PG15] Feature/replicas (#279) * Recovery requirements: Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of replica's get_page_at_lsn's request_lsn lsns. --------- Co-authored-by: Matthias van de Meent --- src/backend/access/transam/xlog.c | 19 +++++-- src/backend/access/transam/xlogrecovery.c | 62 +++++++++++++++++++++++ src/include/access/xlogrecovery.h | 1 + src/include/access/xlogutils.h | 4 ++ 4 files changed, 81 insertions(+), 5 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e0b365534cc..3bed031b3a3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5303,6 +5303,14 @@ StartupXLOG(void) RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; doPageWrites = lastFullPageWrites; + /* + * Setup last written lsn cache, max written LSN. + * Starting from here, we could be modifying pages through REDO, which requires + * the existance of maxLwLsn + LwLsn LRU. + */ + XLogCtl->maxLastWrittenLsn = RedoRecPtr; + dlist_init(&XLogCtl->lastWrittenLsnLRU); + /* REDO */ if (InRecovery) { @@ -5671,8 +5679,6 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; - XLogCtl->maxLastWrittenLsn = EndOfLog; - dlist_init(&XLogCtl->lastWrittenLsnLRU); /* * Preallocate additional log files, if wanted. @@ -8144,11 +8150,14 @@ xlog_redo(XLogReaderState *record) continue; } result = XLogReadBufferForRedo(record, block_id, &buffer); - if (result == BLK_DONE && !IsUnderPostmaster) + if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode)) { /* - * In the special WAL process, blocks that are being ignored - * return BLK_DONE. Accept that. + * NEON: In the special WAL redo process, blocks that are being + * ignored return BLK_DONE. Accept that. + * Additionally, in standby mode, blocks that are not present + * in shared buffers are ignored during replay, so we also + * ignore those blocks. */ } else if (result != BLK_RESTORED) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 73d4af1bd76..35c7210032b 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData XLogRecPtr lastReplayedReadRecPtr; /* start position */ XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */ TimeLineID lastReplayedTLI; /* timeline */ + ConditionVariable replayProgressCV; /* CV for waiters */ /* * When we're currently replaying a record, ie. in a redo function, @@ -465,6 +466,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); } @@ -486,6 +488,64 @@ EnableStandbyMode(void) disable_startup_progress_timeout(); } +/* + * Wait for recovery to complete replaying all WAL up to and including + * redoEndRecPtr. + * + * This gets woken up for every WAL record replayed, so make sure you're not + * trying to wait an LSN that is too far in the future. + */ +void +XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr) +{ + static XLogRecPtr replayRecPtr = 0; + + if (!RecoveryInProgress()) + return; + + /* + * Check the backend-local variable first, we may be able to skip accessing + * shared memory (which requires locking) + */ + if (redoEndRecPtr <= replayRecPtr) + return; + + replayRecPtr = GetXLogReplayRecPtr(NULL); + + /* + * Check again if we're going to need to wait, now that we've updated + * the local cached variable. + */ + if (redoEndRecPtr <= replayRecPtr) + return; + + /* + * We need to wait for the variable, so prepare for that. + * + * Note: This wakes up every time a WAL record is replayed, so this can + * be expensive. + */ + ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV); + + while (redoEndRecPtr > replayRecPtr) + { + bool timeout; + timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV, + 10000000, /* 10 seconds */ + WAIT_EVENT_RECOVERY_WAL_STREAM); + + replayRecPtr = GetXLogReplayRecPtr(NULL); + + if (timeout) + ereport(LOG, + (errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)", + LSN_FORMAT_ARGS(redoEndRecPtr), + LSN_FORMAT_ARGS(replayRecPtr)))); + } + + ConditionVariableCancelSleep(); +} + /* * Prepare the system for WAL recovery, if needed. * @@ -2051,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } + + ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV); } /* diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 0aa85d90e89..48eaa8bcbf1 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void); extern void RemovePromoteSignalFiles(void); extern bool HotStandbyActive(void); +extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern RecoveryPauseState GetRecoveryPauseState(void); extern void SetRecoveryPause(bool recoveryPause); diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 57cb9d84215..15f155238a0 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate bool end_of_wal; /* true, when end of WAL is reached */ } ReadLocalXLogPageNoWaitPrivate; +/* + * Returns true if we shouldn't do REDO on that block in record indicated by + * block_id; false otherwise. + */ extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id); extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,