Skip to content

Commit

Permalink
Add custom xlogreader callbacks to walsender (v15) (#405)
Browse files Browse the repository at this point in the history
* Add hooks for custom xlogreader in walsender

* Make WalSndWaitForWal not static

* Spaces to tab
  • Loading branch information
Sasha Krassovsky authored and tristan957 committed May 10, 2024
1 parent fa8f7d2 commit e7c9b0e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
* data message */
bool log_replication_commands = false;

void (*WalSender_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
/*
* State for WalSndWakeupRequest
*/
Expand Down Expand Up @@ -258,7 +259,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool skipped_xact);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
Expand Down Expand Up @@ -1261,6 +1262,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
QueryCompletion qc;
XLogReaderRoutine xlr;

/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
Expand Down Expand Up @@ -1288,6 +1290,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
got_STOPPING = true;
}

xlr.page_read = logical_read_xlog_page;
xlr.segment_open = WalSndSegmentOpen;
xlr.segment_close = wal_segment_close;
if (WalSender_Custom_XLogReaderRoutines != NULL)
WalSender_Custom_XLogReaderRoutines(&xlr);

/*
* Create our decoding context, making it start at the previously ack'ed
* position.
Expand All @@ -1296,10 +1304,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* are reported early.
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
CreateDecodingContext(cmd->startpoint, cmd->options, false, &xlr,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader;
Expand Down Expand Up @@ -1547,7 +1552,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
* if we detect a shutdown request (either from postmaster or client)
* we will return early, so caller must always check.
*/
static XLogRecPtr
XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
Expand Down
3 changes: 3 additions & 0 deletions src/include/replication/walsender.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ extern PGDLLIMPORT int max_wal_senders;
extern PGDLLIMPORT int wal_sender_timeout;
extern PGDLLIMPORT bool log_replication_commands;

struct XLogReaderRoutine;
extern PGDLLIMPORT void (*WalSender_Custom_XLogReaderRoutines)(struct XLogReaderRoutine *xlr);

extern void InitWalSender(void);
extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
Expand Down

0 comments on commit e7c9b0e

Please sign in to comment.