目录
七、Zookeeper技术内幕
7.1系统模型
7.1.1数据模型
7.1.2 节点类型
7.1.3 Znode的版本信息——保证分布式数据操作的原子性
7.1.4 Watcher——数据变更通知
7.1.5 ACL访问控制
7.2序列化协议
7.3 客户端
7.4 会话
7.4.1 会话状态转换
7.4.2 会话创建
七、Zookeeper技术内幕
ZK最重要的一点就是利用自己的数据模型实现分布式系统的一致性
7.1系统模型
7.1.1数据模型
Znode数据节点是最小单元,由Znode构成了树。
对于每一个事务请求,Zookeeper都会为其分配一个全局的事务ID,用ZXID来表示,每个ZXID对应一次更新操作,ZXID用来保证全局顺序操作
7.1.2 节点类型
持久节点(PERSISITENT),临时节点(EPHEMERAL)和顺序节点(SEQUENTIAL),具体创建过程中,会形成
持久节点,持久顺序节点,临时节点,临时顺序节点四类,其中临时节点只能作为叶子节点,且与会话绑定,而非TCP连接
7.1.3 Znode的版本信息——保证分布式数据操作的原子性
Znode的版本信息用来保证分布式数据的原子性操作,每个数据节点有三个版本信息
version表示当前节点自从创建之后被更新的次数,即使数据一样,只要版本发生变化,version就会改变,例如当一个Znode被创建后,其version为0,同样,这样的改变也会存在所谓ABA问题,所以这里的版本并不是指Znode的本身的内容是否变化,而是是否有用户对节点内容进行变更。
乐观锁控制事务分成如下三个阶段:数据读取,写入校验,和数据写入
Znode在进行setDataRequest时会进行版本比较,客户端可以使用CAS,也可以不使用CAS,如果使用但版本不一致,会抛出BadVersionException异常
7.1.4 Watcher——数据变更通知
客户端向服务端注册Watcher监听,当服务端一些指定的事件触发了Watcher,那么就会向指定客户端发送一个事件通知来实现分布式通知功能
Client向zookeeper注册Watcher时,会将Watcher信息存储在WatchManager中,当服务端出发Watcher事件之后,会向客户端发送通知,客户端线程从WatchManager中取出Watcher执行相应的回调逻辑。
Watcher接口内定义了两个枚举:KeeperState和EventType和一个方法process(WatchedEvent event)
本质上,还是在服务端维护了一个Map
总结Watcher特性:
一次性:一旦一个Watcher被触发,就会从Set
轻量:WatchedEvent是整个Zookeeper做Watcher的最小通知单元,且只包含三个成员变量,也就是说process回调只会告诉客户端发生了事件,而不会说明事件的具体内容,对于变更前后的数据都需要客户端自己去获取。从而做到轻量级的通知机制。
7.1.5 ACL访问控制
UGO(user,group,others)转为ACL(Scheme:id:permission)
7.2序列化协议
Zk采用jute序列化组件
7.3 客户端
核心部件:
Zookeeper实例:ClientWatcherManager客户端WatcherManager
HostProvider:服务器地址管理器
ClientCnxn:客户端核心线程,其中又包含SendThread(用于建立TCP通信)和EventThread(事件处理线程),ZK客户端创建一次会话的过程:
7.4 会话
7.4.1 会话状态转换
图片来源:[ZooKeeper]ZooKeeper的会话状态_zjysource的专栏-CSDN博客_zookeeper会话状态
会话状态CONNECTING,CONNECTED,RECONNECTING,RECONNECTED,CLOSE。7.4.2 会话创建
(1)Session客户端会话实体
interface Session {
// 用sessionId唯一标识一个会话
long getSessionId();
// 超时时间
int getTimeout();
// 是否已经关闭
boolean isClosing();
}
sessionId生成规则,左移24位是为了把二进制日期前的0都移除,之后无符号右移8位,把高8位给id腾出位置,之后把id按位与在64位时间的高8位,最终得一个根据时间唯一确定的sessionId。(每次一有时间生成id我就想到时钟回拨。。)
public static long initializeNextSessionId(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
最后,为了管理session,ZK提供了SessionTracker接口进行管理。
// sessionId和session的映射 protected final ConcurrentHashMapsessionsById = new ConcurrentHashMap (); // session和session超时时间的映射 protected final ConcurrentMap sessionsWithTimeout;
7.4.3 会话管理
ZK的会话管理主要是由SessionTracker负责,采用“分桶策略” (不重要)
7.6 Leader选举
一台服务器在ZK集群选举中可能扮演的角色:Leader,Observer,Follower。一台服务器在整个选举的过程中可能存在的状态包括:Looking,Following,Leading,Observing
7.6.1选举概述
1、初始化集群时的Leader选择
ZK集群模式至少是2台服务器起,集群模式下需要在zoo.cfg文件的dataDir路径下,为当前机器创建myid,用来唯一标识集群中的这台ZK。这里以3台服务器为例,三台机器的myid分别为myid1,myid2和myid3。集群开始启动,当集群中仅有ZK1时,无法进行Leader选举,此时集群无Leader,当ZK2开始启动,并且与ZK1建立通信之后,集群可以开始选举Leader,进入Leader选举流程。
(1)每个Server发出投票,以(myid,ZXID)的形式进行投票,因为是初始化阶段,每台服务器都投给自己,ZXID为0,所以ZK1投票(1,0),ZK2投票(2,0)并将投票结果发给集群内的其他ZK;
(2)每台服务器接收投票,并验证投票可靠性(是否本轮,是否来自Looking状态的服务器)
(3)处理投票,每台服务器拿收到的投票和自己的投票比,比较ZXID,哪个版本高选择那个票当最终投票,如果ZXID相同,就比较myid,哪个大就选择哪个票当最终投票。所以对于当前投票,ZK1在收到ZK2的(2,0)投票之后,就把自己之前投票(1,0)改为(2,0),ZK2在收到ZK1投票之后,把自己原投票作为最终票,ZK1,ZK2再次向集群中所有ZK发送自己的最终投票结果。
(4)投票统计
投票之后,ZK1和ZK2均收到(2,0)投票,由于集群内节点数量为3,ZK1和ZK2均拿到两票(2,0),此时ZK2的投票结果已经大于半数(2/n+1,n=3),所以ZK1和ZK2均认为已经选出了Leader。
(5)更改服务器状态
在投票阶段,服务器状态均为Looking,一旦确定了Leader,服务器就会更改自己的状态,如果是Follower,则将自己的状态改为Following,如果是Leader,则将自己的状态改为Leading,如果是Observer则将自己的状态改为Observing。
2、服务器运行期间的Leader选举
当Zookeeper已经选举完Leader并正常运行后,非Leader节点的上下线并不会影响集群的Leader节点。但是一旦Leader节点挂了,那么集群将无法对外提供服务,集群将进入新一轮的Leader选举。
(1)变更状态
当Leader挂了之后,所有的非Observer节点会将自己的节点变更为Looking
(2)每个Server发起投票
同样生成投票(myid,ZXID),由于这是在运行期间,各个节点的ZXID可能不同,和初始化时同理,ZK1生成投票(1,122),ZK2生成投票(2,123)并将自己的投票进行广播
(3)各个ZK接受投票结果
(4)投票统计,同样是先比较ZXID,再比较myid,因此各个节点投票统一(2,123)
(5)更改各个服务器状态,Leader节点改为Leading,Follower节点改为Following,如果是Observer则将自己的状态改为Observing。
总结:ZXID越大越容易成为Leader,ZXID相同,myid越大成为Leader的概率越大
7.6.2 选举算法
老版本的ZK提供了三种选举算法,不过目前ZK仅仅保留了TCP版本的FastLeaderElection封装在类FastLeaderElection中
7.6.3 具体实现细节
下面看看源码:
先看投票
public class Vote {
// 当前ZK的myid
private final long id;
// 当前ZK的事务ID ZXID
private final long zxid;
// 逻辑时钟,没赋值则默认为-1,每开始一轮投票+1,确保各个ZK节点收到的投票均在同一轮投票中
private final long electionEpoch;
// 被选举的Leader的epoch版本号
private final long peerEpoch;
// 当前ZK的状态,enum:LOOKING,FOLLOWING,LEADING,OBSERVING
private final ServerState state;
}
之后就是投票算法实现:类FastLeaderElection,源码很长,看几个关键的方法。
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
Map recvset = new HashMap();
Map outofelection = new HashMap();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
sendNotifications();
SyncedLearnerTracker voteSet = null;
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
if (self.getQuorumVerifier() instanceof QuorumOracleMaj
&& self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
LOG.info("Notification time out: {} ms", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
if (resultFN == null) {
break;
} else {
return resultFN;
}
case LEADING:
Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
if (resultLN == null) {
break;
} else {
return resultLN;
}
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}



