From 305ea421714d907d02c486c79f2745c5e09f6ff1 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 Dec 2024 14:02:30 +0200 Subject: [PATCH] Add wallog_file_removal function --- src/backend/access/heap/rewriteheap.c | 4 +- src/backend/replication/logical/message.c | 64 +++++++++++++++-------- src/include/replication/message.h | 1 + 3 files changed, 44 insertions(+), 25 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 7e708b5dd5b..c451ce8593d 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1171,8 +1171,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_file_descriptor(path, fd, -1); - if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1250,7 +1248,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_file_descriptor(path, -1, -1); + wallog_file_removal(path); } else { diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 6abb4c77d7a..9a538863a50 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -90,45 +90,65 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } +/* + * NEON: remove AUX object + */ +void +wallog_file_removal(char const* path) +{ + char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + elog(DEBUG1, "neon: deleting contents of file %s", path); + + /* unlink file */ + XLogFlush(LogLogicalMessage(prefix, NULL, 0, false)); +} + /* * NEON: persist file in WAL to save it in persistent storage. - * If fd < 0, then remote entry from page server. + * This funcion changes current position in the file, so caller should be aware of it. */ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) { char prefix[MAXPGPATH]; + off_t size; + + Assert(fd >= 0); /* Do not wallog AUX file at replica */ if (!XLogInsertAllowed()) return; - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) + size = lseek(fd, 0, SEEK_END); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + + if ((uint64_t)size > limit) { - elog(DEBUG1, "neon: deleting contents of file %s", path); - /* unlink file */ - XLogFlush(LogLogicalMessage(prefix, NULL, 0, false)); + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + wallog_file_removal(path); } else { - off_t size = lseek(fd, 0, SEEK_END); - elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of file %s: %m", path); - if ((uint64_t)size > limit) - { - elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); - } - else - { - char* buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) + char* buf = palloc((size_t)size); + size_t offs = 0; + lseek(fd, 0, SEEK_SET); + while (offs < size) { + ssize_t rc = read(fd, buf + offs, (size_t)size - offs); + if (rc <= 0) elog(ERROR, "Failed to read file %s: %m", path); - XLogFlush(LogLogicalMessage(prefix, buf, (size_t)size, false)); - pfree(buf); + offs += rc; } + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + XLogFlush(LogLogicalMessage(prefix, buf, (size_t)size, false)); + pfree(buf); } } @@ -140,7 +160,7 @@ wallog_file(char const* path, uint64_t limit) { ereport(LOG, (errcode_for_file_access(), - errmsg("could not create file \"%s\": %m", path))); + errmsg("could not open file \"%s\": %m", path))); } else { diff --git a/src/include/replication/message.h b/src/include/replication/message.h index bf9e13d1af9..58ffab79ab4 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -34,6 +34,7 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, extern void wallog_file(char const* path, uint64_t limit); extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); +extern void wallog_file_removal(char const* path); /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00