Skip to content

Commit

Permalink
Dynamic config for batch size.
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Aug 18, 2024
1 parent 9a41120 commit d04a755
Show file tree
Hide file tree
Showing 13 changed files with 537 additions and 345 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o maa.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
7 changes: 7 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2564,6 +2564,12 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

/* Config-change hook for "prefetch-batch-max-size": notifies the prefetch
 * module so it can pick up the new server.prefetch_batch_max_size value.
 * Always succeeds (returns 1); 'err' is never written. */
static int UpdateMaxPrefetchBatchSize(const char **err) {
    UNUSED(err);
    onMaxBatchSizeChange();
    return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResponse();
Expand Down Expand Up @@ -3164,6 +3170,7 @@ standardConfig static_configs[] = {
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
Expand Down
202 changes: 2 additions & 200 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ static void _dictExpandIfNeeded(dict *d);
static void _dictShrinkIfNeeded(dict *d);
static signed char _dictNextExp(unsigned long size);
static int _dictInit(dict *d, dictType *type);
static dictEntry *dictGetNext(const dictEntry *de);
dictEntry *dictGetNext(const dictEntry *de);
static dictEntry **dictGetNextRef(dictEntry *de);
static void dictSetNext(dictEntry *de, dictEntry *next);

Expand Down Expand Up @@ -963,7 +963,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {

/* Returns the 'next' field of the entry or NULL if the entry doesn't have a
* 'next' field. */
static dictEntry *dictGetNext(const dictEntry *de) {
dictEntry *dictGetNext(const dictEntry *de) {
if (entryIsKey(de)) return NULL; /* there's no next */
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next;
Expand Down Expand Up @@ -1542,204 +1542,6 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
return v;
}


/* -------------------------- Dict Prefetching ------------------------------ */

/* Per-key state of the prefetch state machine (see the diagram below). */
typedef enum {
    PrefetchBucket,    /* Initial state, determines which hash table to use, and prefetch the table's bucket */
    PrefetchEntry,     /* Prefetches entries associated with the given key's hash */
    PrefetchValue,     /* Prefetches the value object of the entry found in the previous step */
    PrefetchValueData, /* Prefetches the value object's data (if applicable) */
    PrefetchDone       /* Indicates that prefetching for this key is complete */
} PrefetchState;

/************************************ State machine diagram for the prefetch operation. ********************************
start
┌────────▼─────────┐
┌─────────►│ PrefetchBucket ├────►────────┐
│ └────────┬─────────┘ no more tables -> done
| bucket|found |
│ | │
entry not found - goto next table ┌────────▼────────┐ │
└────◄─────┤ PrefetchEntry | ▼
┌────────────►└────────┬────────┘ │
| Entry│found │
│ | │
value not found - goto next entry ┌───────▼────────┐ |
└───────◄──────┤ PrefetchValue | ▼
└───────┬────────┘ │
Value│found │
| |
┌───────────▼──────────────┐ │
│ PrefetchValueData │ ▼
└───────────┬──────────────┘ │
| │
┌───────-─▼─────────────┐ │
│ PrefetchDone │◄────────┘
└───────────────────────┘
**********************************************************************************************************************/

/* Bookkeeping for one key's walk through the prefetch state machine. */
typedef struct {
    PrefetchState state;      /* Current state of the prefetch operation */
    int ht_idx;               /* Index of the current hash table (0 or 1 for rehashing);
                               * -1 until the first PrefetchBucket step selects a table */
    uint64_t bucket_idx;      /* Index of the bucket in the current hash table */
    uint64_t key_hash;        /* Hash value of the key being prefetched */
    dictEntry *current_entry; /* Pointer to the current entry being processed */
} PrefetchInfo;

/* A batch of keys prefetched together; cur_idx rotates round-robin over the
 * entries that have not yet reached PrefetchDone. */
typedef struct {
    PrefetchInfo prefetch_info[DictMaxPrefetchSize];
    size_t current_batch_size; /* Number of keys in the current batch */
    size_t cur_idx;            /* Index of the current key being prefetched */
    size_t keys_done;          /* Number of keys that have been processed */
} PrefetchBatch;

/* Issue a prefetch for 'addr', then advance the batch cursor to the next key
 * so that useful work overlaps with the in-flight memory access. */
static void prefetch(void *addr, PrefetchBatch *batch) {
    valkey_prefetch(addr);
    /* Advance cur_idx, wrapping around at the end of the batch. */
    if (++batch->cur_idx == batch->current_batch_size) batch->cur_idx = 0;
}

/* Mark one key's prefetch as finished and account for it in the batch total. */
static void markDone(PrefetchInfo *info, PrefetchBatch *batch) {
    batch->keys_done++;
    info->state = PrefetchDone;
}

/* Return the next batch entry that still has prefetch work pending, starting
 * from the current cursor. Assumes at least one entry is not in the
 * PrefetchDone state (guaranteed by the caller's keys_done check), otherwise
 * this scan would never terminate. */
static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) {
    PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
    while (info->state == PrefetchDone) {
        batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size;
        info = &batch->prefetch_info[batch->cur_idx];
    }
    return info;
}

/* Populate 'batch' with per-key prefetch state for up to DictMaxPrefetchSize
 * keys. Keys whose dict is NULL or empty have nothing to prefetch and are
 * marked done immediately; the rest start in the PrefetchBucket state. */
static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) {
    assert(num_keys <= DictMaxPrefetchSize);

    batch->cur_idx = 0;
    batch->keys_done = 0;
    batch->current_batch_size = num_keys;

    for (size_t i = 0; i < num_keys; i++) {
        PrefetchInfo *info = &batch->prefetch_info[i];
        dict *d = keys_dicts[i];
        if (d == NULL || dictSize(d) == 0) {
            /* Nothing to prefetch for this key. */
            info->state = PrefetchDone;
            batch->keys_done++;
        } else {
            info->state = PrefetchBucket;
            info->ht_idx = -1; /* table not chosen yet */
            info->current_entry = NULL;
            info->key_hash = dictHashKey(d, keys[i]);
        }
    }
}

/* dictPrefetch - Prefetches dictionary data for an array of keys
*
* This function takes an array of dictionaries and keys, attempting to bring
* data closer to the L1 cache that might be needed for dictionary operations
* on those keys.
*
* dictFind Algorithm:
* 1. Evaluate the hash of the key
* 2. Access the index in the first table
* 3. Walk the linked list until the key is found
* If the key hasn't been found and the dictionary is in the middle of rehashing,
* access the index on the second table and repeat step 3
*
* dictPrefetch executes the same algorithm as dictFind, but one step at a time
* for each key. Instead of waiting for data to be read from memory, it prefetches
* the data and then moves on to execute the next prefetch for another key.
*
* dictPrefetch can be invoked with a callback function, get_val_data_func,
* to bring the key's value data closer to the L1 cache as well. */
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
    PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */
    initBatch(keys_dicts, num_keys, keys, &batch);

    /* Round-robin over the keys: each iteration advances one key's state
     * machine by a single step (prefetch() moves the cursor to the next
     * pending key), until every key reaches PrefetchDone. */
    while (batch.keys_done < batch.current_batch_size) {
        PrefetchInfo *info = getNextPrefetchInfo(&batch);
        size_t i = batch.cur_idx;
        switch (info->state) {
        case PrefetchBucket:
            /* Determine which hash table to use: table 0 first, then table 1
             * only if the dict is rehashing. */
            if (info->ht_idx == -1) {
                info->ht_idx = 0;
            } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) {
                info->ht_idx = 1;
            } else {
                /* No more tables left - mark as done. */
                markDone(info, &batch);
                break;
            }

            /* Prefetch the bucket */
            info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
            prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch);
            info->current_entry = NULL;
            info->state = PrefetchEntry;
            break;

        case PrefetchEntry:
            /* Prefetch the first entry in the bucket */
            if (info->current_entry) {
                /* We already found an entry in the bucket - move to the next entry */
                info->current_entry = dictGetNext(info->current_entry);
            } else {
                /* Find the first entry in the bucket */
                info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
            }

            if (info->current_entry) {
                prefetch(info->current_entry, &batch);
                info->state = PrefetchValue;
            } else {
                /* No entry found in the bucket - try the bucket in the next table */
                info->state = PrefetchBucket;
            }
            break;

        case PrefetchValue: {
            /* Prefetch the entry's value. */
            void *value = dictGetVal(info->current_entry);

            if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) {
                /* If this is the last element we assume a hit and don't compare the keys */
                prefetch(value, &batch);
                info->state = PrefetchValueData;
                break;
            }

            void *current_entry_key = dictGetKey(info->current_entry);
            if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
                /* If the key is found, prefetch the value */
                prefetch(value, &batch);
                info->state = PrefetchValueData;
            } else {
                /* Move to next entry */
                info->state = PrefetchEntry;
            }
            break;
        }

        case PrefetchValueData: {
            /* Prefetch value data if available */
            if (get_val_data_func) {
                void *value_data = get_val_data_func(dictGetVal(info->current_entry));
                if (value_data) prefetch(value_data, &batch);
            }
            markDone(info, &batch);
            break;
        }

        default: assert(0);
        }
    }
}

