diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 497d011d7a20..318bac5c7860 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -184,6 +184,7 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20; pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30; pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40; pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50; +pub const XLOG_NEON_LFC_PREWARM: u8 = 0x60; pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 95d1f769205c..db292ae41dd7 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1159,6 +1159,7 @@ impl WalIngest { flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } } + pg_constants::XLOG_NEON_LFC_PREWARM => {} info => bail!("Unknown WAL record type for Neon RMGR: {}", info), } } diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 70b250d3945d..613ac37cab35 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -22,13 +22,17 @@ #include "neon_pgversioncompat.h" #include "access/parallel.h" +#include "access/xlog.h" +#include "access/xloginsert.h" #include "funcapi.h" #include "miscadmin.h" #include "pagestore_client.h" +#include "file_cache.h" #include "common/hashfn.h" #include "pgstat.h" #include "port/pg_iovec.h" #include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" #include RELFILEINFO_HDR #include "storage/buf_internals.h" #include "storage/fd.h" @@ -39,12 +43,18 @@ #include "utils/builtins.h" #include "utils/dynahash.h" #include "utils/guc.h" +#include "../neon_rmgr/neon_rmgr.h" #include "hll.h" #include "bitmap.h" #include "neon.h" #include "neon_perf_counters.h" +#if PG_VERSION_NUM>=160000 +#include "access/neon_xlog.h" +#endif + + #define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* @@ -78,29 +88,17 @@ * before extending the nominal size of the file. */ -/* Local file storage allocation chunk. - * Should be power of two. Using larger than page chunks can - * 1. Reduce hash-map memory footprint: 8TB database contains billion pages - * and size of hash entry is 40 bytes, so we need 40Gb just for hash map. - * 1Mb chunks can reduce hash map size to 320Mb. - * 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed - */ -#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ -/* - * Smaller chunk seems to be better for OLTP workload - */ -// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) -#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) typedef struct FileCacheEntry { BufferTag key; uint32 hash; uint32 offset; - uint32 access_count; + uint32 access_count : 31; + uint32 synced : 1; uint32 bitmap[CHUNK_BITMAP_SIZE]; dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; @@ -124,11 +122,15 @@ typedef struct FileCacheControl HyperLogLogState wss_estimation; /* estimation of working set size */ } FileCacheControl; +#define LFC_MAX_PREWARM_SIZE 1024 +#define LFC_PREWARM_POLL_INTERVAL 1000000 /* 1 second */ + static HTAB *lfc_hash; static int lfc_desc = 0; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_prewarm_rate; static char *lfc_path; static FileCacheControl *lfc_ctl; static shmem_startup_hook_type prev_shmem_startup_hook; @@ -374,6 +376,7 @@ lfc_change_limit_hook(int newval, void *extra) hole->hash = hash; hole->offset = offset; hole->access_count = 0; + hole->synced = 0; CriticalAssert(!found); dlist_push_tail(&lfc_ctl->holes, &hole->list_node); @@ -388,6 +391,26 @@ lfc_change_limit_hook(int newval, void *extra) LWLockRelease(lfc_lock); } +static void +lfc_register_prewarm_worker() +{ +#if PG_MAJORVERSION_NUM >= 16 + BackgroundWorker bgw; + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCachePrewarmMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +#endif +} + void lfc_init(void) { @@ -436,6 +459,19 @@ lfc_init(void) NULL, NULL); + DefineCustomIntVariable("neon.file_cache_prewarm_rate", + "Interval of generating prewarm WAL records", + NULL, + &lfc_prewarm_rate, + 0, /* disabled by default */ + 0, + INT_MAX, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, + NULL, + NULL); + if (lfc_max_size == 0) return; @@ -447,6 +483,8 @@ lfc_init(void) #else lfc_shmem_request(); #endif + + lfc_register_prewarm_worker(); } /* @@ -693,7 +731,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - /* + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header * 2. Check if the chunk actually has the blocks we're interested in @@ -961,6 +999,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, entry->hash = hash; memset(entry->bitmap, 0, sizeof entry->bitmap); } + entry->synced = false; generation = lfc_ctl->generation; entry_offset = entry->offset; @@ -1013,6 +1052,57 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } } +#if PG_MAJORVERSION_NUM >= 16 +PGDLLEXPORT void +FileCachePrewarmMain(Datum main_arg) +{ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, SignalHandlerForShutdownRequest); + BackgroundWorkerUnblockSignals(); + + while (!ShutdownRequestPending) + { + FileCacheEntryDesc prewarm[LFC_MAX_PREWARM_SIZE]; + size_t n_prewarm = 0; + dlist_iter iter; + + pg_usleep(lfc_prewarm_rate ? lfc_prewarm_rate*1000 : LFC_PREWARM_POLL_INTERVAL); + + CHECK_FOR_INTERRUPTS(); + + if (lfc_prewarm_rate == 0) + continue; + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + /* First send most recently used entryies */ + dlist_reverse_foreach(iter, &lfc_ctl->lru) + { + FileCacheEntry* entry = dlist_container(FileCacheEntry, list_node, iter.cur); + if (!entry->synced) + { + prewarm[n_prewarm].key = entry->key; + memcpy(prewarm[n_prewarm].bitmap, entry->bitmap, sizeof(entry->bitmap)); + entry->synced = true; + if (++n_prewarm == LFC_MAX_PREWARM_SIZE) + break; + } + } + LWLockRelease(lfc_lock); + + if (n_prewarm > 0) + { + XLogBeginInsert(); + XLogRegisterData((char *) &prewarm, n_prewarm*sizeof(FileCacheEntryDesc)); + XLogFlush(XLogInsert(RM_NEON_ID, XLOG_NEON_LFC_PREWARM)); + } + } +} +#endif + +/* + * Admin functions + */ typedef struct { TupleDesc tupdesc; diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h new file mode 100644 index 000000000000..619daa696fa6 --- /dev/null +++ b/pgxn/neon/file_cache.h @@ -0,0 +1,64 @@ +/*------------------------------------------------------------------------- + * + * file_cache.h + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef file_cache_h +#define file_cache_h + +#include "neon_pgversioncompat.h" + +/* Local file storage allocation chunk. + * Should be power of two. Using larger than page chunks can + * 1. Reduce hash-map memory footprint: 8TB database contains billion pages + * and size of hash entry is 40 bytes, so we need 40Gb just for hash map. + * 1Mb chunks can reduce hash map size to 320Mb. + * 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed + */ +#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ +#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) + +typedef struct +{ + BufferTag key; + uint32 bitmap[CHUNK_BITMAP_SIZE]; +} FileCacheEntryDesc; + +PGDLLEXPORT void FileCachePrewarmMain(Datum main_arg); + +extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blkno, const void *const *buffers, + BlockNumber nblocks); +/* returns number of blocks read, with one bit set in *read for each */ +extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blkno, void **buffers, + BlockNumber nblocks, bits8 *mask); + +extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blkno); +extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blkno, int nblocks, bits8 *bitmap); +extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno); +extern void lfc_init(void); + +static inline bool +lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, + void *buffer) +{ + bits8 rv = 0; + return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1; +} + +static inline void +lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, + const void *buffer) +{ + return lfc_writev(rinfo, forkNum, blkno, &buffer, 1); +} + +#endif diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index b60ae41af3cc..4f0460c23802 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -33,6 +33,7 @@ #include "neon_perf_counters.h" #include "neon_utils.h" #include "pagestore_client.h" +#include "file_cache.h" #include "walproposer.h" #define PageStoreTrace DEBUG5 diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index f905e3b0faa3..c9f80e8b9a9a 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -261,35 +261,5 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size); extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum); -/* functions for local file cache */ -extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blkno, const void *const *buffers, - BlockNumber nblocks); -/* returns number of blocks read, with one bit set in *read for each */ -extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blkno, void **buffers, - BlockNumber nblocks, bits8 *mask); - -extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blkno); -extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blkno, int nblocks, bits8 *bitmap); -extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno); -extern void lfc_init(void); - -static inline bool -lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, - void *buffer) -{ - bits8 rv = 0; - return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1; -} - -static inline void -lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, - const void *buffer) -{ - return lfc_writev(rinfo, forkNum, blkno, &buffer, 1); -} #endif diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index cbb0e2ae6d0b..fe51493d68df 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -68,6 +68,7 @@ #include "neon_perf_counters.h" #include "pagestore_client.h" +#include "file_cache.h" #include "bitmap.h" #if PG_VERSION_NUM >= 150000 diff --git a/pgxn/neon_rmgr/neon_rmgr.c b/pgxn/neon_rmgr/neon_rmgr.c index c3f726db847a..8c75951d79bb 100644 --- a/pgxn/neon_rmgr/neon_rmgr.c +++ b/pgxn/neon_rmgr/neon_rmgr.c @@ -13,9 +13,12 @@ #include "miscadmin.h" #include "storage/buf.h" #include "storage/bufmgr.h" +#include "storage/buf_internals.h" #include "storage/bufpage.h" #include "storage/freespace.h" #include "neon_rmgr.h" +#include "../neon/file_cache.h" +#include "../neon/neon_pgversioncompat.h" PG_MODULE_MAGIC; void _PG_init(void); @@ -30,6 +33,7 @@ static void redo_neon_heap_delete(XLogReaderState *record); static void redo_neon_heap_update(XLogReaderState *record, bool hot_update); static void redo_neon_heap_lock(XLogReaderState *record); static void redo_neon_heap_multi_insert(XLogReaderState *record); +static void redo_neon_lfc_prewarm(XLogReaderState *record); const static RmgrData NeonRmgr = { .rm_name = "neon", @@ -76,6 +80,9 @@ neon_rm_redo(XLogReaderState *record) case XLOG_NEON_HEAP_MULTI_INSERT: redo_neon_heap_multi_insert(record); break; + case XLOG_NEON_LFC_PREWARM: + redo_neon_lfc_prewarm(record); + break; default: elog(PANIC, "neon_rm_redo: unknown op code %u", info); } @@ -882,6 +889,28 @@ redo_neon_heap_multi_insert(XLogReaderState *record) XLogRecordPageWithFreeSpace(rlocator, blkno, freespace); } +static void +redo_neon_lfc_prewarm(XLogReaderState *record) +{ + FileCacheEntryDesc* entries = (FileCacheEntryDesc*)XLogRecGetData(record); + size_t n_entries = XLogRecGetDataLen(record)/sizeof(FileCacheEntryDesc); + char buf[BLCKSZ]; + + for (size_t i = 0; i < n_entries; i++) + { + FileCacheEntryDesc* entry = &entries[i]; + NRelFileInfo rinfo = BufTagGetNRelFileInfo(entry->key); + SMgrRelation reln = smgropen(rinfo, INVALID_PROC_NUMBER, RELPERSISTENCE_PERMANENT); + for (size_t j = 0; j < CHUNK_BITMAP_SIZE; j++) + { + if (entry->bitmap[j >> 5] & (1 << (i & 31))) + { + smgrread(reln, entry->key.forkNum, entry->key.blockNum + j, buf); + } + } + } +} + #else /* safeguard for older PostgreSQL versions */ PG_MODULE_MAGIC; diff --git a/pgxn/neon_rmgr/neon_rmgr.h b/pgxn/neon_rmgr/neon_rmgr.h index 2c26a928ad1c..632903dafd0c 100644 --- a/pgxn/neon_rmgr/neon_rmgr.h +++ b/pgxn/neon_rmgr/neon_rmgr.h @@ -5,6 +5,8 @@ #include "replication/decode.h" #include "replication/logical.h" +#define XLOG_NEON_LFC_PREWARM 0x60 + extern void neon_rm_desc(StringInfo buf, XLogReaderState *record); extern void neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern const char *neon_rm_identify(uint8 info); diff --git a/pgxn/neon_rmgr/neon_rmgr_decode.c b/pgxn/neon_rmgr/neon_rmgr_decode.c index 66032c88f62c..ac7d5c8c2d96 100644 --- a/pgxn/neon_rmgr/neon_rmgr_decode.c +++ b/pgxn/neon_rmgr/neon_rmgr_decode.c @@ -456,6 +456,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeNeonMultiInsert(ctx, buf); break; + case XLOG_NEON_LFC_PREWARM: + break; default: elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info); break; diff --git a/pgxn/neon_rmgr/neon_rmgr_desc.c b/pgxn/neon_rmgr/neon_rmgr_desc.c index e8003a106661..3f1f2432a2ff 100644 --- a/pgxn/neon_rmgr/neon_rmgr_desc.c +++ b/pgxn/neon_rmgr/neon_rmgr_desc.c @@ -113,6 +113,10 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record) xlrec->ntuples, &offset_elem_desc, NULL); } } + else if (info == XLOG_NEON_LFC_PREWARM) + { + appendStringInfo(buf, "%d chunks", XLogRecGetDataLen(record)); + } } const char * @@ -152,6 +156,9 @@ neon_rm_identify(uint8 info) case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE: id = "MULTI_INSERT+INIT"; break; + case XLOG_NEON_LFC_PREWARM: + id = "LFC_PREWARM"; + break; } return id; diff --git a/test_runner/regress/test_replica_prewarm.py b/test_runner/regress/test_replica_prewarm.py new file mode 100644 index 000000000000..804a7a46a1c3 --- /dev/null +++ b/test_runner/regress/test_replica_prewarm.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import pytest +from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup +from fixtures.pg_version import PgVersion + + +def test_replica_prewarm(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + if env.pg_version < PgVersion.V16: + pytest.skip("NEON_RM is available only in PG16") + primary = env.endpoints.create_start( + branch_name="main", + endpoint_id="primary", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_rate=1s", + ], + ) + p_con = primary.connect() + p_cur = p_con.cursor() + + p_cur.execute("create extension neon") + p_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + + secondary = env.endpoints.new_replica_start( + origin=primary, + endpoint_id="secondary", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ], + ) + p_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + + wait_replica_caughtup(primary, secondary) + + s_con = secondary.connect() + s_cur = s_con.cursor() + + s_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + lfc_used_pages = s_cur.fetchall()[0][0] + assert lfc_used_pages > 10000 + + s_cur.execute("select sum(pk) from t") + assert s_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2