From ec8b8b941d8945b33a7c0d998cef5097c65770e4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 31 Oct 2024 10:12:37 +0200 Subject: [PATCH 01/10] Add functions to get LFC state, prewarm LFC and monitor prewarm process --- pgxn/neon/Makefile | 2 + pgxn/neon/file_cache.c | 393 ++++++++++++++++++++++-- pgxn/neon/neon--1.5--1.6.sql | 22 ++ pgxn/neon/neon--1.6--1.5.sql | 7 + test_runner/regress/test_lfc_prewarm.py | 50 +++ 5 files changed, 455 insertions(+), 19 deletions(-) create mode 100644 pgxn/neon/neon--1.5--1.6.sql create mode 100644 pgxn/neon/neon--1.6--1.5.sql create mode 100644 test_runner/regress/test_lfc_prewarm.py diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index c87ae59fd6af..b8cce81bb13c 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -34,6 +34,8 @@ DATA = \ neon--1.2--1.3.sql \ neon--1.3--1.4.sql \ neon--1.4--1.5.sql \ + neon--1.5--1.6.sql \ + neon--1.6--1.5.sql \ neon--1.5--1.4.sql \ neon--1.4--1.3.sql \ neon--1.3--1.2.sql \ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index f49415be6869..9b9d7bee71b2 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -22,6 +22,7 @@ #include "neon_pgversioncompat.h" #include "access/parallel.h" +#include "access/xlog.h" #include "funcapi.h" #include "miscadmin.h" #include "pagestore_client.h" @@ -40,12 +41,16 @@ #include "utils/dynahash.h" #include "utils/guc.h" +#if PG_VERSION_NUM >= 150000 +#include "access/xlogrecovery.h" +#endif + #include "hll.h" #include "bitmap.h" #include "neon.h" #include "neon_perf_counters.h" -#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) +#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* * Local file cache is used to temporary store relations pages in local file system. @@ -100,7 +105,9 @@ typedef struct FileCacheEntry BufferTag key; uint32 hash; uint32 offset; - uint32 access_count; + uint32 access_count : 30; + uint32 prewarm_requested : 1; /* entry should be filled by prewarm */ + uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */ uint32 bitmap[CHUNK_BITMAP_SIZE]; dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; @@ -118,17 +125,29 @@ typedef struct FileCacheControl uint64 writes; /* number of writes issued */ uint64 time_read; /* time spent reading (us) */ uint64 time_write; /* time spent writing (us) */ + uint32 prewarm_total_chunks; + uint32 prewarm_curr_chunk; + uint32 prewarmed_pages; + uint32 skipped_pages; dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ } FileCacheControl; +typedef struct FileCacheStateEntry +{ + BufferTag key; + uint32 bitmap[CHUNK_BITMAP_SIZE]; +} FileCacheStateEntry; + static HTAB *lfc_hash; static int lfc_desc = 0; static LWLockId lfc_lock; static int lfc_max_size; static int lfc_size_limit; +static int lfc_prewarm_limit; +static int lfc_prewarm_batch; static char *lfc_path; static FileCacheControl *lfc_ctl; static shmem_startup_hook_type prev_shmem_startup_hook; @@ -149,7 +168,7 @@ lfc_disable(char const *op) { int fd; - elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); + elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path); /* Invalidate hash */ LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -184,7 +203,7 @@ lfc_disable(char const *op) pgstat_report_wait_end(); if (rc < 0) - elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); + elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path); } } @@ -196,7 +215,7 @@ lfc_disable(char const *op) fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); if (fd < 0) - elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path); + elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path); else close(fd); @@ -267,14 +286,7 @@ lfc_shmem_startup(void) n_chunks + 1, n_chunks + 1, &info, HASH_ELEM | HASH_BLOBS); - lfc_ctl->generation = 0; - lfc_ctl->size = 0; - lfc_ctl->used = 0; - lfc_ctl->hits = 0; - lfc_ctl->misses = 0; - lfc_ctl->writes = 0; - lfc_ctl->time_read = 0; - lfc_ctl->time_write = 0; + memset(lfc_ctl, 0, sizeof *lfc_ctl); dlist_init(&lfc_ctl->lru); dlist_init(&lfc_ctl->holes); @@ -285,7 +297,7 @@ lfc_shmem_startup(void) fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC); if (fd < 0) { - elog(WARNING, "Failed to create local file cache %s: %m", lfc_path); + elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path); lfc_ctl->limit = 0; } else @@ -327,7 +339,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source) { if (*newval > lfc_max_size) { - elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size"); + elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size"); return false; } return true; @@ -440,6 +452,32 @@ lfc_init(void) NULL, NULL); + DefineCustomIntVariable("neon.file_cache_prewarm_limit", + "Maximal number of prewarmed pages", + NULL, + &lfc_prewarm_limit, + 0, /* disabled by default */ + 0, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("neon.file_cache_prewarm_batch", + "Number of pages retrivied by prewarm from page server", + NULL, + &lfc_prewarm_batch, + 64, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + if (lfc_max_size == 0) return; @@ -453,6 +491,247 @@ lfc_init(void) #endif } +static FileCacheStateEntry* +lfc_get_state(size_t* n_entries) +{ + size_t max_entries = *n_entries; + size_t i = 0; + FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries); + + LWLockAcquire(lfc_lock, LW_SHARED); + + if (LFC_ENABLED()) + { + dlist_iter iter; + dlist_reverse_foreach(iter, &lfc_ctl->lru) + { + FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur); + memcpy(&fs[i].key, &entry->key, sizeof entry->key); + memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap); + if (++i == max_entries) + break; + } + elog(LOG, "LFC: save state of %ld chunks", (long)i); + } + + LWLockRelease(lfc_lock); + + *n_entries = i; + return fs; +} + +/* + * Prewarm LFC cache to the specified state. + * + * Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres + * is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion. + * + * But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the + * same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write) + * is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file. + * Certainly we can not rely on disk content in this case. + * + * To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started. + * `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk. + * In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored. + * + * But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress. + * This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed. + * Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set. + * So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped. + */ + +static void +lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) +{ + ssize_t rc; + size_t snd_idx = 0, rcv_idx = 0; + size_t n_sent = 0, n_received = 0; + FileCacheEntry *entry; + uint64 generation; + uint32 entry_offset; + uint32 hash; + size_t i; + bool found; + int shard_no; + + if (!lfc_ensure_opened()) + return; + + if (n_entries == 0 || fs == NULL) + { + elog(LOG, "LFC: prewarm is disabled"); + return; + } + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + if (lfc_ctl->limit <= lfc_ctl->size) + { + LWLockRelease(lfc_lock); + return; + } + if (n_entries > lfc_ctl->limit - lfc_ctl->size) + { + n_entries = lfc_ctl->limit - lfc_ctl->size; + } + + /* Initialize fields used to track prewarming progress */ + lfc_ctl->prewarm_total_chunks = n_entries; + lfc_ctl->prewarm_curr_chunk = 0; + + /* + * Load LFC state and add entries in hash table. + * It is needed to track modification of prewarmed pages. + * All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes + * some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk. + * It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm. + */ + for (i = 0; i < n_entries; i++) + { + hash = get_hash_value(lfc_hash, &fs[i].key); + entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found); + /* Do not prewarm chunks which are already present in LFC */ + if (!found) + { + entry->offset = lfc_ctl->size++; + entry->hash = hash; + entry->access_count = 0; + entry->prewarm_requested = true; + entry->prewarm_started = false; + memset(entry->bitmap, 0, sizeof entry->bitmap); + /* Most recently visted pages are stored first */ + dlist_push_head(&lfc_ctl->lru, &entry->list_node); + lfc_ctl->used += 1; + } + } + LWLockRelease(lfc_lock); + + elog(LOG, "LFC: start loading %ld chunks", (long)n_entries); + + while (true) + { + size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK; + size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK; + if (chunk_no < n_entries) + { + if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))) + { + /* + * In case of prewarming replica we should be careful not to load too new version + * of the page - with LSN larger than current replay LSN. + * At primary we are always loading latest version. + */ + XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX; + + NeonGetPageRequest request = { + .req.tag = T_NeonGetPageRequest, + /* lsn and not_modified_since are filled in below */ + .rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key), + .forknum = fs[chunk_no].key.forkNum, + .blkno = fs[chunk_no].key.blockNum + offs_in_chunk, + .req.lsn = req_lsn, + .req.not_modified_since = 0 + }; + shard_no = get_shard_number(&fs[chunk_no].key); + while (!page_server->send(shard_no, (NeonRequest *) &request) + || !page_server->flush(shard_no)) + { + /* do nothing */ + } + n_sent += 1; + } + snd_idx += 1; + } + if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries) + { + NeonResponse * resp; + do + { + chunk_no = rcv_idx / BLOCKS_PER_CHUNK; + offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK; + rcv_idx += 1; + } while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))); + + shard_no = get_shard_number(&fs[chunk_no].key); + resp = page_server->receive(shard_no); + lfc_ctl->prewarm_curr_chunk = chunk_no; + + if (resp->tag != T_NeonGetPageResponse) + { + elog(LOG, "LFC: unexpected response type: %d", resp->tag); + return; + } + + hash = get_hash_value(lfc_hash, &fs[chunk_no].key); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL); + if (entry != NULL && entry->prewarm_requested) + { + /* Unlink entry from LRU list to pin it for the duration of IO operation */ + if (entry->access_count++ == 0) + dlist_delete(&entry->list_node); + + generation = lfc_ctl->generation; + entry_offset = entry->offset; + Assert(!entry->prewarm_started); + entry->prewarm_started = true; + + LWLockRelease(lfc_lock); + + rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ); + if (rc != BLCKSZ) + { + lfc_disable("write"); + break; + } + else + { + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (lfc_ctl->generation == generation) + { + CriticalAssert(LFC_ENABLED()); + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + if (entry->prewarm_requested) + { + lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1); + entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31); + lfc_ctl->prewarmed_pages += 1; + } + else + { + lfc_ctl->skipped_pages += 1; + } + Assert(entry->prewarm_started); + entry->prewarm_started = false; + } + + LWLockRelease(lfc_lock); + } + } + else + { + Assert(!entry || !entry->prewarm_started); + lfc_ctl->skipped_pages += 1; + LWLockRelease(lfc_lock); + } + + if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK) + { + break; + } + } + } + Assert(n_sent == n_received); + lfc_ctl->prewarm_curr_chunk = n_entries; + elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received); +} + + /* * Check if page is present in the cache. * Returns true if page is found in local cache. @@ -620,6 +899,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) /* remove the page from the cache */ entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1))); + entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */ if (entry->access_count == 0) { @@ -865,7 +1145,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - /* + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header * 2. Check if the chunk actually has the blocks we're interested in @@ -903,6 +1183,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (found) { + if (entry->prewarm_started) + { + /* + * Some page of this chunk is currently written by `lfc_prewarm`. + * We should give-up not to interfere with it. + * But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result. + */ + entry->prewarm_requested = false; + LWLockRelease(lfc_lock); + return; + } /* * Unlink entry from LRU list to pin it for the duration of IO * operation @@ -932,7 +1223,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { /* Cache overflow: evict least recently used chunk */ FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru)); - + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1; @@ -948,10 +1239,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes)); uint32 offset = hole->offset; bool hole_found; - + hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found); CriticalAssert(hole_found); - + lfc_ctl->used += 1; entry->offset = offset; /* reuse the hole */ } @@ -963,9 +1254,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } entry->access_count = 1; entry->hash = hash; + entry->prewarm_started = false; memset(entry->bitmap, 0, sizeof entry->bitmap); } + entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */ generation = lfc_ctl->generation; entry_offset = entry->offset; LWLockRelease(lfc_lock); @@ -1338,3 +1631,65 @@ approximate_working_set_size(PG_FUNCTION_ARGS) } PG_RETURN_NULL(); } + +PG_FUNCTION_INFO_V1(get_local_cache_state); + +Datum +get_local_cache_state(PG_FUNCTION_ARGS) +{ + size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); + FileCacheStateEntry* fs = lfc_get_state(&n_entries); + size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries; + bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes); + + SET_VARSIZE(res, VARHDRSZ + size_in_bytes); + memcpy(VARDATA(res), fs, size_in_bytes); + pfree(fs); + + PG_RETURN_BYTEA_P(res); +} + +PG_FUNCTION_INFO_V1(prewarm_local_cache); + +Datum +prewarm_local_cache(PG_FUNCTION_ARGS) +{ + bytea* state = PG_GETARG_BYTEA_PP(0); + uint32 n_entries = VARSIZE_ANY_EXHDR(state)/sizeof(FileCacheStateEntry); + FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state); + + lfc_prewarm(fs, n_entries); + + PG_RETURN_NULL(); +} + +PG_FUNCTION_INFO_V1(get_prewarm_info); + +Datum +get_prewarm_info(PG_FUNCTION_ARGS) +{ + Datum values[4]; + bool nulls[4]; + TupleDesc tupdesc; + + if (lfc_size_limit == 0) + PG_RETURN_NULL(); + + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + MemSet(nulls, 0, sizeof(nulls)); + LWLockAcquire(lfc_lock, LW_SHARED); + values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks); + values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk); + values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages); + values[3] = Int32GetDatum(lfc_ctl->skipped_pages); + LWLockRelease(lfc_lock); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql new file mode 100644 index 000000000000..c2f3895883ec --- /dev/null +++ b/pgxn/neon/neon--1.5--1.6.sql @@ -0,0 +1,22 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit + +CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer) +RETURNS record +AS 'MODULE_PATHNAME', 'get_prewarm_info' +LANGUAGE C STRICT +PARALLEL SAFE; + +CREATE FUNCTION get_local_cache_state(max_chunks integer default null) +RETURNS bytea +AS 'MODULE_PATHNAME', 'get_local_cache_state' +LANGUAGE C +PARALLEL UNSAFE; + +CREATE FUNCTION prewarm_local_cache(state bytea) +RETURNS void +AS 'MODULE_PATHNAME', 'prewarm_local_cache' +LANGUAGE C STRICT +PARALLEL UNSAFE; + + + diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql new file mode 100644 index 000000000000..0ff29933b885 --- /dev/null +++ b/pgxn/neon/neon--1.6--1.5.sql @@ -0,0 +1,7 @@ +DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer); + +DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer); + +DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea); + + diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py new file mode 100644 index 000000000000..48afed0cf3bb --- /dev/null +++ b/test_runner/regress/test_lfc_prewarm.py @@ -0,0 +1,50 @@ + +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv + + +def test_lfc_prewarm(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + endpoint.stop() + endpoint.start() + + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("alter extension neon update to '1.6'") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + + cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") + lfc_used_pages = cur.fetchall()[0][0] + log.info(f"Used LFC size: {lfc_used_pages}") + cur.execute("select * from get_prewarm_info()") + prewarm_info = cur.fetchall()[0] + log.info(f"Prewarm info: {prewarm_info}") + log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%") + + assert lfc_used_pages > 10000 + assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1] + + cur.execute("select sum(pk) from t") + assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 + + assert prewarm_info[1] > 0 From dc1684efccc41d284169f217c261d8318d5f3cf0 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 31 Oct 2024 18:26:42 +0200 Subject: [PATCH 02/10] Add delay between upgrade of extension version --- test_runner/regress/test_lfc_prewarm.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index 48afed0cf3bb..f464d1ab9e07 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,3 +1,4 @@ +import time from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv @@ -30,6 +31,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): conn = endpoint.connect() cur = conn.cursor() + time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version cur.execute("alter extension neon update to '1.6'") cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) From 7e2fb10cca4f2fa9859fb7750f2f0d1d39147ff2 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 14 Nov 2024 21:14:45 +0200 Subject: [PATCH 03/10] Fix handling zero neon.file_cache_prewarm_limit --- pgxn/neon/file_cache.c | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 9b9d7bee71b2..457beb0eb296 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -496,7 +496,12 @@ lfc_get_state(size_t* n_entries) { size_t max_entries = *n_entries; size_t i = 0; - FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries); + FileCacheStateEntry* fs; + + if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */ + return NULL; + + fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries); LWLockAcquire(lfc_lock, LW_SHARED); @@ -1639,14 +1644,18 @@ get_local_cache_state(PG_FUNCTION_ARGS) { size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0); FileCacheStateEntry* fs = lfc_get_state(&n_entries); - size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries; - bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes); + if (fs != NULL) + { + size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries; + bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes); - SET_VARSIZE(res, VARHDRSZ + size_in_bytes); - memcpy(VARDATA(res), fs, size_in_bytes); - pfree(fs); + SET_VARSIZE(res, VARHDRSZ + size_in_bytes); + memcpy(VARDATA(res), fs, size_in_bytes); + pfree(fs); - PG_RETURN_BYTEA_P(res); + PG_RETURN_BYTEA_P(res); + } + PG_RETURN_NULL(); } PG_FUNCTION_INFO_V1(prewarm_local_cache); From e07eedca5d10eae1453f98e9ab7b46d4455700fd Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 12 Dec 2024 22:27:52 +0200 Subject: [PATCH 04/10] correctly handle PS disconect duriug prewarm --- pgxn/neon/file_cache.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 457beb0eb296..3f0e226ed129 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -641,9 +641,10 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) }; shard_no = get_shard_number(&fs[chunk_no].key); while (!page_server->send(shard_no, (NeonRequest *) &request) - || !page_server->flush(shard_no)) + || !page_server->flush(shard_no)) { - /* do nothing */ + /* page server disconnected: all previusly sent prefetch requests are lost */ + n_sent = 0; } n_sent += 1; } @@ -663,10 +664,21 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) resp = page_server->receive(shard_no); lfc_ctl->prewarm_curr_chunk = chunk_no; - if (resp->tag != T_NeonGetPageResponse) + switch (resp->tag) { - elog(LOG, "LFC: unexpected response type: %d", resp->tag); - return; + case T_NeonGetPageResponse: + break; + case T_NeonErrorResponse: + { + /* Prefech can request page which is already dropped so PS can respond with error: just ignore it */ + NeonErrorResponse *err_resp = (NeonErrorResponse *) resp; + elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s", + fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(BufTagGetNRelFileInfo(fs[chunk_no].key)), fs[chunk_no].key.forkNum, err_resp->message); + continue; + } + default: + elog(LOG, "LFC: unexpected response type: %d", resp->tag); + return; } hash = get_hash_value(lfc_hash, &fs[chunk_no].key); From 7b80ad4950153917a42bdf9dd26144f1ea49a5d2 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 14 Dec 2024 16:59:22 +0200 Subject: [PATCH 05/10] Fix format warning --- pgxn/neon/file_cache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 3f0e226ed129..44ca6e970167 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -673,7 +673,7 @@ lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries) /* Prefech can request page which is already dropped so PS can respond with error: just ignore it */ NeonErrorResponse *err_resp = (NeonErrorResponse *) resp; elog(LOG, "LFC: page server failed to load page %u of relation %u/%u/%u.%u: %s", - fs[chunk_no].key.blockNum + offs_in_chunk, RelFileInfoFmt(BufTagGetNRelFileInfo(fs[chunk_no].key)), fs[chunk_no].key.forkNum, err_resp->message); + fs[chunk_no].key.blockNum + (BlockNumber)offs_in_chunk, RelFileInfoFmt(BufTagGetNRelFileInfo(fs[chunk_no].key)), fs[chunk_no].key.forkNum, err_resp->message); continue; } default: From 1f2b47c70f745647bb79570570620873437ae311 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 14 Dec 2024 20:08:07 +0200 Subject: [PATCH 06/10] Set LFC path in test-lfc_prewarm test --- test_runner/regress/test_lfc_prewarm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index f464d1ab9e07..b92e60932a28 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -13,6 +13,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): config_lines=[ "autovacuum = off", "shared_buffers=1MB", + "neon.file_cache_path='file.cache'", "neon.max_file_cache_size=1GB", "neon.file_cache_size_limit=1GB", "neon.file_cache_prewarm_limit=1000", From 07027bde7d3f2c68b23272afc5f0207352822ffc Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 14 Dec 2024 21:11:20 +0200 Subject: [PATCH 07/10] Do not run test_lfc_prewarm test without LFC --- test_runner/regress/test_lfc_prewarm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index b92e60932a28..e5d0c79dc68c 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,9 +1,11 @@ import time +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") def test_lfc_prewarm(neon_simple_env: NeonEnv): env = neon_simple_env n_records = 1000000 @@ -13,7 +15,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): config_lines=[ "autovacuum = off", "shared_buffers=1MB", - "neon.file_cache_path='file.cache'", "neon.max_file_cache_size=1GB", "neon.file_cache_size_limit=1GB", "neon.file_cache_prewarm_limit=1000", From 1d77fb0dea3efb2f5f722eec4c1f3b2b75cfa2bb Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 14 Dec 2024 21:58:26 +0200 Subject: [PATCH 08/10] Eliminate stale reads from LFC in case of prewarm conflict --- pgxn/neon/file_cache.c | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 44ca6e970167..f066abc43dd5 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -1162,6 +1162,14 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header @@ -1188,14 +1196,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); hash = get_hash_value(lfc_hash, &tag); - LWLockAcquire(lfc_lock, LW_EXCLUSIVE); - - if (!LFC_ENABLED()) - { - LWLockRelease(lfc_lock); - return; - } - entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); if (found) @@ -1208,8 +1208,13 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, * But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result. */ entry->prewarm_requested = false; - LWLockRelease(lfc_lock); - return; + /* cleanup all affected pages of the chunk: we do not know which one of them is conflicting with prewarm */ + for (int i = 0; i < blocks_in_chunk; i++) + { + lfc_ctl->used_pages -= ((entry->bitmap[(chunk_offs + i) >> 5] >> ((chunk_offs + i) & 31)) & 1); + entry->bitmap[(chunk_offs + i) >> 5] &= ~(1 << ((chunk_offs + i) & 31)); + } + goto next_chunk; } /* * Unlink entry from LRU list to pin it for the duration of IO @@ -1290,6 +1295,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (rc != BLCKSZ * blocks_in_chunk) { lfc_disable("write"); + return; } else { @@ -1319,12 +1325,13 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } } - LWLockRelease(lfc_lock); } + next_chunk: blkno += blocks_in_chunk; buf_offset += blocks_in_chunk; nblocks -= blocks_in_chunk; } + LWLockRelease(lfc_lock); } typedef struct From 5f74ff130729672a71c0da9e36bcdc81a4a12576 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 15 Dec 2024 08:02:35 +0200 Subject: [PATCH 09/10] Make ruffhappy --- test_runner/regress/test_lfc_prewarm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index e5d0c79dc68c..b71f2a6efaef 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -3,6 +3,7 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv +from fixtures.utils import USE_LFC @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") From cca517fb942cf8b7afb70f11410ca446fb15eb40 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 23 Dec 2024 11:36:47 +0200 Subject: [PATCH 10/10] Add test for prewarm under workload --- test_runner/regress/test_lfc_prewarm.py | 75 +++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index b71f2a6efaef..a27f1423e5e4 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,3 +1,5 @@ +import random +import threading import time import pytest @@ -53,3 +55,76 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 assert prewarm_info[1] > 0 + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + n_threads = 4 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute( + "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 128))" + ) + cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + running = True + + def workload(): + conn = endpoint.connect() + cur = conn.cursor() + n_transfers = 0 + while running: + src = random.randint(1, n_records) + dst = random.randint(1, n_records) + cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) + cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + n_transfers += 1 + log.info(f"Number of transfers: {n_transfers}") + + def prewarm(): + conn = endpoint.connect() + cur = conn.cursor() + n_prewarms = 0 + while running: + cur.execute("alter system set neon.file_cache_size_limit='1MB'") + cur.execute("select pg_reload_conf()") + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + n_prewarms += 1 + log.info(f"Number of prewarms: {n_prewarms}") + + workload_threads = [] + for _ in range(n_threads): + t = threading.Thread(target=workload) + workload_threads.append(t) + t.start() + + prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread.start() + + time.sleep(100) + + running = False + for t in workload_threads: + t.join() + prewarm_thread.join() + + cur.execute("select sum(balance) from accounts") + total_balance = cur.fetchall()[0][0] + assert total_balance == 0