[Redis Source Code Analysis] How Redis Sentinel Actually Solves the Distributed Consensus Problem


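Contents
    • Introduction
    • Sentinel structures
    • Sentinel failures and safe mode: TILT mode
    • Main failover logic
    • Heartbeat monitoring
    • Detecting a down node and polling the other Sentinels
    • Electing the leader Sentinel
    • Failover state machine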

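Introduction

The previous few posts drifted off course, which taught me that a series needs a guiding principle before it starts.
Those posts leaned too heavily on the "what" and the "why", whereas my original goal for this series was the "how": how things are actually done!

So this post focuses on the following questions:
1. How does a Sentinel monitor nodes?
2. How do Sentinels hold an election?
3. How does a replica get promoted?

If you are not familiar with the Raft distributed consensus algorithm, you may want to read my earlier post on Raft first. That post also links to material on distributed transactions, if you are interested. I will not explain the terminology again when it comes up below.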


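Sentinel structures

Each Sentinel node maintains its own view of the current state of the Sentinel cluster. That state is stored in the sentinelState struct:

struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1];    // ID of this Sentinel
    uint64_t current_epoch;             // current epoch (term number), used for the Raft-style leader election during failover
    dict *masters;                      // monitored masters, keyed by master name
    ......
} sentinel;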

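One thing to be clear about: cluster nodes can go down, and so can Sentinels.

The sentinelRedisInstance struct stores the instance data for the masters and replicas in the cluster, as well as for the other Sentinel nodes.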

typedef struct sentinelRedisInstance {
    int flags;      // node flags, see the SRI_* definitions below
    char *name;     
    char *runid;    
    uint64_t config_epoch;  
    sentinelAddr *addr; 
    instanceLink *link; 
    mstime_t last_pub_time;   
    mstime_t last_hello_time; 
    mstime_t last_master_down_reply_time; 
    mstime_t s_down_since_time; 
    mstime_t o_down_since_time; 
    mstime_t down_after_period; 
    ......
    
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; 

    
    dict *sentinels;    
    dict *slaves;       
    unsigned int quorum;
    int parallel_syncs; 
    char *auth_pass;    
    char *auth_user;    

    
    mstime_t master_link_down_time; 
    int slave_priority; 
    mstime_t slave_reconf_sent_time; 
    struct sentinelRedisInstance *master; 
    char *slave_master_host;    
    int slave_master_port;      
    int slave_master_link_status; 
    unsigned long long slave_repl_offset; 
    
    char *leader;       
    uint64_t leader_epoch; 
    uint64_t failover_epoch; 
    int failover_state; 
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   
    mstime_t failover_timeout;      
    mstime_t failover_delay_logged; 
    struct sentinelRedisInstance *promoted_slave; 
    ......
} sentinelRedisInstance;


#define SRI_MASTER  (1<<0)
#define SRI_SLAVE   (1<<1)
#define SRI_SENTINEL (1<<2)
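#define SRI_S_DOWN (1<<3)   // this node is subjectively down
#define SRI_O_DOWN (1<<4)   // this node is objectively down
#define SRI_MASTER_DOWN (1<<5)  // set on a Sentinel instance that reports its master as down
#define SRI_FAILOVER_IN_PROGRESS (1<<6) // a failover is in progress for this node
#define SRI_PROMOTED (1<<7)            // this replica has been selected for promotion

While the Sentinel cluster is not performing a failover, all Sentinel nodes are equal. When a failover is needed, one leader is elected, and that leader carries out the failover.

Sentinel relies on Pub/Sub. Every Sentinel subscribes to a dedicated channel on the masters and replicas and publishes its own node information to that channel, so each Sentinel's information is broadcast to the other Sentinels in the cluster.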
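
To make the broadcast concrete, here is a minimal sketch of such a hello message. In the Redis source the channel is named __sentinel__:hello and the payload is a comma-separated line carrying the Sentinel's address, run ID and current epoch, followed by the master's name, address and config epoch. The hello_msg struct and the hello_format / hello_parse helpers are hypothetical names made up for this illustration; they are not functions from sentinel.c.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified view of what one Sentinel announces. */
typedef struct {
    char ip[64];                            /* announcing Sentinel's IP */
    int port;                               /* announcing Sentinel's port */
    char runid[41];                         /* announcing Sentinel's run ID */
    unsigned long long current_epoch;       /* its current epoch */
    char master_name[64];                   /* monitored master's name */
    char master_ip[64];                     /* master address as seen by the sender */
    int master_port;
    unsigned long long master_config_epoch; /* master's config epoch */
} hello_msg;

/* Serialize a message into the comma-separated payload.
 * Returns 0 on success, -1 if the buffer is too small. */
int hello_format(const hello_msg *m, char *buf, size_t len) {
    assert(m != NULL && buf != NULL);
    int n = snprintf(buf, len, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                     m->ip, m->port, m->runid, m->current_epoch,
                     m->master_name, m->master_ip, m->master_port,
                     m->master_config_epoch);
    return (n < 0 || (size_t)n >= len) ? -1 : 0;
}

/* Parse a payload back into a message.
 * Returns 0 on success, -1 if the payload does not have all 8 fields. */
int hello_parse(const char *payload, hello_msg *out) {
    assert(payload != NULL && out != NULL);
    int n = sscanf(payload, "%63[^,],%d,%40[^,],%llu,%63[^,],%63[^,],%d,%llu",
                   out->ip, &out->port, out->runid, &out->current_epoch,
                   out->master_name, out->master_ip, &out->master_port,
                   &out->master_config_epoch);
    return n == 8 ? 0 : -1;
}
```

A receiving Sentinel would parse each payload, add any Sentinel it has not seen before to the master's sentinels dictionary, and compare the epochs against its own.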


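Sentinel failures and safe mode: TILT mode

Sentinel depends heavily on the system clock. For example, it decides whether a node is down from the difference between the time of that node's last PING reply and the current system time. If the system time is changed, or the process stalls because it is busy, Sentinel may misbehave.

To handle this, Sentinel defines TILT mode. Every run of the sentinelTimer function checks the difference between the time of its previous run and the current system time. If the difference is negative or unusually large, the Sentinel enters TILT mode:

1. It stops taking any action, such as failover.
2. When other Sentinels ask for its subjective-down verdict on a master, it answers that the master is not down.
3. If the Sentinel runs normally for 30 seconds while in TILT mode, it exits TILT mode.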

Main failover logic

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);    // run the main logic for this instance
        if (ri->flags & SRI_MASTER) {
            // for a master, recurse into its slaves and sentinels dictionaries as well
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    // perform the last step of the failover
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}




void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    // monitoring half: runs for every kind of instance
    sentinelReconnectInstance(ri);      // (re)establish the network connections
    sentinelSendPeriodicCommands(ri);   // send the periodic PING, INFO and hello messages

    // acting half: skipped while in TILT mode
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    sentinelCheckSubjectivelyDown(ri);  // check whether this instance is subjectively down

    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        // nothing to do here for masters and replicas
    }

    if (ri->flags & SRI_MASTER) {       // masters only
        sentinelCheckObjectivelyDown(ri);       // check whether this master is objectively down
        if (sentinelStartFailoverIfNeeded(ri))  // decide whether a failover can be started
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);  // send vote requests
        sentinelFailoverStateMachine(ri);       // drive the failover state machine
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);  // ask the other Sentinels for their subjective-down verdict
    }
}

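Subjectively down (SDOWN): I personally think you are down.
Objectively down (ODOWN): enough of us think you are down. "Enough" means at least the quorum configured for that master, counting myself, which is not necessarily more than half of the Sentinels.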
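
The objective-down decision can be sketched as a simple count. The sketch below assumes, as sentinelCheckObjectivelyDown does in the Redis source, that this Sentinel counts itself first and then every peer whose last reply set the SRI_MASTER_DOWN flag. The peer_view struct, the FLAG_MASTER_DOWN constant and the is_objectively_down function are hypothetical names used only for this illustration.

```c
#include <assert.h>
#include <stddef.h>

#define FLAG_MASTER_DOWN (1<<5)   /* same bit as SRI_MASTER_DOWN above */

/* Hypothetical, stripped-down view of another Sentinel. */
typedef struct {
    int flags;                    /* FLAG_MASTER_DOWN if it reported the master as down */
} peer_view;

/* Returns 1 if the master should be flagged objectively down.
 * i_think_down: whether this Sentinel already considers it subjectively down.
 * quorum: the agreement threshold configured for this master. */
int is_objectively_down(int i_think_down, const peer_view *peers,
                        size_t npeers, unsigned int quorum) {
    assert(peers != NULL || npeers == 0);
    if (!i_think_down) return 0;          /* ODOWN always requires my own SDOWN */
    unsigned int agree = 1;               /* count myself */
    for (size_t i = 0; i < npeers; i++) {
        if (peers[i].flags & FLAG_MASTER_DOWN) agree++;
    }
    return agree >= quorum;
}
```

With three Sentinels and a quorum of 2, one agreeing peer is enough. With a quorum of 3, every Sentinel has to agree.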


Heartbeat monitoring

A Sentinel periodically sends messages to the masters, the replicas and the other Sentinels to see whether they are still alive:

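void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    // nothing can be sent while the link is disconnected
    if (ri->link->disconnected) return;

    // PING, INFO and PUBLISH are not critical: skip them when too many commands are already pending
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    // a replica whose master is objectively down or failing over, or which has lost
    // its master link, is sent INFO every second instead of at the usual period
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    // the PING period is down_after_period, capped at SENTINEL_PING_PERIOD
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // send INFO to masters and replicas, not to Sentinels
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    // send PING to all three kinds of instance
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    // publish the hello message to all three kinds of instance
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}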

Detecting a down node and polling the other Sentinels

First of all, I have to convince myself that the node really is dead, so:



void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    // work out how long the target node has gone without a valid reply
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

    // the command link looks stuck: it has been connected long enough, a PING is
    // pending and no PONG arrived for more than half of down_after_period, so
    // close it and let it be reconnected
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    // the Pub/Sub link has seen no activity for three publish periods: close it too
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }

    // subjectively down if 1) no valid reply for down_after_period, or 2) we think it is
    // a master but it has reported itself as a replica for too long
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        // newly down: log the event and set the flag
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        // back up: log the event and clear the flag
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

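Having convinced myself, and to avoid a bad decision, I start asking my like-minded friends for their opinion:

// This function also carries the election logic (the vote request).
// The other Sentinels reply with a flag; if it is true, they also consider that node down.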
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        if (ri->link->disconnected) continue;
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}


Electing the leader Sentinel

Now that he has been declared dead, we watchers have to elect someone to take charge of his affairs. I was the first to notice that something was wrong and the first to learn that he was definitely gone, so I get my request in first. The other Sentinels can only vote for me at this point; only if I lose do they get a chance to start an election of their own:

1. Canvassing for votes

This step uses the same sentinelAskMasterStateToOtherSentinels function shown in the previous section. Once this Sentinel has started a failover (failover_state > SENTINEL_FAILOVER_STATE_NONE), the last argument of the is-master-down-by-addr command changes from "*" to sentinel.myid, which turns the plain query into a request for a vote.

2. Voting

char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }

    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        
        if (strcasecmp(master->leader,sentinel.myid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

3. Deciding to start work

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    
    if (!(master->flags & SRI_O_DOWN)) return 0;

    
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    
    // failover_start_time works like a lock: a new failover may start only once twice the failover_timeout (3 minutes by default) has passed since the last attempt started.
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; // strip the trailing newline added by ctime_r
            master->failover_delay_logged = master->failover_start_time;
            serverLog(LL_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}

4. Taking charge

void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}
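
Starting a failover only makes this Sentinel a candidate; it still has to win the vote for its failover_epoch. The sketch below shows how the winner could be decided, assuming the rule used by sentinelGetLeader in the Redis source: the most-voted candidate needs both an absolute majority of all known Sentinels and at least the master's quorum. The elect_leader function and its parameters are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* votes:  run ID voted for by each Sentinel that answered for this epoch,
 *         or NULL for "no vote"
 * nvotes: number of entries in votes
 * voters: total number of known Sentinels, including this one
 * quorum: quorum configured for the master
 * Returns the winning run ID, or NULL if nobody has won (yet). */
const char *elect_leader(const char **votes, size_t nvotes,
                         size_t voters, unsigned int quorum) {
    assert(nvotes <= voters);
    const char *winner = NULL;
    size_t max_votes = 0;

    /* Find the most-voted run ID. A quadratic scan is fine for the
     * handful of Sentinels a deployment normally has. */
    for (size_t i = 0; i < nvotes; i++) {
        if (votes[i] == NULL) continue;
        size_t count = 0;
        for (size_t j = 0; j < nvotes; j++) {
            if (votes[j] != NULL && strcmp(votes[i], votes[j]) == 0) count++;
        }
        if (count > max_votes) {
            max_votes = count;
            winner = votes[i];
        }
    }

    /* The winner needs an absolute majority AND at least the quorum. */
    size_t majority = voters / 2 + 1;
    if (winner != NULL && (max_votes < majority || max_votes < quorum))
        return NULL;
    return winner;
}
```

Requiring the majority of all known Sentinels, not just of those that replied, is what prevents two leaders from being elected in the same epoch on either side of a network partition.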

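Failover state machine

Now I am in charge, and it is my job to carry out the failover.
Surely I need some guidance, a list of steps to follow? Otherwise how would I know what to do?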

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);          // count the votes
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);        // pick the replica to promote
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);   // detach that replica from its old master so it becomes a master
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);      // wait for the promotion to complete
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);    // point the remaining replicas at the new master
            break;
    }
}

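After that, once failover_state reaches SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, the main-logic function shown earlier (sentinelHandleDictOfRedisInstances) updates the view data and the configuration file.

I will not go through these five functions one by one here.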