/* ------------------------- private functions ------------------------------ */

/* Because we may need to allocate huge memory chunk at once when dict
Expand Down
4 changes: 1 addition & 3 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@
#define DICT_ERR 1

/* Hash table parameters */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */

typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
Expand Down Expand Up @@ -248,7 +247,6 @@ unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val));

size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats *dictGetStatsHt(dict *d, int htidx, int full);
Expand Down
2 changes: 2 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ void initIOThreads(void) {

serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM);

prefetchCommandsBatchInit();

/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
createIOThread(i);
Expand Down
14 changes: 1 addition & 13 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ typedef struct {
/**********************************/

/* Get the dictionary pointer based on dict-index. */
static dict *kvstoreGetDict(kvstore *kvs, int didx) {
dict *kvstoreGetDict(kvstore *kvs, int didx) {
    /* Direct array access - no bounds check is performed on didx. */
    return kvs->dicts[didx];
}

Expand Down Expand Up @@ -828,15 +828,3 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
}
return ret;
}

/* Resolve each (kvstore, slot) pair to its underlying dict and hand the whole
 * batch to dictPrefetch, which walks the hash tables and prefetches the keys'
 * entries (and, via get_val_data_func if non-NULL, their value data). */
void kvstoreDictPrefetch(kvstore **kvs,
                         int *slots,
                         const void **keys,
                         size_t keys_count,
                         void *(*get_val_data_func)(const void *val)) {
    /* VLA: keys_count is expected to be small - dictPrefetch asserts it does
     * not exceed DictMaxPrefetchSize. */
    dict *dicts[keys_count];
    size_t i = 0;
    while (i < keys_count) {
        dicts[i] = kvstoreGetDict(kvs[i], slots[i]);
        i++;
    }
    dictPrefetch(dicts, keys_count, keys, get_val_data_func);
}
5 changes: 0 additions & 5 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs);
int kvstoreNumAllocatedDicts(kvstore *kvs);
int kvstoreNumDicts(kvstore *kvs);
uint64_t kvstoreGetHash(kvstore *kvs, const void *key);
void kvstoreDictPrefetch(kvstore **kvs,
int *slots,
const void **keys,
size_t keys_count,
void *(*get_val_data_func)(const void *val));

/* kvstore iterator specific functions */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
Expand Down
Loading

0 comments on commit d04a755

Please sign in to comment.