diff --git a/src/Makefile b/src/Makefile index eaf0e4e387..d5cd77be4c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -423,7 +423,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o maa.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o 
childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index ae60dd3fd0..364aa38fe1 100644 --- a/src/config.c +++ b/src/config.c @@ -2564,6 +2564,12 @@ static int updateOOMScoreAdj(const char **err) { return 1; } +static int UpdateMaxPrefetchBatchSize(const char **err) { + UNUSED(err); + onMaxBatchSizeChange(); + return 1; +} + int invalidateClusterSlotsResp(const char **err) { UNUSED(err); clearCachedClusterSlotsResponse(); @@ -3164,6 +3170,7 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. 
*/ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/dict.c b/src/dict.c index c7bace846a..9ad3fd0abf 100644 --- a/src/dict.c +++ b/src/dict.c @@ -120,7 +120,7 @@ static void _dictExpandIfNeeded(dict *d); static void _dictShrinkIfNeeded(dict *d); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -static dictEntry *dictGetNext(const dictEntry *de); +dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); @@ -963,7 +963,7 @@ double *dictGetDoubleValPtr(dictEntry *de) { /* Returns the 'next' field of the entry or NULL if the entry doesn't have a * 'next' field. 
*/ -static dictEntry *dictGetNext(const dictEntry *de) { +dictEntry *dictGetNext(const dictEntry *de) { if (entryIsKey(de)) return NULL; /* there's no next */ if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next; if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next; @@ -1542,204 +1542,6 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio return v; } - -/* -------------------------- Dict Prefetching ------------------------------ */ - -typedef enum { - PrefetchBucket, /* Initial state, determines which hash table to use, and prefetch the table's bucket */ - PrefetchEntry, /* Prefetches entries associated with the given key's hash */ - PrefetchValue, /* Prefetches the value object of the entry found in the previous step*/ - PrefetchValueData, /* Prefetches the value object's data (if applicable) */ - PrefetchDone /* Indicates that prefetching for this key is complete */ -} PrefetchState; - -/************************************ State machine diagram for the prefetch operation. 
******************************** - │ - start - │ - ┌────────▼─────────┐ - ┌─────────►│ PrefetchBucket ├────►────────┐ - │ └────────┬─────────┘ no more tables -> done - | bucket|found | - │ | │ - entry not found - goto next table ┌────────▼────────┐ │ - └────◄─────┤ PrefetchEntry | ▼ - ┌────────────►└────────┬────────┘ │ - | Entry│found │ - │ | │ - value not found - goto next entry ┌───────▼────────┐ | - └───────◄──────┤ PrefetchValue | ▼ - └───────┬────────┘ │ - Value│found │ - | | - ┌───────────▼──────────────┐ │ - │ PrefetchValueData │ ▼ - └───────────┬──────────────┘ │ - | │ - ┌───────-─▼─────────────┐ │ - │ PrefetchDone │◄────────┘ - └───────────────────────┘ -**********************************************************************************************************************/ - -typedef struct { - PrefetchState state; /* Current state of the prefetch operation */ - int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ - uint64_t bucket_idx; /* Index of the bucket in the current hash table */ - uint64_t key_hash; /* Hash value of the key being prefetched */ - dictEntry *current_entry; /* Pointer to the current entry being processed */ -} PrefetchInfo; - -typedef struct { - PrefetchInfo prefetch_info[DictMaxPrefetchSize]; - size_t current_batch_size; /* Number of keys in the current batch */ - size_t cur_idx; /* Index of the current key being prefetched */ - size_t keys_done; /* Number of keys that have been processed */ -} PrefetchBatch; - -/* Prefetches the given pointer and move to the next key in the batch */ -static void prefetch(void *addr, PrefetchBatch *batch) { - valkey_prefetch(addr); - /* while the prefetch is in progress, we can continue to the next key */ - batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; -} - -static void markDone(PrefetchInfo *info, PrefetchBatch *batch) { - info->state = PrefetchDone; - batch->keys_done++; -} - -static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) { - while 
(batch->prefetch_info[batch->cur_idx].state == PrefetchDone) { - batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; - } - return &batch->prefetch_info[batch->cur_idx]; -} - -static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) { - assert(num_keys <= DictMaxPrefetchSize); - - batch->current_batch_size = num_keys; - batch->cur_idx = 0; - batch->keys_done = 0; - - /* Initialize the prefetch info */ - for (size_t i = 0; i < batch->current_batch_size; i++) { - PrefetchInfo *info = &batch->prefetch_info[i]; - if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { - info->state = PrefetchDone; - batch->keys_done++; - continue; - } - info->ht_idx = -1; - info->current_entry = NULL; - info->state = PrefetchBucket; - info->key_hash = dictHashKey(keys_dicts[i], keys[i]); - } -} - -/* dictPrefetch - Prefetches dictionary data for an array of keys - * - * This function takes an array of dictionaries and keys, attempting to bring - * data closer to the L1 cache that might be needed for dictionary operations - * on those keys. - * - * dictFind Algorithm: - * 1. Evaluate the hash of the key - * 2. Access the index in the first table - * 3. Walk the linked list until the key is found - * If the key hasn't been found and the dictionary is in the middle of rehashing, - * access the index on the second table and repeat step 3 - * - * dictPrefetch executes the same algorithm as dictFind, but one step at a time - * for each key. Instead of waiting for data to be read from memory, it prefetches - * the data and then moves on to execute the next prefetch for another key. - * - * dictPrefetch can be invoked with a callback function, get_val_data_func, - * to bring the key's value data closer to the L1 cache as well. 
*/ -void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */ - initBatch(keys_dicts, num_keys, keys, &batch); - - while (batch.keys_done < batch.current_batch_size) { - PrefetchInfo *info = getNextPrefetchInfo(&batch); - size_t i = batch.cur_idx; - switch (info->state) { - case PrefetchBucket: - /* Determine which hash table to use */ - if (info->ht_idx == -1) { - info->ht_idx = 0; - } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) { - info->ht_idx = 1; - } else { - /* No more tables left - mark as done. */ - markDone(info, &batch); - break; - } - - /* Prefetch the bucket */ - info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch); - info->current_entry = NULL; - info->state = PrefetchEntry; - break; - - case PrefetchEntry: - /* Prefetch the first entry in the bucket */ - if (info->current_entry) { - /* We already found an entry in the bucket - move to the next entry */ - info->current_entry = dictGetNext(info->current_entry); - } else { - /* Find the first entry in the bucket */ - info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; - } - - if (info->current_entry) { - prefetch(info->current_entry, &batch); - info->state = PrefetchValue; - } else { - /* No entry found in the bucket - try the bucket in the next table */ - info->state = PrefetchBucket; - } - break; - - case PrefetchValue: { - /* Prefetch the entry's value. 
*/ - void *value = dictGetVal(info->current_entry); - - if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { - /* If this is the last element we assume a hit and dont compare the keys */ - prefetch(value, &batch); - info->state = PrefetchValueData; - break; - } - - void *current_entry_key = dictGetKey(info->current_entry); - if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { - /* If the key is found, prefetch the value */ - prefetch(value, &batch); - info->state = PrefetchValueData; - } else { - /* Move to next entry */ - info->state = PrefetchEntry; - } - break; - } - - case PrefetchValueData: { - /* Prefetch value data if available */ - if (get_val_data_func) { - void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data, &batch); - } - markDone(info, &batch); - break; - } - - default: assert(0); - } - } -} - /* ------------------------- private functions ------------------------------ */ /* Because we may need to allocate huge memory chunk at once when dict diff --git a/src/dict.h b/src/dict.h index f984023e23..97a79910cb 100644 --- a/src/dict.h +++ b/src/dict.h @@ -45,8 +45,7 @@ #define DICT_ERR 1 /* Hash table parameters */ -#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ -#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */ +#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */ typedef struct dictEntry dictEntry; /* opaque */ typedef struct dict dict; @@ -248,7 +247,6 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); uint64_t dictGetHash(dict *d, const void *key); void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size); -void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)); size_t 
dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full); dictStats *dictGetStatsHt(dict *d, int htidx, int full); diff --git a/src/io_threads.c b/src/io_threads.c index c9345d72e0..7a68cfb87f 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -303,6 +303,8 @@ void initIOThreads(void) { serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM); + prefetchCommandsBatchInit(); + /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { createIOThread(i); diff --git a/src/kvstore.c b/src/kvstore.c index d6f886c95b..b7fa7359ab 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -93,7 +93,7 @@ typedef struct { /**********************************/ /* Get the dictionary pointer based on dict-index. */ -static dict *kvstoreGetDict(kvstore *kvs, int didx) { +dict *kvstoreGetDict(kvstore *kvs, int didx) { return kvs->dicts[didx]; } @@ -828,15 +828,3 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { } return ret; } - -void kvstoreDictPrefetch(kvstore **kvs, - int *slots, - const void **keys, - size_t keys_count, - void *(*get_val_data_func)(const void *val)) { - dict *dicts[keys_count]; - for (size_t i = 0; i < keys_count; i++) { - dicts[i] = kvstoreGetDict(kvs[i], slots[i]); - } - dictPrefetch(dicts, keys_count, keys, get_val_data_func); -} diff --git a/src/kvstore.h b/src/kvstore.h index 40d1eab15f..a94f366b6b 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -36,11 +36,6 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs); int kvstoreNumAllocatedDicts(kvstore *kvs); int kvstoreNumDicts(kvstore *kvs); uint64_t kvstoreGetHash(kvstore *kvs, const void *key); -void kvstoreDictPrefetch(kvstore **kvs, - int *slots, - const void **keys, - size_t keys_count, - void *(*get_val_data_func)(const void *val)); /* kvstore iterator specific functions */ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); diff --git a/src/maa.c b/src/maa.c new file mode 100644 index 0000000000..fd9413c90b --- /dev/null +++ b/src/maa.c @@ -0,0 
+1,415 @@ +/* + * maa.c - Memory Access Amortization (MAA) Implementation + * + * This file implements the memory access amortization technique for Valkey. + * It utilizes prefetching keys and data for multiple commands in a batch, + * to improve performance by amortizing memory access costs across multiple operations. + */ + +#include "maa.h" +#include "server.h" +#include "dict.h" + +/* Forward declarations of dict.c functions */ +dictEntry *dictGetNext(const dictEntry *de); + +/* Forward declarations of kvstore.c functions */ +dict *kvstoreGetDict(kvstore *kvs, int didx); + +#define HT_IDX_FIRST 0 +#define HT_IDX_SECOND 1 +#define HT_IDX_INVALID -1 + +typedef enum { + PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ + PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */ + PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */ + PREFETCH_VALUE_DATA, /* prefetch the value object's data (if applicable) */ + PREFETCH_DONE /* Indicates that prefetching for this key is complete */ +} PrefetchState; + + +/************************************ State machine diagram for the prefetch operation. 
******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PREFETCH_ENTRY | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + value not found - goto next entry ┌───────▼────────┐ | + └───────◄──────┤ PREFETCH_VALUE | ▼ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PREFETCH_VALUE_DATA │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ + +typedef void *(*GetValueDataFunc)(const void *val); + +typedef struct PrefetchInfo { + PrefetchState state; /* Current state of the prefetch operation */ + int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ +} PrefetchInfo; + +/* CommandsBatch structure holds the state of the current batch of client commands being processed. 
*/ +typedef struct CommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + PrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} CommandsBatch; + +static CommandsBatch *batch = NULL; + +void freePrefetchCommandsBatch(void) { + if (batch == NULL) { + return; + } + + zfree(batch->clients); + zfree(batch->keys); + zfree(batch->keys_dicts); + zfree(batch->expire_dicts); + zfree(batch->slots); + zfree(batch->prefetch_info); + zfree(batch); + batch = NULL; +} + +void prefetchCommandsBatchInit(void) { + serverAssert(!batch); + size_t max_prefetch_size = server.prefetch_batch_max_size; + + if (max_prefetch_size == 0) { + return; + } + + batch = zcalloc(sizeof(CommandsBatch)); + batch->max_prefetch_size = max_prefetch_size; + batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); + batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); + batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->slots = zcalloc(max_prefetch_size * sizeof(int)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo)); +} + +void onMaxBatchSizeChange(void) { + if (batch && batch->client_count > 0) { + /* We need to process the current 
batch before updating the size */ + return; + } + + freePrefetchCommandsBatch(); + prefetchCommandsBatchInit(); +} + +/* Prefetch the given pointer and move to the next key in the batch. */ +static void prefetch(void *addr) { + valkey_prefetch(addr); + /* While the prefetch is in progress, we can continue to the next key */ + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; +} + +static void markDone(PrefetchInfo *info) { + info->state = PREFETCH_DONE; + server.stat_total_prefetch_entries++; + batch->keys_done++; +} + +/* Returns the next PrefetchInfo structure that needs to be processed. */ +static PrefetchInfo *getNextPrefetchInfo(void) { + size_t start_idx = batch->cur_idx; + do { + PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + if (info->state != PREFETCH_DONE) return info; + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; + } while (batch->cur_idx != start_idx); + return NULL; +} + +static void initBatchInfo(dict **dicts) { + batch->current_dicts = dicts; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < batch->key_count; i++) { + PrefetchInfo *info = &batch->prefetch_info[i]; + if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { + info->state = PREFETCH_DONE; + batch->keys_done++; + continue; + } + info->ht_idx = HT_IDX_INVALID; + info->current_entry = NULL; + info->state = PREFETCH_BUCKET; + info->key_hash = dictHashKey(batch->current_dicts[i], batch->keys[i]); + } +} + +/* Prefetch the bucket of the next hash table index. + * If no tables are left, move to the PREFETCH_DONE state. */ +static void prefetchBucket(PrefetchInfo *info) { + size_t i = batch->cur_idx; + + /* Determine which hash table to use */ + if (info->ht_idx == HT_IDX_INVALID) { + info->ht_idx = HT_IDX_FIRST; + } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) { + info->ht_idx = HT_IDX_SECOND; + } else { + /* No more tables left - mark as done. 
*/ + markDone(info); + return; + } + + /* Prefetch the bucket */ + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); + prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PREFETCH_ENTRY; +} + +/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. + * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ +static void prefetchEntry(PrefetchInfo *info) { + size_t i = batch->cur_idx; + + if (info->current_entry) { + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Go to the first entry in the bucket */ + info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetch(info->current_entry); + info->state = PREFETCH_VALUE; + } else { + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PREFETCH_BUCKET; + } +} + +/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. + * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. 
*/ +static void prefetchValue(PrefetchInfo *info) { + size_t i = batch->cur_idx; + void *value = dictGetVal(info->current_entry); + + if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { + /* If this is the last element, we assume a hit and don't compare the keys */ + prefetch(value); + info->state = PREFETCH_VALUE_DATA; + return; + } + + void *current_entry_key = dictGetKey(info->current_entry); + if (batch->keys[i] == current_entry_key || + dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + prefetch(value); + info->state = PREFETCH_VALUE_DATA; + } else { + /* Move to the next entry */ + info->state = PREFETCH_ENTRY; + } +} + +/* Prefetch the value data if available. */ +static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) { + if (get_val_data_func) { + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) prefetch(value_data); + } + markDone(info); +} + +/* Prefetch dictionary data for an array of keys. + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * The dictFind algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the entries linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dicts - An array of dictionaries to prefetch data from. 
+ * get_val_data_func - A callback function that dictPrefetch can invoke + * to bring the key's value data closer to the L1 cache as well. + */ +static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { + initBatchInfo(dicts); + PrefetchInfo *info; + while ((info = getNextPrefetchInfo())) { + switch (info->state) { + case PREFETCH_BUCKET: prefetchBucket(info); break; + case PREFETCH_ENTRY: prefetchEntry(info); break; + case PREFETCH_VALUE: prefetchValue(info); break; + case PREFETCH_VALUE_DATA: prefetchValueData(info, get_val_data_func); break; + default: serverPanic("Unknown prefetch state %d", info->state); + } + } +} + +/* Value-data callback passed to dictPrefetch for the main dict: returns the object's ptr only when the object is a raw-encoded string, and NULL for any other type or encoding. */ +static void *getObjectValuePtr(const void *val) { + robj *o = (robj *)val; + return (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) ? o->ptr : NULL; +} + +static void resetCommandsBatch(void) { + batch->cur_idx = 0; + batch->keys_done = 0; + batch->key_count = 0; + batch->client_count = 0; + batch->executed_commands = 0; +} + +/* Prefetch command-related data: + * 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. + * 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ +static void prefetchCommands(void) { + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ + for (int j = 1; j < c->argc; j++) { + valkey_prefetch(c->argv[j]); + } + } + + /* Prefetch argv->ptr for arguments whose encoding is OBJ_ENCODING_RAW */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + valkey_prefetch(c->argv[j]->ptr); + } + } + } + + /* Replace each stored key object with its ->ptr - we do it here, after the key objects were prefetched. */ + for (size_t i = 0; i < batch->key_count; i++) { + batch->keys[i] = ((robj *)batch->keys[i])->ptr; + } + + /* Prefetch dict keys for all commands. Prefetching is beneficial only if there is more than one key. */ + if (batch->key_count > 1) { + server.stat_total_prefetch_batches++; + /* Prefetch keys from the main dict */ + dictPrefetch(batch->keys_dicts, getObjectValuePtr); + /* Prefetch keys from the expires dict - no value data to prefetch */ + dictPrefetch(batch->expire_dicts, NULL); + } +} + +/* Processes all the prefetched commands in the current batch. 
*/ +void processClientsCommandsBatch(void) { + if (!batch || batch->client_count == 0) return; + + /* If executed_commands is not 0, + * it means that we are in the middle of processing a batch and this is a recursive call, + * so the prefetch step is skipped */ + if (batch->executed_commands == 0) { + prefetchCommands(); + } + + /* Process the commands */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (c == NULL) continue; + + /* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */ + batch->clients[i] = NULL; + batch->executed_commands++; + if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c); + } + + resetCommandsBatch(); + + /* Handle the case where the max prefetch size has been changed during the batch processing. + * NOTE(review): onMaxBatchSizeChange() frees and re-creates `batch` (leaving it NULL when the new size is 0); + * confirm that an outer, recursive caller of this function cannot still be iterating over `batch` at that point. */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { + onMaxBatchSizeChange(); + } +} + +/* Adds the client's command to the current batch and processes the batch + * if it becomes full. + * + * Returns C_OK if the command was added, or C_ERR when no batch is allocated, + * in which case the client was not queued. */ +int addCommandToBatchAndProcessIfFull(client *c) { + if (!batch) return C_ERR; + + batch->clients[batch->client_count++] = c; + + /* Record the command's key arguments; keys beyond max_prefetch_size are not recorded */ + if (c->io_parsed_cmd) { + getKeysResult result; + initGetKeysResult(&result); + int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0; + batch->keys_dicts[batch->key_count] = kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]); + batch->expire_dicts[batch->key_count] = kvstoreGetDict(c->db->expires, batch->slots[batch->key_count]); + batch->key_count++; + } + getKeysFreeResult(&result); + } + + /* If the batch is full, process it. 
+ * We also check the client count to handle cases where + * no keys exist for the clients' commands. */ + if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) { + processClientsCommandsBatch(); + } + + return C_OK; +} + +/* Removes the given client from the pending prefetch batch, if present. */ +void removeClientFromPendingCommandsBatch(client *c) { + if (!batch) return; + + for (size_t i = 0; i < batch->client_count; i++) { + if (batch->clients[i] == c) { + batch->clients[i] = NULL; + return; + } + } +} diff --git a/src/maa.h b/src/maa.h new file mode 100644 index 0000000000..3e97e575e7 --- /dev/null +++ b/src/maa.h @@ -0,0 +1,12 @@ +#ifndef MAA_H +#define MAA_H + +struct client; + +void onMaxBatchSizeChange(void); +void prefetchCommandsBatchInit(void); +void processClientsCommandsBatch(void); +int addCommandToBatchAndProcessIfFull(struct client *c); +void removeClientFromPendingCommandsBatch(struct client *c); + +#endif /* MAA_H */ diff --git a/src/networking.c b/src/networking.c index 3965325123..ef32dc9688 100644 --- a/src/networking.c +++ b/src/networking.c @@ -33,8 +33,8 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" -#include #include "io_threads.h" +#include #include #include #include @@ -45,7 +45,6 @@ static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); -void removeClientFromPendingPrefetchBatch(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ __thread sds thread_shared_qb = NULL; @@ -1506,7 +1505,7 @@ void unlinkClient(client *c) { listDelNode(server.clients, c->client_list_node); c->client_list_node = NULL; } - removeClientFromPendingPrefetchBatch(c); + removeClientFromPendingCommandsBatch(c); /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ @@ -4615,124 +4614,11 @@ int postponeClientRead(client *c) { return (trySendReadToIOThreads(c) == C_OK); } -/* Prefetch multiple commands batch */ -typedef struct { - client *clients[DictMaxPrefetchSize]; - size_t client_count; - size_t key_count; - void *keys[DictMaxPrefetchSize]; - kvstore *keys_kvs[DictMaxPrefetchSize]; - kvstore *expire_kvs[DictMaxPrefetchSize]; - int slots[DictMaxPrefetchSize]; -} BatchProcessData; - -static BatchProcessData batch = {0}; - -static void *getObjectValuePtr(const void *val) { - robj *o = (robj *)val; - if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { - return o->ptr; - } - return NULL; -} - -static void batchProcessClientCommands(void) { - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (c) { - /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ - batch.clients[i] = NULL; - if (processPendingCommandAndInputBuffer(c) != C_ERR) { - beforeNextClient(c); - } - } - } - memset(&batch, 0, sizeof(batch)); -} - -/*Prefetch the commands' args allocated by the I/O thread and process all the commands in the batch.*/ -static void batchPrefetchArgsAndProcessClientCommands(void) { - if (batch.client_count == 0) return; - /* Prefetch argv's for all clients */ - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (!c || c->argc <= 1) continue; - /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ - for (int j = 1; j < c->argc; j++) { - valkey_prefetch(c->argv[j]); - } - } - - /* prefetch the argv->ptr if required */ - for (size_t i = 0; i < batch.client_count; i++) { - client *c = batch.clients[i]; - if (!c || c->argc <= 1) continue; - for (int j = 1; j < c->argc; j++) { - if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { - valkey_prefetch(c->argv[j]->ptr); - } - } - } - - /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ - for (size_t i = 0; i < batch.key_count; i++) { - batch.keys[i] = ((robj *)batch.keys[i])->ptr; - } - - /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ - if (batch.key_count > 1) { - server.stat_total_prefetch_batches++; - server.stat_total_prefetch_entries += batch.key_count; - /* Keys */ - kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr); - /* Expires - with expires no values prefetch are required. */ - kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL); - } - - /* Process clients' commands */ - batchProcessClientCommands(); -} - -void addCommandToBatchAndProcessIfFull(client *c) { - batch.clients[batch.client_count++] = c; - - /* Get command's keys positions */ - if (c->io_parsed_cmd) { - getKeysResult result; - initGetKeysResult(&result); - int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) { - batch.keys[batch.key_count] = c->argv[result.keys[i].pos]; - batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0; - batch.keys_kvs[batch.key_count] = c->db->keys; - batch.expire_kvs[batch.key_count] = c->db->expires; - batch.key_count++; - } - getKeysFreeResult(&result); - } - - /* If the batch is full, process it. - * We also check the client count to handle cases where - * no keys exist for the clients' commands. 
*/ - if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) { - batchPrefetchArgsAndProcessClientCommands(); - } -} - -void removeClientFromPendingPrefetchBatch(client *c) { - for (size_t i = 0; i < batch.client_count; i++) { - if (batch.clients[i] == c) { - batch.clients[i] = NULL; - return; - } - } -} - int processIOThreadsReadDone(void) { if (ProcessingEventsWhileBlocked) { /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. * In this case, there may be some clients left in the batch waiting to be processed. */ - batchProcessClientCommands(); + processClientsCommandsBatch(); } if (listLength(server.clients_pending_io_read) == 0) return 0; @@ -4760,6 +4646,7 @@ int processIOThreadsReadDone(void) { /* Don't post-process-reads to clients that are going to be closed anyway. */ if (c->flag.close_asap) continue; + /* If a client is protected, don't do anything, * that may trigger read/write error or recreate handler. */ if (c->flag.protected) continue; @@ -4790,15 +4677,20 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } - size_t list_len = listLength(server.clients_pending_io_read); - addCommandToBatchAndProcessIfFull(c); - if (list_len != listLength(server.clients_pending_io_read)) { - /* A client was removed from the list - next node may be invalid */ + size_t list_length_before_command_execute = listLength(server.clients_pending_io_read); + /* Try to add the command to the batch */ + int ret = addCommandToBatchAndProcessIfFull(c); + /* If the command was not added to the commands batch, process it immediately */ + if (ret == C_ERR) { + if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c); + } + if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) { + /* A client was unlinked from the list, possibly making the next node invalid */ next = listFirst(server.clients_pending_io_read); } } - 
batchPrefetchArgsAndProcessClientCommands(); + processClientsCommandsBatch(); return processed; } diff --git a/src/server.h b/src/server.h index d1a4d94190..d6d82ac81a 100644 --- a/src/server.h +++ b/src/server.h @@ -79,6 +79,7 @@ typedef long long ustime_t; /* microsecond time type. */ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ +#include "maa.h" /* Memory access amortization */ #define VALKEYMODULE_CORE 1 typedef struct serverObject robj; @@ -1747,6 +1748,7 @@ struct valkeyServer { int io_threads_do_reads; /* Read and parse from IO threads? */ int active_io_threads_num; /* Current number of active IO threads, includes main thread. */ int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ + int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 1740436c2b..3ddda042b5 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -181,6 +181,9 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb set rd$i [valkey_deferring_client] } + # Set a key that will later be prefetched + r set a 0 + # Get the client ID of rd4 $rd4 client id set rd4_id [$rd4 read] @@ -216,5 +219,68 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb assert_equal {OK} [$rd16 read] assert_equal {16} [$rd16 read] } + + test {prefetch works as expected when changing the batch size while executing the commands batch} { + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] 
+ } + + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. + + # Send set commands for all clients; the 5th client first changes the prefetch batch size + for {set i 0} {$i < 16} {incr i} { + if {$i == 4} { + [set rd$i] config set prefetch-batch-max-size 1 + } + [set rd$i] set a $i + [set rd$i] flush + } + + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the configured prefetch batch size was changed + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + } + + test {no prefetch when the batch size is set to 0} { + # set the batch size to 0 + r config set prefetch-batch-max-size 0 + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] + } + + # Create a batch of commands by making sure the server sleeps for a while + # before responding to the first command + $rd0 debug sleep 2 + after 200 ; # wait a bit to make sure the server is sleeping. 
+ + # Send set commands for all clients + for {set i 0} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_equal $prefetch_entries $new_prefetch_entries + } } } diff --git a/valkey.conf b/valkey.conf index 68f4ad1f72..dbddcc80be 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1325,7 +1325,20 @@ lazyfree-lazy-user-flush no # to thread the write and read syscall and transfer the client buffers to the # socket and to enable threading of reads and protocol parsing. # -# NOTE 2: If you want to test the server speedup using valkey-benchmark, make +# prefetch-batch-max-size 16 +# +# When multiple commands are parsed by the I/O threads and ready for execution, +# we take advantage of knowing the next set of commands and prefetch their +# required dictionary entries in a batch. This reduces memory access costs. +# +# The optimal batch size depends on the specific workflow of the user. +# The default batch size is 16, which can be modified using the +# 'prefetch-batch-max-size' config. +# +# When the config is set to 0, it means no prefetching will be done. +# This effectively disables the prefetching feature. +# +# NOTE: If you want to test the server speedup using valkey-benchmark, make # sure you also run the benchmark itself in threaded mode, using the # --threads option to match the number of server threads, otherwise you'll not # be able to notice the improvements.