Skip to content

Commit

Permalink
client struct: lazy init components and optimize struct layout
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Dec 8, 2024
1 parent f20d629 commit 618c9eb
Show file tree
Hide file tree
Showing 16 changed files with 761 additions and 679 deletions.
6 changes: 3 additions & 3 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {

if (getClientType(c) == CLIENT_TYPE_PUBSUB) {
/* Check for pattern violations. */
dictIterator *di = dictGetIterator(c->pubsub_patterns);
dictIterator *di = dictGetIterator(c->pubsub_data->pubsub_patterns);
dictEntry *de;
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
Expand All @@ -1963,7 +1963,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
/* Check for channel violations. */
if (!kill) {
/* Check for global channels violation. */
di = dictGetIterator(c->pubsub_channels);
di = dictGetIterator(c->pubsub_data->pubsub_channels);

while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
Expand All @@ -1974,7 +1974,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
}
if (!kill) {
/* Check for shard channels violation. */
di = dictGetIterator(c->pubsubshard_channels);
di = dictGetIterator(c->pubsub_data->pubsubshard_channels);
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
Expand Down
3 changes: 2 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,8 @@ struct client *createAOFClient(void) {

/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
initClientReplicationData(c);
c->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
return c;
}

Expand Down
127 changes: 71 additions & 56 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,25 @@ static void moduleUnblockClientOnKey(client *c, robj *key);
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);

void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.generic_blocked_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
if (c->bstate) return;
c->bstate = zmalloc(sizeof(blockingState));
c->bstate->btype = BLOCKED_NONE;
c->bstate->timeout = 0;
c->bstate->unblock_on_nokey = 0;
c->bstate->keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate->numreplicas = 0;
c->bstate->numlocal = 0;
c->bstate->reploffset = 0;
c->bstate->generic_blocked_list_node = NULL;
c->bstate->module_blocked_handle = NULL;
c->bstate->async_rm_call_handle = NULL;
}

void freeClientBlockingState(client *c) {
if (!c->bstate) return;
dictRelease(c->bstate->keys);
zfree(c->bstate);
c->bstate = NULL;
}

/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
Expand All @@ -93,8 +102,10 @@ void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

initClientBlockingState(c);

c->flag.blocked = 1;
c->bstate.btype = btype;
c->bstate->btype = btype;
if (!c->flag.module)
server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
Expand Down Expand Up @@ -186,26 +197,26 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT) {
} else if (c->bstate->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
} else if (c->bstate->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->bstate.btype == BLOCKED_POSTPONE) {
serverAssert(c->bstate.postponed_list_node);
listDelNode(server.postponed_clients, c->bstate.postponed_list_node);
c->bstate.postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
} else if (c->bstate->btype == BLOCKED_POSTPONE) {
serverAssert(c->bstate->postponed_list_node);
listDelNode(server.postponed_clients, c->bstate->postponed_list_node);
c->bstate->postponed_list_node = NULL;
} else if (c->bstate->btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!c->flag.pending_command && c->bstate->btype != BLOCKED_SHUTDOWN) {
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
Expand All @@ -216,12 +227,12 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* We count blocked client stats on regular clients and not on module clients */
if (!c->flag.module) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
server.blocked_clients_by_type[c->bstate->btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
c->flag.blocked = 0;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
c->bstate->btype = BLOCKED_NONE;
c->bstate->unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
if (queue_for_reprocessing) queueClientForReprocessing(c);
}
Expand All @@ -230,22 +241,22 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
} else if (c->bstate->btype == BLOCKED_WAIT) {
if (c->cmd->proc == waitCommand) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate->reploffset));
} else if (c->cmd->proc == waitaofCommand) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
addReplyLongLong(c, server.fsynced_reploff >= c->bstate->reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate->reploffset));
} else if (c->cmd->proc == clusterCommand) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name);
}
} else if (c->bstate.btype == BLOCKED_MODULE) {
} else if (c->bstate->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
Expand All @@ -261,7 +272,7 @@ void replyToClientsBlockedOnShutdown(void) {
listRewind(server.clients, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) {
if (c->flag.blocked && c->bstate->btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
}
Expand All @@ -288,7 +299,7 @@ void disconnectAllBlockedClients(void) {
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;
if (c->bstate->btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
Expand Down Expand Up @@ -373,15 +384,17 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

initClientBlockingState(c);

if (!c->flag.reprocessing_command) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
c->bstate->timeout = timeout;
}

for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys, keys[j], NULL))) {
if (!(client_blocked_entry = dictAddRaw(c->bstate->keys, keys[j], NULL))) {
continue;
}
incrRefCount(keys[j]);
Expand All @@ -398,7 +411,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
l = dictGetVal(db_blocked_existing_entry);
}
listAddNodeTail(l, c);
dictSetVal(c->bstate.keys, client_blocked_entry, listLast(l));
dictSetVal(c->bstate->keys, client_blocked_entry, listLast(l));

/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
Expand All @@ -412,7 +425,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
}
}
}
c->bstate.unblock_on_nokey = unblock_on_nokey;
c->bstate->unblock_on_nokey = unblock_on_nokey;
/* Currently we assume key blocking will require reprocessing the command.
* However in case of modules, they have a different way to handle the reprocessing
* which does not require setting the pending command flag */
Expand All @@ -426,15 +439,15 @@ static void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;

