From aecf3f56c1761e88b9d533dd78f49c3e97e51e21 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 2 Oct 2024 09:43:03 +0300 Subject: [PATCH 01/10] Persist pg_stat file using AUX mechanism --- src/backend/access/heap/rewriteheap.c | 43 ++--------------- src/backend/access/transam/xlog.c | 8 ++-- src/backend/replication/logical/message.c | 57 +++++++++++++++++++++++ src/backend/utils/activity/pgstat.c | 10 ++-- src/backend/utils/misc/guc_tables.c | 11 +++++ src/include/pgstat.h | 3 +- src/include/replication/message.h | 3 ++ 7 files changed, 88 insertions(+), 47 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index c438ae14eed..4b61ba7e6f6 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -752,41 +752,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup) * ------------------------------------------------------------------------ */ -/* - * NEON: we need to persist mapping file in WAL - */ -static void -wallog_mapping_file(char const* path, int fd) -{ - char prefix[MAXPGPATH]; - - /* Do not wallog AUX file at replica */ - if (!XLogInsertAllowed()) - return; - - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) - { - elog(DEBUG1, "neon: deleting contents of rewrite file %s", path); - /* unlink file */ - LogLogicalMessage(prefix, NULL, 0, false, false); - } - else - { - off_t size = lseek(fd, 0, SEEK_END); - char* buf; - elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of mapping file: %m"); - buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) - elog(ERROR, "Failed to read mapping file: %m"); - LogLogicalMessage(prefix, buf, (size_t)size, false, false); - pfree(buf); - } -} - /* * Do preparations for logging logical mappings during a rewrite if * necessary. 
If we detect that we don't need to log anything we'll prevent @@ -922,7 +887,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state) errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path, written, len))); src->off += len; - wallog_mapping_file(src->path, FileGetRawDesc(src->vfd)); + wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), -1); XLogBeginInsert(); XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); @@ -1173,7 +1138,7 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); + wallog_file_descriptor(path, fd, -1); if (CloseTransientFile(fd) != 0) ereport(ERROR, @@ -1252,7 +1217,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_mapping_file(path, -1); + wallog_file_descriptor(path, -1, -1); } else { @@ -1281,7 +1246,7 @@ CheckPointLogicalRewriteHeap(void) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); + wallog_file_descriptor(path, fd, -1); if (CloseTransientFile(fd) != 0) ereport(ERROR, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0c0a737f271..529a313acaa 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7418,10 +7418,10 @@ CreateCheckPoint(int flags) */ SyncPreCheckpoint(); - /* - * NEON: perform checkpiont action requiring write to the WAL before we determine the REDO pointer. - */ - PreCheckPointGuts(flags); + /* + * NEON: perform checkpoint action requiring write to the WAL before we determine the REDO pointer. + */ + PreCheckPointGuts(flags); /* * Use a critical section to force system panic if we have trouble.
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 9e41aac2813..23c34ba3d80 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -31,10 +31,13 @@ #include "postgres.h" +#include + #include "access/xact.h" #include "access/xloginsert.h" #include "miscadmin.h" #include "replication/message.h" +#include "storage/fd.h" /* * Write logical decoding message into XLog. @@ -93,3 +96,57 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } + +/* + * NEON: persist file in WAL to save it in persistent storage. + * If fd < 0, then remote entry from page server. + */ +void +wallog_file_descriptor(char const* path, int fd, uint64_t limit) +{ + char prefix[MAXPGPATH]; + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + if (fd < 0) + { + elog(DEBUG1, "neon: deleting contents of file %s", path); + /* unlink file */ + LogLogicalMessage(prefix, NULL, 0, false, true); + } + else + { + off_t size = lseek(fd, 0, SEEK_END); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + if ((uint64_t)size > limit) + { + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + } + else + { + char* buf = palloc((size_t)size); + lseek(fd, 0, SEEK_SET); + if (read(fd, buf, (size_t)size) != size) + elog(ERROR, "Failed to read file %s: %m", path); + LogLogicalMessage(prefix, buf, (size_t)size, false, true); + pfree(buf); + } + } +} + +void +wallog_file(char const* path, uint64_t limit) +{ + int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", path))); + } + else + { + wallog_file_descriptor(path, fd, limit); + CloseTransientFile(fd); + } +} diff --git 
a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index 87d22a1e19e..eaa107a35f0 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -97,6 +97,7 @@ #include "lib/dshash.h" #include "pgstat.h" #include "port/atomics.h" +#include "replication/message.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -162,7 +163,6 @@ typedef struct PgStat_SnapshotEntry * ---------- */ -static void pgstat_write_statsfile(void); static void pgstat_read_statsfile(void); static void pgstat_reset_after_failure(void); @@ -183,7 +183,7 @@ static inline bool pgstat_is_kind_valid(int ikind); bool pgstat_track_counts = false; int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_CACHE; - +int neon_pgstat_file_size_limit; /* ---------- * state shared with pgstat_*.c @@ -1307,7 +1307,7 @@ write_chunk(FILE *fpout, void *ptr, size_t len) * This function is called in the last process that is accessing the shared * stats so locking is not required. 
*/ -static void +void pgstat_write_statsfile(void) { FILE *fpout; @@ -1465,6 +1465,10 @@ pgstat_write_statsfile(void) tmpfile, statfile))); unlink(tmpfile); } + else if (XLogInsertAllowed()) + { + wallog_file(statfile, (uint64_t)neon_pgstat_file_size_limit * 1024); + } } /* helpers for pgstat_read_statsfile() */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 69c8238de49..388efeca602 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3650,6 +3650,17 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"neon_pgstat_file_size_limit", PGC_SIGHUP, STATS_CUMULATIVE, + gettext_noop("Maximal size of pg_stat file saved in Neon storage."), + NULL, + GUC_UNIT_KB + }, + &neon_pgstat_file_size_limit, + 1024, 100, 1048576, + NULL, NULL, NULL + }, + { {"gin_pending_list_limit", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the maximum size of the pending list for GIN index."), diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2136239710e..e01e58d38bd 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -469,6 +469,7 @@ extern void StatsShmemInit(void); extern void pgstat_restore_stats(void); extern void pgstat_discard_stats(void); extern void pgstat_before_server_shutdown(int code, Datum arg); +extern void pgstat_write_statsfile(void); /* Functions for backend initialization */ extern void pgstat_initialize(void); @@ -729,7 +730,7 @@ extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PGDLLIMPORT bool pgstat_track_counts; extern PGDLLIMPORT int pgstat_track_functions; extern PGDLLIMPORT int pgstat_fetch_consistency; - +extern PGDLLIMPORT int neon_pgstat_file_size_limit; /* * Variables in pgstat_bgwriter.c diff --git a/src/include/replication/message.h b/src/include/replication/message.h index d5fb2fe0172..ff233bb4834 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -33,6 +33,9 @@ extern 
XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush); +extern void wallog_file(char const* path, uint64_t limit); +extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); + /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 extern void logicalmsg_redo(XLogReaderState *record); From 073671a81abba08f3c6985412005c9606d5b10b4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 4 Dec 2024 09:32:15 +0200 Subject: [PATCH 02/10] Do not wallog AUX files at replica --- src/backend/replication/logical/message.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 23c34ba3d80..f82a48b3ec0 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -105,6 +105,11 @@ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) { char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); if (fd < 0) { From ed3895ed0c0af4b0d4559ed0de19e51b90da053f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 Dec 2024 11:52:23 +0200 Subject: [PATCH 03/10] Update src/backend/replication/logical/message.c Co-authored-by: Heikki Linnakangas --- src/backend/replication/logical/message.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index f82a48b3ec0..dac27cb6a3e 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -99,7 +99,7 @@ logicalmsg_redo(XLogReaderState *record) /* * NEON: persist file in WAL to save it in persistent storage. - * If fd < 0, then remote entry from page server. + * If fd < 0, then remove entry from page server. 
*/ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) From b2be918f1c2e7a1520de1e4cb5fb3f676bf6e86a Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 Dec 2024 11:52:39 +0200 Subject: [PATCH 04/10] Update src/backend/replication/logical/message.c Co-authored-by: Heikki Linnakangas --- src/backend/replication/logical/message.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index dac27cb6a3e..807af67f333 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -147,7 +147,7 @@ wallog_file(char const* path, uint64_t limit) { ereport(LOG, (errcode_for_file_access(), - errmsg("could not create file \"%s\": %m", path))); + errmsg("could not open file \"%s\": %m", path))); } else { From 7a1d34673882c32d41bb580e0697fccb13f0ccc0 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 Dec 2024 14:02:19 +0200 Subject: [PATCH 05/10] Add wallog_file_removal function --- src/backend/access/heap/rewriteheap.c | 4 +- src/backend/replication/logical/message.c | 62 +++++++++++++++-------- src/include/replication/message.h | 1 + 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 4b61ba7e6f6..c1205c0d072 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1138,8 +1138,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_file_descriptor(path, fd, -1); - if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1217,7 +1215,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_file_descriptor(path, -1, -1); + wallog_file_removal(path); } else { diff --git 
a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 807af67f333..1849611f282 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -97,45 +97,65 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } +/* + * NEON: remove AUX object + */ +void +wallog_file_removal(char const* path) +{ + char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + elog(DEBUG1, "neon: deleting contents of file %s", path); + + /* unlink file */ + LogLogicalMessage(prefix, NULL, 0, false, true); +} + /* * NEON: persist file in WAL to save it in persistent storage. - * If fd < 0, then remove entry from page server. + * This funcion changes current position in the file, so caller should be aware of it. */ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) { char prefix[MAXPGPATH]; + off_t size; + + Assert(fd >= 0); /* Do not wallog AUX file at replica */ if (!XLogInsertAllowed()) return; - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) + size = lseek(fd, 0, SEEK_END); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + + if ((uint64_t)size > limit) { - elog(DEBUG1, "neon: deleting contents of file %s", path); - /* unlink file */ - LogLogicalMessage(prefix, NULL, 0, false, true); + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + wallog_file_removal(path); } else { - off_t size = lseek(fd, 0, SEEK_END); - elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of file %s: %m", path); - if ((uint64_t)size > limit) - { - elog(WARNING, "Size of file %s %ld is larger than limit 
%ld", path, (long)size, (long)limit); - } - else - { - char* buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) + char* buf = palloc((size_t)size); + size_t offs = 0; + lseek(fd, 0, SEEK_SET); + while (offs < size) { + ssize_t rc = read(fd, buf + offs, (size_t)size - offs); + if (rc <= 0) elog(ERROR, "Failed to read file %s: %m", path); - LogLogicalMessage(prefix, buf, (size_t)size, false, true); - pfree(buf); + offs += rc; } + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + LogLogicalMessage(prefix, buf, (size_t)size, false, true); + pfree(buf); } } diff --git a/src/include/replication/message.h b/src/include/replication/message.h index ff233bb4834..cd7f8163964 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -35,6 +35,7 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, extern void wallog_file(char const* path, uint64_t limit); extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); +extern void wallog_file_removal(char const* path); /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 From d499d9b3ca2968ff6b8a90a811d379d2f5eda7ba Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 18:30:49 +0200 Subject: [PATCH 06/10] Do small refactoring --- src/backend/access/transam/xlog.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 529a313acaa..51c5ff7f60a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8009,7 +8009,10 @@ static void PreCheckPointGuts(int flags) { if (flags & CHECKPOINT_IS_SHUTDOWN) + { CheckPointReplicationState(flags); + pgstat_write_statsfile(); + } } /* From eab9eff933035841f56ae5165119cd9f5d8408e9 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 20:53:30 +0200 Subject: [PATCH 07/10] Update src/backend/replication/logical/message.c 
Co-authored-by: Heikki Linnakangas --- src/backend/replication/logical/message.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 1849611f282..a6fdca0c9b6 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -118,7 +118,7 @@ wallog_file_removal(char const* path) /* * NEON: persist file in WAL to save it in persistent storage. - * This funcion changes current position in the file, so caller should be aware of it. + * This function changes current position in the file, so caller should be aware of it. */ void wallog_file_descriptor(char const* path, int fd, uint64_t limit) From 253f1070bdaeed2eeef48082ca11662b01feb098 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 20:53:38 +0200 Subject: [PATCH 08/10] Update src/backend/access/heap/rewriteheap.c Co-authored-by: Heikki Linnakangas --- src/backend/access/heap/rewriteheap.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index c1205c0d072..cdb42ab9e7e 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -887,7 +887,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state) errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path, written, len))); src->off += len; - wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), -1); + wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), PG_UINT64_MAX); XLogBeginInsert(); XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); From bdf84e1e1028f999e45542edb90203e5406fbfb5 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 21:03:20 +0200 Subject: [PATCH 09/10] Address review comments --- src/backend/utils/misc/guc_tables.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 388efeca602..8b82095d596 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3657,7 +3657,7 @@ struct config_int ConfigureNamesInt[] = GUC_UNIT_KB }, &neon_pgstat_file_size_limit, - 1024, 100, 1048576, + 1024, 0, 1000000, NULL, NULL, NULL }, From 27ca38829c2ed792690ad6e740d71c71d951ff25 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 21:57:36 +0200 Subject: [PATCH 10/10] Address review comments --- src/backend/access/transam/xlog.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 51c5ff7f60a..c6a9a18016e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8011,6 +8011,9 @@ PreCheckPointGuts(int flags) if (flags & CHECKPOINT_IS_SHUTDOWN) { CheckPointReplicationState(flags); + /* + * pgstat_write_statsfile() also writes to the WAL (it persists the stats file via the AUX mechanism), so it must be called here, before the REDO pointer is determined, to avoid a PANIC. + */ + pgstat_write_statsfile(); } }