From e178d615f7a3708abcca87a1f2ffacec37e82c21 Mon Sep 17 00:00:00 2001 From: Myth Date: Tue, 5 Mar 2024 09:34:14 +0800 Subject: [PATCH] Read sentinel code --- src/sentinel.c | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 326def135..c3ca2e360 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -156,6 +156,7 @@ typedef struct instanceLink { int refcount; /* Number of sentinelRedisInstance owners. */ int disconnected; /* Non-zero if we need to reconnect cc or pc. */ int pending_commands; /* Number of commands sent waiting for a reply. */ + // 建立两个异步连接 redisAsyncContext *cc; /* Hiredis context for commands. */ redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */ mstime_t cc_conn_time; /* cc connection time. */ @@ -251,7 +252,7 @@ typedef struct sentinelRedisInstance { sds info; /* cached INFO output */ } sentinelRedisInstance; -/* Main state. */ +/* Main state. 全局变量 */ struct sentinelState { char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */ uint64_t current_epoch; /* Current epoch. */ @@ -2419,7 +2420,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) { ri->addr = tryResolveAddr; } } - + // 建立异步连接 link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr); if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd); @@ -2680,7 +2681,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && (ri->master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) - { + { // 这个地方是发送 slaveof no one 后,通过 INFO 检查到变成主了 /* Now that we are sure the slave was reconfigured as a master * set the master configuration epoch to the epoch we won the * election to perform this failover. This will force the other @@ -3159,6 +3160,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { + // 异步发送 info 命令 retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s", sentinelInstanceMapCommand(ri,"INFO")); @@ -4601,6 +4603,7 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p } } +// 向监控该 master 的其他 Sentinel 发送 SENTINEL IS-MASTER-DOWN-BY-ADDR 命令,该函数中没有 check response /* If we think the master is down, we start sending * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels * in order to get the replies that allow to reach the quorum @@ -4610,6 +4613,7 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f dictIterator *di; dictEntry *de; + // 遍历监控这个 master 上面的其他的 Sentinel di = dictGetIterator(master->sentinels); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); @@ -4720,6 +4724,7 @@ int sentinelLeaderIncr(dict *counters, char *runid) { * To be a leader for a given epoch, we should have the majority of * the Sentinels we know (ever seen since the last SENTINEL RESET) that * reported the same instance as leader for the same epoch. */ +// 因为已经询问了其他 sentinel 的 Leader, 查看每个 Sentinel 的 leader runid, 统计自己是否是leader char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { dict *counters; dictIterator *di; @@ -4912,7 +4917,7 @@ int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) { } return 0; } - + // 设置 failover 状态机开始 sentinelStartFailover(master); return 1; } @@ -5051,6 +5056,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@"); if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION) sentinelSimFailureCrash(); + // 对于 Leader 才继续执行状态机,将状态设置为 SENTINEL_FAILOVER_STATE_SELECT_SLAVE ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; ri->failover_state_change_time = mstime(); sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@"); @@ -5246,6 +5252,7 @@ void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { sentinelResetMasterAndChangeAddress(master,ref->addr->hostname,ref->addr->port); } +// failover 状态机 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { serverAssert(ri->flags & SRI_MASTER); @@ -5255,13 +5262,16 @@ void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break; - case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: + case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: // leader sentinel 才会走到这个状态 + // 选择新主 sentinelFailoverSelectSlave(ri); break; case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: + // 发送 slaveof no one 给新主 sentinelFailoverSendSlaveOfNoOne(ri); break; case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: + // 这个函数只检查 timeout,真正检查是否成为主的逻辑在 INFO 的解析中(sentinelRefreshInstanceInfo) sentinelFailoverWaitPromotion(ri); break; case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: @@ -5298,6 +5308,7 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { /* ========== MONITORING HALF ============ */ /* Every kind of instance */ sentinelReconnectInstance(ri); + // 周期性的发送探测命令 sentinelSendPeriodicCommands(ri); /* ============== ACTING HALF ============= */ @@ -5320,9 +5331,11 @@ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { /* Only masters */ if (ri->flags & SRI_MASTER) { + // 判断客观下线 sentinelCheckObjectivelyDown(ri); if (sentinelStartFailoverIfNeeded(ri)) sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); + // 开启状态机 sentinelFailoverStateMachine(ri); sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); }