if (dictSize(c->bstate.keys) == 0) return;
if (dictSize(c->bstate->keys) == 0) return;

di = dictGetIterator(c->bstate.keys);
di = dictGetIterator(c->bstate->keys);
/* The client may wait for multiple keys, so unblock it for every key. */
while ((de = dictNext(di)) != NULL) {
releaseBlockedEntry(c, de, 0);
}
dictReleaseIterator(di);
dictEmpty(c->bstate.keys, NULL);
dictEmpty(c->bstate->keys, NULL);
}

static blocking_type getBlockedTypeByType(int type) {
Expand Down Expand Up @@ -533,7 +546,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
if (listLength(l) == 0) {
dictDelete(c->db->blocking_keys, key);
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
} else if (c->bstate.unblock_on_nokey) {
} else if (c->bstate->unblock_on_nokey) {
unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey, key);
/* it is not possible to have a client blocked on nokey with no matching entry */
serverAssertWithInfo(c, key, unblock_on_nokey_entry != NULL);
Expand All @@ -542,7 +555,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
}
}
if (remove_key) dictDelete(c->bstate.keys, key);
if (remove_key) dictDelete(c->bstate->keys, key);
}

void signalKeyAsReady(serverDb *db, robj *key, int type) {
Expand Down Expand Up @@ -580,9 +593,9 @@ static void handleClientsBlockedOnKey(readyList *rl) {
* module is trying to accomplish right now.
* 3. In case of XREADGROUP call we will want to unblock on any change in object type
* or in case the key was deleted, since the group is no longer valid. */
if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || (receiver->bstate.unblock_on_nokey)) {
if (receiver->bstate.btype != BLOCKED_MODULE)
if ((o != NULL && (receiver->bstate->btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate->btype == BLOCKED_MODULE)) || (receiver->bstate->unblock_on_nokey)) {
if (receiver->bstate->btype != BLOCKED_MODULE)
unblockClientOnKey(receiver, rl->key);
else
moduleUnblockClientOnKey(receiver, rl->key);
Expand All @@ -593,28 +606,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {

/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
initClientBlockingState(c);
c->bstate->timeout = timeout;
c->bstate->reploffset = offset;
c->bstate->numreplicas = numreplicas;
c->bstate->numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
/* Note that we remember the linked list node where the client is stored,
* this way removing the client in unblockClientWaitingReplicas() will not
* require a linear scan, but just a constant time operation. */
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
serverAssert(c->bstate->client_waiting_acks_list_node == NULL);
c->bstate->client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
blockClient(c, BLOCKED_WAIT);
}

/* Postpone client from executing a command. For example the server might be busy
* requesting to avoid processing clients commands which will be processed later
* when the it is ready to accept them. */
void blockPostponeClient(client *c) {
c->bstate.timeout = 0;
initClientBlockingState(c);
c->bstate->timeout = 0;
blockClient(c, BLOCKED_POSTPONE);
listAddNodeTail(server.postponed_clients, c);
serverAssert(c->bstate.postponed_list_node == NULL);
c->bstate.postponed_list_node = listLast(server.postponed_clients);
serverAssert(c->bstate->postponed_list_node == NULL);
c->bstate->postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flag.pending_command = 1;
}
Expand All @@ -631,13 +646,13 @@ void blockClientShutdown(client *c) {
static void unblockClientOnKey(client *c, robj *key) {
dictEntry *de;

de = dictFind(c->bstate.keys, key);
de = dictFind(c->bstate->keys, key);
releaseBlockedEntry(c, de, 1);

/* Only in case of blocking API calls, we might be blocked on several keys.
however we should force unblock the entire blocking keys */
serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET);
serverAssert(c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET);

/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
Expand Down Expand Up @@ -698,7 +713,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
* command with timeout reply. */
void unblockClientOnTimeout(client *c) {
/* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
if (c->bstate->btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;

replyToBlockedClientTimedOut(c);
if (c->flag.pending_command) c->flag.pending_command = 0;
Expand Down
14 changes: 7 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!c->flag.multi) return myself;
ms = &c->mstate;
ms = c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
Expand All @@ -1022,7 +1022,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included =
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1175,7 +1175,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
Expand Down Expand Up @@ -1232,14 +1232,14 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
if (c->flag.blocked && (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_MODULE)) {
dictEntry *de;
dictIterator *di;

/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0;
if (c->bstate->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0;

/* If the cluster is down, unblock the client with the right error.
* If the cluster is configured to allow reads on cluster down, we
Expand All @@ -1251,7 +1251,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
}

/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bstate.keys);
di = dictGetIterator(c->bstate->keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char *)key->ptr, sdslen(key->ptr));
Expand Down
Loading

0 comments on commit 618c9eb

Please sign in to comment.