
Commit

PR comments
uriyage committed Aug 25, 2024
1 parent d300b1b commit 3dcf709
Showing 8 changed files with 73 additions and 65 deletions.
8 changes: 1 addition & 7 deletions src/config.c
@@ -2564,12 +2564,6 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

static int UpdateMaxPrefetchBatchSize(const char **err) {
UNUSED(err);
onMaxBatchSizeChange();
return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResponse();
@@ -3170,7 +3164,7 @@ standardConfig static_configs[] = {
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
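Note: with the apply callback dropped from the prefetch-batch-max-size entry above, a runtime CONFIG SET of this parameter no longer takes effect through config.c; it is picked up lazily by the prefetch code, which re-checks server.prefetch_batch_max_size after each batch (see the memory_prefetch.c hunk below). A condensed sketch of that pattern, with the surrounding code elided:

/* Executed after a batch has been processed and reset (condensed from the
 * memory_prefetch.c change in this commit). */
resetCommandsBatch();
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
    /* Presumably re-creates the batch arrays sized to the new config value. */
    onMaxBatchSizeChange();
}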
1 change: 0 additions & 1 deletion src/dict.c
@@ -120,7 +120,6 @@ static void _dictExpandIfNeeded(dict *d);
static void _dictShrinkIfNeeded(dict *d);
static signed char _dictNextExp(unsigned long size);
static int _dictInit(dict *d, dictType *type);
dictEntry *dictGetNext(const dictEntry *de);
static dictEntry **dictGetNextRef(dictEntry *de);
static void dictSetNext(dictEntry *de, dictEntry *next);

1 change: 1 addition & 0 deletions src/dict.h
@@ -229,6 +229,7 @@ void dictInitIterator(dictIterator *iter, dict *d);
void dictInitSafeIterator(dictIterator *iter, dict *d);
void dictResetIterator(dictIterator *iter);
dictEntry *dictNext(dictIterator *iter);
dictEntry *dictGetNext(const dictEntry *de);
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
dictEntry *dictGetFairRandomKey(dict *d);
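For reference, promoting dictGetNext() into dict.h lets code outside dict.c (here, memory_prefetch.c) walk a bucket's entry chain directly. A minimal illustrative sketch, under the assumption that dictGetNext() returns the next entry in the same bucket or NULL at the end of the chain; the helper name is made up for this example:

#include "dict.h"

/* Count the entries chained after (and including) the entry that holds `key`. */
static size_t chainLengthFrom(dict *d, const void *key) {
    size_t n = 0;
    for (dictEntry *de = dictFind(d, key); de != NULL; de = dictGetNext(de)) n++;
    return n;
}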
6 changes: 6 additions & 0 deletions src/io_threads.c
@@ -1,3 +1,9 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "io_threads.h"

static __thread int thread_id = 0; /* Thread local var */
1 change: 1 addition & 0 deletions src/kvstore.h
@@ -76,5 +76,6 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val);
dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index);
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index);
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
dict *kvstoreGetDict(kvstore *kvs, int didx);

#endif /* DICTARRAY_H_ */
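The newly exported kvstoreGetDict() is what gives the prefetcher a handle on the underlying dict of a key's slot. A small illustrative helper, assuming the serverDb keys/expires kvstore fields and a getKeySlot()-style slot lookup from the surrounding valkey code (neither is part of this diff):

#include "server.h"

/* Sketch: resolve the dicts the prefetch state machine will walk for one key. */
static void getDictsForKey(serverDb *db, sds key, dict **keys_dict, dict **expires_dict) {
    int didx = getKeySlot(key); /* assumed helper for the key's slot index */
    *keys_dict = kvstoreGetDict(db->keys, didx);
    *expires_dict = kvstoreGetDict(db->expires, didx);
}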
62 changes: 33 additions & 29 deletions src/memory_prefetch.c
@@ -1,4 +1,8 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
* This file utilizes prefetching keys and data for multiple commands in a batch,
* to improve performance by amortizing memory access costs across multiple operations.
*/
@@ -48,22 +52,22 @@ typedef enum {
└───────────┬──────────────┘ │
| │
┌───────-─▼─────────────┐ │
│ PREFETCH_DONE │◄────────┘
│ PREFETCH_DONE │◄────────┘
└───────────────────────┘
**********************************************************************************************************************/

typedef void *(*GetValueDataFunc)(const void *val);

typedef struct PrefetchInfo {
typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
uint64_t bucket_idx; /* Index of the bucket in the current hash table */
uint64_t key_hash; /* Hash value of the key being prefetched */
dictEntry *current_entry; /* Pointer to the current entry being processed */
} PrefetchInfo;
} KeyPrefetchInfo;

/* CommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct CommandsBatch {
/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
size_t keys_done; /* Number of keys that have been prefetched */
size_t key_count; /* Number of keys in the current batch */
@@ -76,10 +80,10 @@ typedef struct CommandsBatch {
dict **keys_dicts; /* Main dict for each key */
dict **expire_dicts; /* Expire dict for each key */
dict **current_dicts; /* Points to either keys_dicts or expire_dicts */
PrefetchInfo *prefetch_info; /* Prefetch info for each key */
} CommandsBatch;
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
} PrefetchCommandsBatch;

static CommandsBatch *batch = NULL;
static PrefetchCommandsBatch *batch = NULL;

void freePrefetchCommandsBatch(void) {
if (batch == NULL) {
@@ -104,14 +108,14 @@ void prefetchCommandsBatchInit(void) {
return;
}

batch = zcalloc(sizeof(CommandsBatch));
batch = zcalloc(sizeof(PrefetchCommandsBatch));
batch->max_prefetch_size = max_prefetch_size;
batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
batch->slots = zcalloc(max_prefetch_size * sizeof(int));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
}

void onMaxBatchSizeChange(void) {
@@ -125,23 +129,23 @@ void onMaxBatchSizeChange(void) {
}

/* Prefetch the given pointer and move to the next key in the batch. */
static void prefetch(void *addr) {
static void prefetchAndMoveToNextKey(void *addr) {
valkey_prefetch(addr);
/* While the prefetch is in progress, we can continue to the next key */
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}

static void markDone(PrefetchInfo *info) {
static void markKeyAsdone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
batch->keys_done++;
}

/* Returns the next PrefetchInfo structure that needs to be processed. */
static PrefetchInfo *getNextPrefetchInfo(void) {
/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
size_t start_idx = batch->cur_idx;
do {
PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
} while (batch->cur_idx != start_idx);
@@ -153,7 +157,7 @@ static void initBatchInfo(dict **dicts) {

/* Initialize the prefetch info */
for (size_t i = 0; i < batch->key_count; i++) {
PrefetchInfo *info = &batch->prefetch_info[i];
KeyPrefetchInfo *info = &batch->prefetch_info[i];
if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
info->state = PREFETCH_DONE;
batch->keys_done++;
@@ -168,7 +172,7 @@

/* Prefetch the bucket of the next hash table index.
* If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(PrefetchInfo *info) {
static void prefetchBucket(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;

/* Determine which hash table to use */
@@ -178,20 +182,20 @@
info->ht_idx = HT_IDX_SECOND;
} else {
/* No more tables left - mark as done. */
markDone(info);
markKeyAsdone(info);
return;
}

/* Prefetch the bucket */
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->current_entry = NULL;
info->state = PREFETCH_ENTRY;
}

/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state.
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(PrefetchInfo *info) {
static void prefetchEntry(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;

if (info->current_entry) {
@@ -203,7 +207,7 @@
}

if (info->current_entry) {
prefetch(info->current_entry);
prefetchAndMoveToNextKey(info->current_entry);
info->state = PREFETCH_VALUE;
} else {
/* No entry found in the bucket - try the bucket in the next table */
@@ -213,13 +217,13 @@

/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state.
* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */
static void prefetchValue(PrefetchInfo *info) {
static void prefetchValue(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
void *value = dictGetVal(info->current_entry);

if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) {
/* If this is the last element, we assume a hit and don't compare the keys */
prefetch(value);
prefetchAndMoveToNextKey(value);
info->state = PREFETCH_VALUE_DATA;
return;
}
@@ -228,7 +232,7 @@
if (batch->keys[i] == current_entry_key ||
dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
prefetch(value);
prefetchAndMoveToNextKey(value);
info->state = PREFETCH_VALUE_DATA;
} else {
/* Move to the next entry */
@@ -237,12 +241,12 @@
}

/* Prefetch the value data if available. */
static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) {
static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) {
if (get_val_data_func) {
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) prefetch(value_data);
if (value_data) prefetchAndMoveToNextKey(value_data);
}
markDone(info);
markKeyAsdone(info);
}

/* Prefetch dictionary data for an array of keys.
@@ -268,7 +272,7 @@
*/
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
initBatchInfo(dicts);
PrefetchInfo *info;
KeyPrefetchInfo *info;
while ((info = getNextPrefetchInfo())) {
switch (info->state) {
case PREFETCH_BUCKET: prefetchBucket(info); break;
@@ -357,7 +361,7 @@ void processClientsCommandsBatch(void) {

resetCommandsBatch();

/* Handle the case where the max prefetch size has been changed during the batch processing */
/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
onMaxBatchSizeChange();
}
1 change: 0 additions & 1 deletion src/memory_prefetch.h
@@ -3,7 +3,6 @@

struct client;

void onMaxBatchSizeChange(void);
void prefetchCommandsBatchInit(void);
void processClientsCommandsBatch(void);
int addCommandToBatchAndProcessIfFull(struct client *c);
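After this change the header exposes only the batch lifecycle (init, add, process). A rough usage sketch of how a caller could drive it, assuming addCommandToBatchAndProcessIfFull() returns C_OK when the command was queued for batched prefetching; the real call sites live in networking.c and the processReadyClient() name is a stand-in:

#include "server.h"
#include "memory_prefetch.h"

void handlePendingCommands(client **clients, int num) {
    for (int i = 0; i < num; i++) {
        /* Queue the command; once the batch is full its keys are prefetched
         * and the whole batch is executed before returning. */
        if (addCommandToBatchAndProcessIfFull(clients[i]) != C_OK) {
            processReadyClient(clients[i]); /* prefetching unavailable: run directly */
        }
    }
    /* Execute whatever is left in a partially filled batch. */
    processClientsCommandsBatch();
}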
58 changes: 31 additions & 27 deletions tests/unit/networking.tcl
@@ -172,12 +172,12 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
}

start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {

set server_pid [s process_id]
# Skip if non io-threads mode - as it is relevant only for io-threads mode
if {[r config get io-threads] ne "io-threads 1"} {
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
# Create 17 (prefetch batch size) +1 clients
for {set i 0} {$i < 17} {incr i} {
# Create 16 (prefetch batch size) +1 clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [valkey_deferring_client]
}

@@ -188,22 +188,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
$rd4 client id
set rd4_id [$rd4 read]

# Create a batch of commands by making sure the server sleeps for a while
# Create a batch of commands by suspending the server for a while
# before responding to the first command
$rd0 debug sleep 2
after 200 ; # wait a bit to make sure the server is sleeping.
pause_process $server_pid

# The first client will kill the fourth client
$rd1 client kill id $rd4_id
$rd0 client kill id $rd4_id

# Send set commands for all clients except the first
for {set i 1} {$i < 17} {incr i} {
for {set i 1} {$i < 16} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}

# Resume the server
resume_process $server_pid

# Read the results
assert_equal {1} [$rd1 read]
assert_equal {1} [$rd0 read]
catch {$rd4 read} err
assert_match {I/O error reading reply} $err

@@ -212,25 +214,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher
assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher

# Verify the final state
$rd16 get a
assert_equal {OK} [$rd16 read]
assert_equal {16} [$rd16 read]
$rd15 get a
assert_equal {OK} [$rd15 read]
assert_equal {15} [$rd15 read]
}

test {prefetch works as expected when changing the batch size while executing the commands batch} {
# Create 16 (default prefetch batch size) clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [valkey_deferring_client]
}
# Create a batch of commands by making sure the server sleeps for a while

# Create a batch of commands by suspending the server for a while
# before responding to the first command
$rd0 debug sleep 2
after 200 ; # wait a bit to make sure the server is sleeping.

pause_process $server_pid

# Send set commands for all clients the 5th client will change the prefetch batch size
for {set i 0} {$i < 16} {incr i} {
if {$i == 4} {
@@ -239,14 +240,15 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
[set rd$i] set a $i
[set rd$i] flush
}

# Resume the server
resume_process $server_pid
# Read the results
for {set i 0} {$i < 16} {incr i} {
assert_equal {OK} [[set rd$i] read]
}

# assert the configured prefetch batch size was changed
assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
}

test {no prefetch when the batch size is set to 0} {
@@ -260,18 +262,20 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb
for {set i 0} {$i < 16} {incr i} {
set rd$i [valkey_deferring_client]
}
# Create a batch of commands by making sure the server sleeps for a while

# Create a batch of commands by suspending the server for a while
# before responding to the first command
$rd0 debug sleep 2
after 200 ; # wait a bit to make sure the server is sleeping.

pause_process $server_pid

# Send set commands for all clients
for {set i 0} {$i < 16} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}


# Resume the server
resume_process $server_pid

# Read the results
for {set i 0} {$i < 16} {incr i} {
assert_equal {OK} [[set rd$i] read]
