Skip to content

Commit

Permalink
Merge pull request #4 from caipengbo/sentinel-7.2
Browse files Browse the repository at this point in the history
Read sentinel code
  • Loading branch information
caipengbo authored Mar 5, 2024
2 parents c1e7bbc + e178d61 commit 42304f4
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ typedef struct instanceLink {
int refcount; /* Number of sentinelRedisInstance owners. */
int disconnected; /* Non-zero if we need to reconnect cc or pc. */
int pending_commands; /* Number of commands sent waiting for a reply. */
// 建立两个异步连接
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
mstime_t cc_conn_time; /* cc connection time. */
Expand Down Expand Up @@ -251,7 +252,7 @@ typedef struct sentinelRedisInstance {
sds info; /* cached INFO output */
} sentinelRedisInstance;

/* Main state. */
/* Main state. 全局变量 */
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
Expand Down Expand Up @@ -2419,7 +2420,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
ri->addr = tryResolveAddr;
}
}

// 建立异步连接
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);

if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd);
Expand Down Expand Up @@ -2680,7 +2681,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
{ // 这个地方是发送 slaveof no one 后,通过 INFO 检查到变成主了
/* Now that we are sure the slave was reconfigured as a master
* set the master configuration epoch to the epoch we won the
* election to perform this failover. This will force the other
Expand Down Expand Up @@ -3159,6 +3160,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
// 异步发送 info 命令
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
Expand Down Expand Up @@ -4601,6 +4603,7 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p
}
}

// 向监控该 master 的其他 Sentinel 发送 SENTINEL IS-MASTER-DOWN-BY-ADDR 命令,该函数中没有 check response
/* If we think the master is down, we start sending
* SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
* in order to get the replies that allow to reach the quorum
Expand All @@ -4610,6 +4613,7 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f
dictIterator *di;
dictEntry *de;

// 遍历监控这个 master 上面的其他的 Sentinel
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
Expand Down Expand Up @@ -4720,6 +4724,7 @@ int sentinelLeaderIncr(dict *counters, char *runid) {
* To be a leader for a given epoch, we should have the majority of
* the Sentinels we know (ever seen since the last SENTINEL RESET) that
* reported the same instance as leader for the same epoch. */
// 因为已经询问了其他 sentinel 的 Leader, 查看每个 Sentinel 的 leader runid, 统计自己是否是leader
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
dict *counters;
dictIterator *di;
Expand Down Expand Up @@ -4912,7 +4917,7 @@ int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
}
return 0;
}

// 设置 failover 状态机开始
sentinelStartFailover(master);
return 1;
}
Expand Down Expand Up @@ -5051,6 +5056,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
sentinelSimFailureCrash();
// 对于 Leader 才继续执行状态机,将状态设置为 SENTINEL_FAILOVER_STATE_SELECT_SLAVE
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
Expand Down Expand Up @@ -5246,6 +5252,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
sentinelResetMasterAndChangeAddress(master,ref->addr->hostname,ref->addr->port);
}

// failover 状态机
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);

Expand All @@ -5255,13 +5262,16 @@ void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: // leader sentinel 才会走到这个状态
// 选择新主
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
// 发送 slaveof no one 给新主
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
// 这个函数只检查 timeout,真正检查是否成为主的逻辑在 INFO 的解析中(sentinelRefreshInstanceInfo)
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
Expand Down Expand Up @@ -5298,6 +5308,7 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
sentinelReconnectInstance(ri);
// 周期性的发送探测命令
sentinelSendPeriodicCommands(ri);

/* ============== ACTING HALF ============= */
Expand All @@ -5320,9 +5331,11 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

/* Only masters */
if (ri->flags & SRI_MASTER) {
// 判断客观下线
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
// 开启状态机
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
Expand Down

0 comments on commit 42304f4

Please sign in to comment.