Skip to content

Commit

Permalink
Optimize IO thread offload for modified argv
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Nov 26, 2024
1 parent 1c18c80 commit 69cea2a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 44 deletions.
1 change: 0 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
Expand Down
32 changes: 24 additions & 8 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,29 @@ void IOThreadFreeArgv(void *data) {
}

/* This function attempts to offload the client's argv to an IO thread.
*
* If free_original_argv is set to 1, the function will offload the free of the client's original argv.
* Otherwise, it will offload the free of the client's current argv.
*
* Returns C_OK if the client's argv were successfully offloaded to an IO thread,
* C_ERR otherwise. */
int tryOffloadFreeArgvToIOThreads(client *c) {
int tryOffloadFreeArgvToIOThreads(client *c, int free_original_argv) {
if (server.active_io_threads_num <= 1 || c->argc == 0) {
return C_ERR;
}

if (!free_original_argv && c->original_argv != NULL) {
/* If original_argv exist We offload only the original argv that may have been allocated by the IO thread. */
return C_ERR;
}

robj **argv = c->argv;
int argc = c->argc;
if (free_original_argv) {
argv = c->original_argv;
argc = c->original_argc;
}

size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;

IOJobQueue *jq = &io_jobs[tid];
Expand All @@ -456,29 +472,29 @@ int tryOffloadFreeArgvToIOThreads(client *c) {
int last_arg_to_free = -1;

/* Prepare the argv */
for (int j = 0; j < c->argc; j++) {
if (c->argv[j]->refcount > 1) {
decrRefCount(c->argv[j]);
for (int j = 0; j < argc; j++) {
if (argv[j]->refcount > 1) {
decrRefCount(argv[j]);
/* Set argv[j] to NULL to avoid double free */
c->argv[j] = NULL;
argv[j] = NULL;
} else {
last_arg_to_free = j;
}
}

/* If no argv to free, free the argv array at the main thread */
if (last_arg_to_free == -1) {
zfree(c->argv);
zfree(argv);
return C_OK;
}

/* We set the refcount of the last arg to free to 0 to indicate that
* this is the last argument to free. With this approach, we don't need to
* send the argc to the IO thread and we can send just the argv ptr. */
c->argv[last_arg_to_free]->refcount = 0;
argv[last_arg_to_free]->refcount = 0;

/* Must succeed as we checked the free space before. */
IOJobQueue_push(jq, IOThreadFreeArgv, c->argv);
IOJobQueue_push(jq, IOThreadFreeArgv, argv);

return C_OK;
}
Expand Down
2 changes: 1 addition & 1 deletion src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int inMainThread(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
int tryOffloadFreeArgvToIOThreads(client *c, int free_original_argv);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
Expand Down
88 changes: 58 additions & 30 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,14 +1481,17 @@ void freeClientOriginalArgv(client *c) {
/* We didn't rewrite this client */
if (!c->original_argv) return;

for (int j = 0; j < c->original_argc; j++) decrRefCount(c->original_argv[j]);
zfree(c->original_argv);
if (tryOffloadFreeArgvToIOThreads(c, 1) == C_ERR) {
for (int j = 0; j < c->original_argc; j++) decrRefCount(c->original_argv[j]);
zfree(c->original_argv);
}

c->original_argv = NULL;
c->original_argc = 0;
}

void freeClientArgv(client *c) {
if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) {
if (tryOffloadFreeArgvToIOThreads(c, 0) == C_ERR) {
for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
}
Expand Down Expand Up @@ -2524,6 +2527,7 @@ void resetClient(client *c) {
serverCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;

freeClientArgv(c);
freeClientOriginalArgv(c);
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
Expand Down Expand Up @@ -4184,24 +4188,61 @@ void securityWarningCommand(client *c) {
freeClientAsync(c);
}

/* Keep track of the original command arguments so that we can generate
* an accurate slowlog entry after the command has been executed. */
static void retainOriginalCommandVector(client *c) {
/* We already rewrote this command, so don't rewrite it again */
if (c->original_argv) return;
c->original_argc = c->argc;
c->original_argv = zmalloc(sizeof(robj *) * (c->argc));
for (int j = 0; j < c->argc; j++) {
c->original_argv[j] = c->argv[j];
incrRefCount(c->argv[j]);
/* This function preserves the original command arguments for accurate slowlog recording.
*
* It performs the following operations:
* - Stores the initial command vector if not already saved
* - Manages memory allocation for command argument modifications
*
* new_argc - The new number of arguments to allocate space for if necessary.
* new_argv - Optional pointer to a new argument vector. If NULL, space will be
* allocated for new_argc arguments, preserving the existing arguments.
*/
static void backupAndUpdateClientArgv(client *c, int new_argc, robj **new_argv) {
robj **old_argv = c->argv;
int old_argc = c->argc;

/* Store original arguments if not already saved */
if (!c->original_argv) {
c->original_argc = old_argc;
c->original_argv = old_argv;
}

/* Handle direct argv replacement */
if (new_argv) {
c->argv = new_argv;
} else if (c->original_argv == old_argv || new_argc > old_argc) {
/* Allocate new array if necessary */
c->argv = zmalloc(sizeof(robj *) * new_argc);

for (int i = 0; i < old_argc && i < new_argc; i++) {
c->argv[i] = old_argv[i];
incrRefCount(c->argv[i]);
}

/* Initialize new argument slots to NULL */
for (int i = old_argc; i < new_argc; i++) {
c->argv[i] = NULL;
}
}

c->argc = new_argc;
c->argv_len = new_argc;

/* Clean up old argv if necessary */
if (c->argv != old_argv && c->original_argv != old_argv) {
for (int i = 0; i < old_argc; i++) {
decrRefCount(old_argv[i]);
}
zfree(old_argv);
}
}

/* Redact a given argument to prevent it from being shown
* in the slowlog. This information is stored in the
* original_argv array. */
void redactClientCommandArgument(client *c, int argc) {
retainOriginalCommandVector(c);
backupAndUpdateClientArgv(c, c->argc, NULL);
if (c->original_argv[argc] == shared.redacted) {
/* This argument has already been redacted */
return;
Expand Down Expand Up @@ -4234,10 +4275,7 @@ void rewriteClientCommandVector(client *c, int argc, ...) {
/* Completely replace the client command vector with the provided one. */
void replaceClientCommandVector(client *c, int argc, robj **argv) {
int j;
retainOriginalCommandVector(c);
freeClientArgv(c);
c->argv = argv;
c->argc = argc;
backupAndUpdateClientArgv(c, argc, argv);
c->argv_len_sum = 0;
for (j = 0; j < c->argc; j++)
if (c->argv[j]) c->argv_len_sum += getStringObjectLen(c->argv[j]);
Expand All @@ -4258,19 +4296,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
* free the no longer used objects on c->argv. */
void rewriteClientCommandArgument(client *c, int i, robj *newval) {
robj *oldval;
retainOriginalCommandVector(c);
int new_argc = (i >= c->argc) ? i + 1 : c->argc;
backupAndUpdateClientArgv(c, new_argc, NULL);

/* We need to handle both extending beyond argc (just update it and
* initialize the new element) or beyond argv_len (realloc is needed).
*/
if (i >= c->argc) {
if (i >= c->argv_len) {
c->argv = zrealloc(c->argv, sizeof(robj *) * (i + 1));
c->argv_len = i + 1;
}
c->argc = i + 1;
c->argv[i] = NULL;
}
oldval = c->argv[i];
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
if (newval) c->argv_len_sum += getStringObjectLen(newval);
Expand Down
4 changes: 0 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3670,10 +3670,6 @@ void call(client *c, int flags) {
replicationFeedMonitors(c, server.monitors, c->db->id, argv, argc);
}

/* Clear the original argv.
* If the client is blocked we will handle slowlog when it is unblocked. */
if (!c->flag.blocked) freeClientOriginalArgv(c);

/* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS,
* respectively. If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !c->flag.blocked) {
Expand Down

0 comments on commit 69cea2a

Please sign in to comment.