栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

zookeeper Zab协议——Follower和Leader状态同步

zookeeper Zab协议——Follower和Leader状态同步

目录

文章目录

2021SC@SDUSC

一、Zab协议是什么?

二、Follower和Leader状态同步过程

三、源码解析

1.Leader.leader()

2.LearnerHandler.run()

3、Follower.followLeader()

2021SC@SDUSC

一、Zab协议是什么?

ZAB协议包括两种基本模式:崩溃恢复和消息广播。

当整个服务框架在启动过程中,或是Leader服务器出现崩溃、网络中断、重启或者集群中不存在过半的服务器与Leader服务器保持正常通信等异常情况时,ZAB就会进入崩溃恢复模式并选举产生新的Leader服务器。

当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步(数据同步)之后,ZAB协议就会退出恢复模式。

之后就进入了消息广播模式。如果此时一个同样遵循ZAB协议的加入到集群时,那么如果此时已经存在一个Leader在负责消息广播,那么该服务器就会自觉进入数据恢复模式,在完成之后加入到消息广播流程中。

ZooKeeper设计成只允许唯一的一个Leader服务器来进行事物请求的处理。Leader服务器在接收到客户端的事物请求之后,会生成对应提案并发起一轮广播;如果是其他非Leader机器接收到客户端的事物请求,那么它会将这个事物请求转发给Leader服务器。

二、Follower和Leader状态同步过程

        当选举结束后,每个节点都需要根据自己的角色更新自己的状态,选举出的Leader更新自己状态为Leader,其他节点更新自己状态为Follower。

Leader更新状态入口:leader.lead()       

Follower更新状态入口:follower.followerLeader()

(1)follower 必须要让leader知道自己的状态:epoch、zxid、sid,必须要找出谁是leader,发送请求连接leader,并发送自己的信息给leader;leader接收到信息后,必须要返回对应的信息给follower。

(2)当leader知道follower状态之后,就确定需要做何种方式的数据同步DIFF,TRUNC,SNAP。

(3)执行数据同步。

(4)当leader接收到超过半数follower的ack之后,进入正常工作状态,集群启动完成。

三、源码解析

1.Leader.leader()

[1]、启动等待来自新跟随者的连接请求的线程。

cnxAcceptor = new LearnerCnxAcceptor();

cnxAcceptor.start();

[2]、执行LearnerCnxAcceptor.run()

//等待接收follower的状态同步申请
s = ss.accept();

//启动线程
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);

fh.start();

[3]、执行LearnerHandler.run()

2.LearnerHandler.run()

[1]、从网络中接收Follower的信息

ia.readRecord(qp, "packet");

[2]、Leader根据从Follower获取sid和旧的epoch,构建新的epoch

long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

[3]、Leader向Follower发送信息(包含zxid和newEpoch)

oa.writeRecord(newEpochPacket, "packet");

[4]、接收到Follower应答的ackepoch

QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");

[5]、判断Leader和Follower是否要同步

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");

[6]、接收Follower的ack

qp = new QuorumPacket();
ia.readRecord(qp, "packet");

[7]、uptodate

queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

3、Follower.followLeader()

[1]、查找leader

QuorumServer leaderServer = findLeader();   

[2]、连接leader

connectToLeader(leaderServer.addr, leaderServer.hostname);

[3]、向leader注册

long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

[4]、读取leader返回的结果:leaderinfo

readPacket(qp);

[5]、发送ackepoch给leader(包含自己的epoch和zxid)

writePacket(ackNewEpoch,true)

[6]、发送ack给leader

case Leader.COMMIT:
    fzk.commit(qp.getZxid());
Request request = fzk.pendingTxns.element();
commitProcessor.commit(request)

[7]、uptodate

processPacket()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307796.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号