栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

细读经典第一期——从Paxos到Zookeeper 分布式一致性原理与实践(4)

细读经典第一期——从Paxos到Zookeeper 分布式一致性原理与实践(4)

目录

七、Zookeeper技术内幕

7.1系统模型

7.1.1数据模型

7.1.2 节点类型

7.1.3 Znode的版本信息——保证分布式数据操作的原子性

7.1.4 Watcher——数据变更通知

7.1.5 ACL访问控制

7.2序列化协议

7.3 客户端

7.4 会话

7.4.1 会话状态转换

7.4.2 会话创建


七、Zookeeper技术内幕

ZK最重要的一点就是利用自己的数据模型实现分布式系统的一致性

7.1系统模型

7.1.1数据模型

Znode数据节点是最小单元,由Znode构成了树。

对于每一个事务请求,Zookeeper都会为其分配一个全局的事务ID,用ZXID来表示,每个ZXID对应一次更新操作,ZXID用来保证全局顺序操作

7.1.2 节点类型

持久节点(PERSISITENT),临时节点(EPHEMERAL)和顺序节点(SEQUENTIAL),具体创建过程中,会形成

持久节点,持久顺序节点,临时节点,临时顺序节点四类,其中临时节点只能作为叶子节点,且与会话绑定,而非TCP连接

7.1.3 Znode的版本信息——保证分布式数据操作的原子性

Znode的版本信息用来保证分布式数据的原子性操作,每个数据节点有三个版本信息

version表示当前节点自从创建之后被更新的次数,即使数据一样,只要版本发生变化,version就会改变,例如当一个Znode被创建后,其version为0,同样,这样的改变也会存在所谓ABA问题,所以这里的版本并不是指Znode的本身的内容是否变化,而是是否有用户对节点内容进行变更。

乐观锁控制事务分成如下三个阶段:数据读取,写入校验,和数据写入

Znode在进行setDataRequest时会进行版本比较,客户端可以使用CAS,也可以不使用CAS,如果使用但版本不一致,会抛出BadVersionException异常

7.1.4 Watcher——数据变更通知

客户端向服务端注册Watcher监听,当服务端一些指定的事件触发了Watcher,那么就会向指定客户端发送一个事件通知来实现分布式通知功能

 Client向zookeeper注册Watcher时,会将Watcher信息存储在WatchManager中,当服务端出发Watcher事件之后,会向客户端发送通知,客户端线程从WatchManager中取出Watcher执行相应的回调逻辑。

Watcher接口内定义了两个枚举:KeeperState和EventType和一个方法process(WatchedEvent event)

本质上,还是在服务端维护了一个Map>,将节点路径和该节点路径的Set集合进行映射。

总结Watcher特性:

一次性:一旦一个Watcher被触发,就会从Set中移除该Watcher,因此在使用Watcher时,要确定是否需要在Watcher触发后再次注册Watcher。
轻量:WatchedEvent是整个Zookeeper做Watcher的最小通知单元,且只包含三个成员变量,也就是说process回调只会告诉客户端发生了事件,而不会说明事件的具体内容,对于变更前后的数据都需要客户端自己去获取。从而做到轻量级的通知机制。

7.1.5 ACL访问控制

UGO(user,group,others)转为ACL(Scheme:id:permission)

7.2序列化协议

Zk采用jute序列化组件

7.3 客户端

核心部件:

Zookeeper实例:ClientWatcherManager客户端WatcherManager

HostProvider:服务器地址管理器

ClientCnxn:客户端核心线程,其中又包含SendThread(用于建立TCP通信)和EventThread(事件处理线程),ZK客户端创建一次会话的过程:

7.4 会话

7.4.1 会话状态转换

 图片来源:[ZooKeeper]ZooKeeper的会话状态_zjysource的专栏-CSDN博客_zookeeper会话状态

会话状态CONNECTING,CONNECTED,RECONNECTING,RECONNECTED,CLOSE。

7.4.2 会话创建

(1)Session客户端会话实体

    interface Session {
        // 用sessionId唯一标识一个会话
        long getSessionId();
        // 超时时间
        int getTimeout();
        // 是否已经关闭
        boolean isClosing();
    }

sessionId生成规则,左移24位是为了把二进制日期前的0都移除,之后无符号右移8位,把高8位给id腾出位置,之后把id按位与在64位时间的高8位,最终得一个根据时间唯一确定的sessionId。(每次一有时间生成id我就想到时钟回拨。。)

    public static long initializeNextSessionId(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid = nextSid | (id << 56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // this is an unlikely edge case, but check it just in case
        }
        return nextSid;
    }

 最后,为了管理session,ZK提供了SessionTracker接口进行管理。

