Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement replica prewarm #9466

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/postgres_ffi/src/pg_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
pub const XLOG_NEON_LFC_PREWARM: u8 = 0x60;

pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;

Expand Down
1 change: 1 addition & 0 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,7 @@ impl WalIngest {
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
pg_constants::XLOG_NEON_LFC_PREWARM => {}
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
}
Expand Down
120 changes: 105 additions & 15 deletions pgxn/neon/file_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
#include "neon_pgversioncompat.h"

#include "access/parallel.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/fd.h"
Expand All @@ -39,12 +43,18 @@
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "../neon_rmgr/neon_rmgr.h"

#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"

#if PG_VERSION_NUM>=160000
#include "access/neon_xlog.h"
#endif


#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)

/*
Expand Down Expand Up @@ -78,29 +88,17 @@
* before extending the nominal size of the file.
*/

/* Local file storage allocation chunk.
* Should be power of two. Using larger than page chunks can
* 1. Reduce hash-map memory footprint: 8TB database contains billion pages
* and size of hash entry is 40 bytes, so we need 40Gb just for hash map.
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MB ((uint64)1024*1024)

#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)

typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 31;
uint32 synced : 1;
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
Expand All @@ -124,11 +122,15 @@ typedef struct FileCacheControl
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;

#define LFC_MAX_PREWARM_SIZE 1024
#define LFC_PREWARM_POLL_INTERVAL 1000000 /* 1 second */

static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_rate;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
Expand Down Expand Up @@ -374,6 +376,7 @@ lfc_change_limit_hook(int newval, void *extra)
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
hole->synced = 0;
CriticalAssert(!found);
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);

Expand All @@ -388,6 +391,26 @@ lfc_change_limit_hook(int newval, void *extra)
LWLockRelease(lfc_lock);
}

static void
lfc_register_prewarm_worker()
{
#if PG_MAJORVERSION_NUM >= 16
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCachePrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;

RegisterBackgroundWorker(&bgw);
#endif
}

void
lfc_init(void)
{
Expand Down Expand Up @@ -436,6 +459,19 @@ lfc_init(void)
NULL,
NULL);

DefineCustomIntVariable("neon.file_cache_prewarm_rate",
"Interval of generating prewarm WAL records",
NULL,
&lfc_prewarm_rate,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL,
NULL,
NULL);

if (lfc_max_size == 0)
return;

Expand All @@ -447,6 +483,8 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif

lfc_register_prewarm_worker();
}

/*
Expand Down Expand Up @@ -693,7 +731,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);

/*
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
Expand Down Expand Up @@ -961,6 +999,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->synced = false;

generation = lfc_ctl->generation;
entry_offset = entry->offset;
Expand Down Expand Up @@ -1013,6 +1052,57 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
}

#if PG_MAJORVERSION_NUM >= 16
PGDLLEXPORT void
FileCachePrewarmMain(Datum main_arg)
{
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();

while (!ShutdownRequestPending)
{
FileCacheEntryDesc prewarm[LFC_MAX_PREWARM_SIZE];
size_t n_prewarm = 0;
dlist_iter iter;

pg_usleep(lfc_prewarm_rate ? lfc_prewarm_rate*1000 : LFC_PREWARM_POLL_INTERVAL);

CHECK_FOR_INTERRUPTS();

if (lfc_prewarm_rate == 0)
continue;

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

/* First send most recently used entryies */
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry* entry = dlist_container(FileCacheEntry, list_node, iter.cur);
if (!entry->synced)
{
prewarm[n_prewarm].key = entry->key;
memcpy(prewarm[n_prewarm].bitmap, entry->bitmap, sizeof(entry->bitmap));
entry->synced = true;
if (++n_prewarm == LFC_MAX_PREWARM_SIZE)
break;
}
}
LWLockRelease(lfc_lock);

if (n_prewarm > 0)
{
XLogBeginInsert();
XLogRegisterData((char *) &prewarm, n_prewarm*sizeof(FileCacheEntryDesc));
XLogFlush(XLogInsert(RM_NEON_ID, XLOG_NEON_LFC_PREWARM));
}
}
}
#endif

/*
* Admin functions
*/
typedef struct
{
TupleDesc tupdesc;
Expand Down
64 changes: 64 additions & 0 deletions pgxn/neon/file_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-------------------------------------------------------------------------
*
* file_cache.h
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef file_cache_h
#define file_cache_h

#include "neon_pgversioncompat.h"

/* Local file storage allocation chunk.
* Should be power of two. Using larger than page chunks can
* 1. Reduce hash-map memory footprint: 8TB database contains billion pages
* and size of hash entry is 40 bytes, so we need 40Gb just for hash map.
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)

typedef struct
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheEntryDesc;

PGDLLEXPORT void FileCachePrewarmMain(Datum main_arg);

extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);

extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}

#endif
1 change: 1 addition & 0 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "neon_perf_counters.h"
#include "neon_utils.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "walproposer.h"

#define PageStoreTrace DEBUG5
Expand Down
30 changes: 0 additions & 30 deletions pgxn/neon/pagestore_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,35 +261,5 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);

/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);

extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}

#endif
1 change: 1 addition & 0 deletions pgxn/neon/pagestore_smgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "file_cache.h"
#include "bitmap.h"

#if PG_VERSION_NUM >= 150000
Expand Down
Loading
Loading