From 55b44ec5e223b2741d60f6db9c6b76b52187dfce Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 Dec 2024 14:02:19 +0200 Subject: [PATCH] Add wallog_file_removal function --- src/backend/access/heap/rewriteheap.c | 4 +- src/backend/replication/logical/message.c | 62 +++++++++++++++-------- src/include/replication/message.h | 1 + 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 4b61ba7e6f6..c1205c0d072 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1138,8 +1138,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_file_descriptor(path, fd, -1); - if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1217,7 +1215,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_file_descriptor(path, -1, -1); + wallog_file_removal(path); } else { diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 807af67f333..1849611f282 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -97,45 +97,65 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } +/* + * NEON: remove AUX object + */ +void +wallog_file_removal(char const* path) +{ + char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + elog(DEBUG1, "neon: deleting contents of file %s", path); + + /* unlink file */ + LogLogicalMessage(prefix, NULL, 0, false, true); +} + /* * NEON: persist file in WAL to save it in persistent storage. - * If fd < 0, then remove entry from page server. + * This funcion changes current position in the file, so caller should be aware of it. */ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) { char prefix[MAXPGPATH]; + off_t size; + + Assert(fd >= 0); /* Do not wallog AUX file at replica */ if (!XLogInsertAllowed()) return; - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) + size = lseek(fd, 0, SEEK_END); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + + if ((uint64_t)size > limit) { - elog(DEBUG1, "neon: deleting contents of file %s", path); - /* unlink file */ - LogLogicalMessage(prefix, NULL, 0, false, true); + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + wallog_file_removal(path); } else { - off_t size = lseek(fd, 0, SEEK_END); - elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of file %s: %m", path); - if ((uint64_t)size > limit) - { - elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); - } - else - { - char* buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) + char* buf = palloc((size_t)size); + size_t offs = 0; + lseek(fd, 0, SEEK_SET); + while (offs < size) { + ssize_t rc = read(fd, buf + offs, (size_t)size - offs); + if (rc <= 0) elog(ERROR, "Failed to read file %s: %m", path); - LogLogicalMessage(prefix, buf, (size_t)size, false, true); - pfree(buf); + offs += rc; } + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + LogLogicalMessage(prefix, buf, (size_t)size, false, true); + pfree(buf); } } diff --git a/src/include/replication/message.h b/src/include/replication/message.h index ff233bb4834..cd7f8163964 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -35,6 +35,7 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, extern void wallog_file(char const* path, uint64_t limit); extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); +extern void wallog_file_removal(char const* path); /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00