// sessionId和session的映射
protected final ConcurrentHashMap sessionsById = new ConcurrentHashMap();
// session和session超时时间的映射
protected final ConcurrentMap sessionsWithTimeout;

7.4.3 会话管理

ZK的会话管理主要是由SessionTracker负责,采用“分桶策略” (不重要)

7.6 Leader选举

一台服务器在ZK集群选举中可能扮演的角色:Leader,Observer,Follower。一台服务器在整个选举的过程中可能存在的状态包括:Looking,Following,Leading,Observing

7.6.1选举概述

1、初始化集群时的Leader选择

ZK集群模式至少是2台服务器起,集群模式下需要在zoo.cfg文件的dataDir路径下,为当前机器创建myid,用来唯一标识集群中的这台ZK。这里以3台服务器为例,三台机器的myid分别为myid1,myid2和myid3。集群开始启动,当集群中仅有ZK1时,无法进行Leader选举,此时集群无Leader,当ZK2开始启动,并且与ZK1建立通信之后,集群可以开始选举Leader,进入Leader选举流程。

(1)每个Server发出投票,以(myid,ZXID)的形式进行投票,因为是初始化阶段,每台服务器都投给自己,ZXID为0,所以ZK1投票(1,0),ZK2投票(2,0)并将投票结果发给集群内的其他ZK;

(2)每台服务器接收投票,并验证投票可靠性(是否本轮,是否来自Looking状态的服务器)

(3)处理投票,每台服务器拿收到的投票和自己的投票比,比较ZXID,哪个版本高选择那个票当最终投票,如果ZXID相同,就比较myid,哪个大就选择哪个票当最终投票。所以对于当前投票,ZK1在收到ZK2的(2,0)投票之后,就把自己之前投票(1,0)改为(2,0),ZK2在收到ZK1投票之后,把自己原投票作为最终票,ZK1,ZK2再次向集群中所有ZK发送自己的最终投票结果。

(4)投票统计

投票之后,ZK1和ZK2均收到(2,0)投票,由于集群内节点数量为3,ZK1和ZK2均拿到两票(2,0),此时ZK2的投票结果已经大于半数(2/n+1,n=3),所以ZK1和ZK2均认为已经选出了Leader。

(5)更改服务器状态

在投票阶段,服务器状态均为Looking,一旦确定了Leader,服务器就会更改自己的状态,如果是Follower,则将自己的状态改为Following,如果是Leader,则将自己的状态改为Leading,如果是Observer则将自己的状态改为Observing。

2、服务器运行期间的Leader选举

当Zookeeper已经选举完Leader并正常运行后,非Leader节点的上下线并不会影响集群的Leader节点。但是一旦Leader节点挂了,那么集群将无法对外提供服务,集群将进入新一轮的Leader选举。

(1)变更状态

当Leader挂了之后,所有的非Observer节点会将自己的节点变更为Looking

(2)每个Server发起投票

同样生成投票(myid,ZXID),由于这是在运行期间,各个节点的ZXID可能不同,和初始化时同理,ZK1生成投票(1,122),ZK2生成投票(2,123)并将自己的投票进行广播

(3)各个ZK接受投票结果

(4)投票统计,同样是先比较ZXID,再比较myid,因此各个节点投票统一(2,123)

(5)更改各个服务器状态,Leader节点改为Leading,Follower节点改为Following,如果是Observer则将自己的状态改为Observing。

总结:ZXID越大越容易成为Leader,ZXID相同,myid越大成为Leader的概率越大

7.6.2 选举算法

老版本的ZK提供了三种选举算法,不过目前ZK仅仅保留了TCP版本的FastLeaderElection封装在类FastLeaderElection中

7.6.3 具体实现细节

下面看看源码:

先看投票

public class Vote {

    // 当前ZK的myid
    private final long id;
    // 当前ZK的事务ID ZXID
    private final long zxid;
    // 逻辑时钟,没赋值则默认为-1,每开始一轮投票+1,确保各个ZK节点收到的投票均在同一轮投票中
    private final long electionEpoch;
    // 被选举的Leader的epoch版本号
    private final long peerEpoch;
    // 当前ZK的状态,enum:LOOKING,FOLLOWING,LEADING,OBSERVING
    private final ServerState state;
}

之后就是投票算法实现:类FastLeaderElection,源码很长,看几个关键的方法。

    
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            
            Map recvset = new HashMap();

            
            Map outofelection = new HashMap();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            

            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    
                    notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                    
                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                            && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }

                    LOG.info("Notification time out: {} ms", notTimeout);

                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        if (voteSet.hasAllQuorums()) {

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            
                            if (n == null) {
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;

                        
                    case FOLLOWING:
                        
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342714.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号