Skip to content

Commit

Permalink
Adjust memory prefetch to the new hash table
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Nov 21, 2024
1 parent 8aa2646 commit 97aca47
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 6 deletions.
9 changes: 8 additions & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,22 @@ int tryOffloadFreeObjToIOThreads(robj *obj) {

if (obj->refcount > 1) return C_ERR;

if (obj->type != OBJ_ENCODING_RAW || obj->type != OBJ_STRING) return C_ERR;

/* We select the thread ID in a round-robin fashion. */
size_t tid = (server.stat_io_freed_objects % (server.active_io_threads_num - 1)) + 1;

IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}

/* We offload only the free of the ptr that may be allocated by the I/O thread.
* The object itself was allocated by the main thread and will be freed by the main thread. */
IOJobQueue_push(jq, zfree, obj->ptr);
obj->ptr = NULL;
decrRefCount(obj);

IOJobQueue_push(jq, decrRefCountVoid, obj);
server.stat_io_freed_objects++;
return C_OK;
}
Expand Down
277 changes: 272 additions & 5 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,282 @@
*/

#include "memory_prefetch.h"
#include "server.h"

typedef enum {
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */
PREFETCH_DONE /* Indicates that prefetching for this key is complete */
} PrefetchState;

typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
hashtableIncrementalFindState hashtab_state;
} KeyPrefetchInfo;

/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
size_t keys_done; /* Number of keys that have been prefetched */
size_t key_count; /* Number of keys in the current batch */
size_t client_count; /* Number of clients in the current batch */
size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
size_t executed_commands; /* Number of commands executed in the current batch */
int *slots; /* Array of slots for each key */
void **keys; /* Array of keys to prefetch in the current batch */
client **clients; /* Array of clients in the current batch */
hashtable **keys_tables; /* Main table for each key */
hashtable **expire_tables; /* Expire table for each key */
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
} PrefetchCommandsBatch;

static PrefetchCommandsBatch *batch = NULL;

void freePrefetchCommandsBatch(void) {
if (batch == NULL) {
return;
}

zfree(batch->clients);
zfree(batch->keys);
zfree(batch->keys_tables);
zfree(batch->expire_tables);
zfree(batch->slots);
zfree(batch->prefetch_info);
zfree(batch);
batch = NULL;
}

void prefetchCommandsBatchInit(void) {
serverAssert(!batch);
size_t max_prefetch_size = server.prefetch_batch_max_size;

if (max_prefetch_size == 0) {
return;
}

batch = zcalloc(sizeof(PrefetchCommandsBatch));
batch->max_prefetch_size = max_prefetch_size;
batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
batch->keys_tables = zcalloc(max_prefetch_size * sizeof(hashtable *));
batch->expire_tables = zcalloc(max_prefetch_size * sizeof(hashtable *));
batch->slots = zcalloc(max_prefetch_size * sizeof(int));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
}

void onMaxBatchSizeChange(void) {
if (batch && batch->client_count > 0) {
/* We need to process the current batch before updating the size */
return;
}

freePrefetchCommandsBatch();
prefetchCommandsBatchInit();
}

/* Move to the next key in the batch. */
static void moveToNextKey(void) {
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}

static void markKeyAsdone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
batch->keys_done++;
}

/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
size_t start_idx = batch->cur_idx;
do {
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
} while (batch->cur_idx != start_idx);
return NULL;
}

static void initBatchInfo(hashtable **tables) {
/* Initialize the prefetch info */
for (size_t i = 0; i < batch->key_count; i++) {
KeyPrefetchInfo *info = &batch->prefetch_info[i];
if (!tables[i] || hashtableSize(tables[i]) == 0) {
info->state = PREFETCH_DONE;
batch->keys_done++;
continue;
}
info->state = PREFETCH_ENTRY;
hashtableIncrementalFindInit(&info->hashtab_state, tables[i], batch->keys[i]);
}
}

static void prefetchEntry(KeyPrefetchInfo *info, int prefetch_value) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
} else {
if (prefetch_value) {
info->state = PREFETCH_VALUE;
} else {
markKeyAsdone(info);
}
}
}

/* Prefetch the entry's value. If the value is found.*/
static void prefetchValue(KeyPrefetchInfo *info) {
valkey *val;
if (hashtableIncrementalFindGetResult(&info->hashtab_state, (void **)&val)) {
if (val->encoding == OBJ_ENCODING_RAW && val->type == OBJ_STRING) {
valkey_prefetch(val->ptr);
}
}

markKeyAsdone(info);
}

