Skip to content

Commit

Permalink
Merge with main
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Knizhnik committed Nov 9, 2023
1 parent a954568 commit 59ec475
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 85 deletions.
97 changes: 12 additions & 85 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif

#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */
#define MAX_PS_CONNSTR_LEN 128

typedef struct
{
size_t n_shards;
Expand Down Expand Up @@ -149,7 +145,7 @@ psm_init(void)
* Reload page map if needed and return number of shards and connection string for the specified shard
*/
static shardno_t
load_page_map(shardno_t shard_no, char* connstr)
load_shard_map(shardno_t shard_no, char* connstr)
{
shardno_t n_shards;

Expand Down Expand Up @@ -203,7 +199,7 @@ load_page_map(shardno_t shard_no, char* connstr)
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t n_shards = load_page_map(0, NULL);
shardno_t n_shards = load_shard_map(0, NULL);
uint32 hash;

#if PG_MAJORVERSION_NUM < 16
Expand All @@ -221,21 +217,6 @@ get_shard_number(BufferTag* tag)
return hash % n_shards;
}

static void
pageserver_sighup_handler(SIGNAL_ARGS)
{
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);

/* force refetching shard map */
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
shard_map->n_shards = 0;
LWLockRelease(shard_map_lock);
}

static bool
pageserver_connect(shardno_t shard_no, int elevel)
{
Expand All @@ -250,7 +231,7 @@ pageserver_connect(shardno_t shard_no, int elevel)

Assert(page_servers[shard_no].conn == NULL);

(void)load_page_map(shard_no, connstr); /* refresh page map if needed */
(void)load_shard_map(shard_no, connstr); /* refresh page map if needed */

/*
* Connect using the connection string we got from the
Expand Down Expand Up @@ -412,12 +393,6 @@ pageserver_send(shardno_t shard_no, NeonRequest * request)
StringInfoData req_buff;
PGconn* pageserver_conn = page_servers[shard_no].conn;

if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}

/* If the connection was lost for some reason, reconnect */
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
Expand Down Expand Up @@ -574,62 +549,16 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}

static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}

static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}

static void
pagestore_shmem_startup_hook(void)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();

PagestoreShmemInit();
}

static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif

RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}

static void
pagestore_prepare_shmem(void)
AssignPageserverConnstring(const char *newval, void *extra)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
if (shard_map)
{
/* Force releoading of shard map */
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
shard_map->n_shards = 0;
LWLockRelease(shard_map_lock);
}
}

/*
Expand All @@ -638,16 +567,14 @@ pagestore_prepare_shmem(void)
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();

DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring,
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, AssignPageserverConnstring, NULL);

DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
Expand Down
4 changes: 4 additions & 0 deletions pgxn/neon/pagestore_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

#include "pg_config.h"

#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */
#define MAX_PS_CONNSTR_LEN 128

typedef enum
{
/* pagestore_client -> pagestore */
Expand Down

0 comments on commit 59ec475

Please sign in to comment.