/* Prefetch hashtable data for an array of keys.
*
* This function takes an array of tables and keys, attempting to bring
* data closer to the L1 cache that might be needed for hashtable operations
* on those keys.
*
* tables - An array of hashtables to prefetch data from.
* prefetch_value - If true, we prefetch the value data for each key.
* to bring the key's value data closer to the L1 cache as well.
*/
static void hashtablePrefetch(hashtable **tables, int prefetch_value) {
initBatchInfo(tables);
KeyPrefetchInfo *info;
while ((info = getNextPrefetchInfo())) {
switch (info->state) {
case PREFETCH_ENTRY: prefetchEntry(info, prefetch_value); break;
case PREFETCH_VALUE: prefetchValue(info); break;
default: serverPanic("Unknown prefetch state %d", info->state);
}
}
}

static void resetCommandsBatch(void) {
batch->cur_idx = 0;
batch->keys_done = 0;
batch->key_count = 0;
batch->client_count = 0;
batch->executed_commands = 0;
}

/* Prefetch command-related data:
* 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache.
* 2. Prefetch the keys and values for all commands in the current batch from the main and expires hashtables. */
static void prefetchCommands(void) {
/* Prefetch argv's for all clients */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (!c || c->argc <= 1) continue;
/* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */
for (int j = 1; j < c->argc; j++) {
valkey_prefetch(c->argv[j]);
}
}

/* Prefetch the argv->ptr if required */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (!c || c->argc <= 1) continue;
for (int j = 1; j < c->argc; j++) {
if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
valkey_prefetch(c->argv[j]->ptr);
}
}
}

/* Get the keys ptrs - we do it here after the key obj was prefetched. */
for (size_t i = 0; i < batch->key_count; i++) {
batch->keys[i] = ((robj *)batch->keys[i])->ptr;
}

/* Prefetch hashtable keys for all commands. Prefetching is beneficial only if there are more than one key. */
if (batch->key_count > 1) {
server.stat_total_prefetch_batches++;
/* Prefetch keys from the main hashtable */
hashtablePrefetch(batch->keys_tables, 1);
/* Prefetch keys from the expires hashtable - no value data to prefetch */
hashtablePrefetch(batch->expire_tables, 0);
}
}

/* Processes all the prefetched commands in the current batch. */
void processClientsCommandsBatch(void) {
if (!batch || batch->client_count == 0) return;

/* If executed_commands is not 0,
* it means that we are in the middle of processing a batch and this is a recursive call */
if (batch->executed_commands == 0) {
prefetchCommands();
}

/* Process the commands */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (c == NULL) continue;

/* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */
batch->clients[i] = NULL;
batch->executed_commands++;
if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c);
}

resetCommandsBatch();

/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
onMaxBatchSizeChange();
}
}
int addCommandToBatchAndProcessIfFull(struct client *c) {
(void)c;
return -1;

/* Adds the client's command to the current batch and processes the batch
* if it becomes full.
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatchAndProcessIfFull(client *c) {
if (!batch) return C_ERR;

batch->clients[batch->client_count++] = c;

/* Get command's keys positions */
if (c->io_parsed_cmd) {
getKeysResult result;
initGetKeysResult(&result);
int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0;
batch->keys_tables[batch->key_count] = kvstoreGetHashtable(c->db->keys, batch->slots[batch->key_count]);
batch->expire_tables[batch->key_count] = kvstoreGetHashtable(c->db->expires, batch->slots[batch->key_count]);
batch->key_count++;
}
getKeysFreeResult(&result);
}

/* If the batch is full, process it.
* We also check the client count to handle cases where
* no keys exist for the clients' commands. */
if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) {
processClientsCommandsBatch();
}

return C_OK;
}
void removeClientFromPendingCommandsBatch(struct client *c) {
(void)c;

/* Removes the given client from the pending prefetch batch, if present. */
void removeClientFromPendingCommandsBatch(client *c) {
if (!batch) return;

for (size_t i = 0; i < batch->client_count; i++) {
if (batch->clients[i] == c) {
batch->clients[i] = NULL;
return;
}
}
}

0 comments on commit 97aca47

Please sign in to comment.