栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

注册中心介绍--zookeeper分析

注册中心介绍--zookeeper分析

上面几篇介绍了RPC调用相关的知识,这次我们了解下关于注册中心的原理和实践。我们主要从下面几个方面进行介绍:

  • 注册中心的作用及设计分析
  • 开源注册中心选型
  • Nacos注册中心深入分析
  • Zookeeper实现深入剖析
一、注册中心的作用及设计分析

什么是注册中心?

用来实现微服务实例的自动注册与发现,是分布式系统中的核心基础服务。

没有注册中心

我们可以思考一下没有注册中心的场景。

像上图一样,多个service调用ServiceA,然后ServiceA又需要调用ServiceB、ServiceC,ServiceC又是有多个实例。没有注册中心的时候,我们可以通过全局配置文件的方式,配置每个服务的节点信息保存起来。当服务信息发生变化的时候,通过每个服务更新本地的配置文件。这边存在的一个问题是,当这种增加服务的情况经常出现时,每个节点的上的配置文件慢慢变成不一样,这样就导致了这不是一个全局文件,而是每个节点自己维护的配置文件。

1、注册中心主要功能

注册中心的主要功能包括:

  • 服务注册:包括ip、端口,服务等
  • 服务发现:服务调用方从注册中心中找到需要的服务提供方节点
  • 健康检查:注册中心需要对服务提供方进行监控检查
  • 变更通知:当服务提供方出现变化时,注册中心通知服务调用方的变化
(1)服务注册

服务提供方将自身路由信息发布到注册中心,供消费方获取用于与提供方建立连接并发起调用。

  • 路由信息:注册服务节点的IP,监听端口等路由信息
  • 服务信息:序列化协议、路由规则、节点权重等
(2)服务发现

服务消费方通过访问注册中心获取服务提供节点路由信息。服务发现有一下3种策略:

  • 启动拉取:服务消费方启动后,从注册中心拉取提供方节点列表,建立连接,进行RPC调用
  • 通知回调:接收注册中心变更通知,重新获取数据,更新节点列表
  • 轮询拉取:兜底策略,服务消费方运行过程中定时拉取服务提供方节点列表,用来更新本地数据
(3)健康检查

确保已注册节点健康度,能够及时剔除失效节点,保证服务发现正确性。

在我们使用过程中,服务失效的原因有很多,包括:

  • 部署重启
  • 服务假死
  • 异常中止

对于上面服务失效的情况,我们有下面几种解决方案

  • 上报心跳:可以解决服务重启、异常中止的情况,对于服务假死的情况,不一定可以区分出来
  • 服务探测:比较高级的注册中心功能,需要我们自己开发适应相关的探测功能。
(4)变更通知

当服务提供方节点发生变更时,注册中心应该能够第一时间把变更事件或变更后的数据推送到服务订阅方。在注册中心内部的数据结构中,我们需要为每个服务提供方建立订阅列表,当服务提供方节点变更时通知所有订阅该服务的消费方节点。

2、注册中心的主要功能设计

如果我们自己要实现一个注册中心,需要包含哪些功能呢?通过上面的分析,我们可以发现最核心的部分一个是服务的注册发现,还有一个就是异常情况的处理。那这边就包括两个核心设计:

  • 数据存储
  • 超时处理
(1)注册中心存储设计

注册中心的存储包括:

  • 服务调用方与服务提供方的对,这样可以方便服务调用方快速的查询到想要调用的服务信息
  • 服务提供方与服务调用方的对,这样存储的意义是当服务提供方发生变动,进行对订阅这个服务的调用方发送事件通知;如果没有这种存储结构,就需要我们循环遍历上面的存储结构,找到订阅的调用方发送通知,这样非常影响性能。

存储系统主要关注点:

  • 数据可靠性:数据冗余存储,确保不会因为单节点故障导致数据丢失
  • 数据一致性:各节点间数据同步,保证数据一致性
  • 服务可用性:多节点对等的对外提供服务
(2)注册中心的其他功能

注册中心除了实现服务注册与发现,还可以用来实现服务治理相关的功能。

  • 服务扩容/缩容
  • 机器迁移
  • 权重
  • 灰度流量
(3)注册中心的思考

CAP定理:分布式系统中,C(数据一致性)、A(服务可用性)、P(分区容错性)只能满足其二。

那我们的注册中心作为分布式系统中核心的功能,应该选择什么样的存储(CP或AP)?这可能还是需要我们从业务场景出发。

从实践的角度出发,对于服务的消费方来说,可以获取到不同的节点列表明显好于无法获取到全部的服务提供方列表;对于服务提供方来说,部分节点提供服务明显好于全部不可用。综上所述,AP这种模型更加适合注册中心的功能。

3、注册中心的选型

注册中心的选型如果只考虑CAP就过于片面了,还需要结合实际场景,多维度综合评估。

  • 数据模型
  • 数据一致性
  • 健康检查
  • 性能与容量
  • 稳定性
  • 易用性
  • 集群扩展性
  • 成熟度
  • 社区活跃程度
(1)注册中心对比
特征zookeeperetcdconsuleureka
服务健康检查长连接心跳服务状态可配支持
多数据中心支持
kv存储服务支持支持支持
一致性zabraftraft弱一致性
CAP定理CPCPCPAP
watch支持支持支持长轮询
客户端访问SDKhttphttp&dnshttp
社区支持积极积极积极暂停
4、Nacos注册中心深入分析

Nacos是Dubbo生态中注册中心的实现。Nacos的功能包括:

  • 服务注册与健康检查
  • 数据模型
  • 数据一致性保障
(1)Nacos的健康检查

临时节点:心跳注册

持久化节点:tcp/http探活

对于临时节点来说,我们使用心跳上报的方式来检查服务的活性:

  • 每5秒上报一次心跳
  • 15秒没有接收到心跳将节点标记为不健康
  • 超过30秒没有接收到心跳,将这个临时节点剔除
(2)数据模型
数据存储

Nacos的数据存储类似上图,服务提供方分为多个集群,每个集群中有多个应用实例提供服务。这样做的优势是可以更大程度上保证服务可用。

数据隔离

4层数据隔离:

  • 账号
  • 命名空间
  • 分组
  • 服务名称
(3)数据一致性

zab、raft CP一致性

Distro AP一致性

5、Zookeeper实现深入分析 (1)节点角色

server节点组成的一个集群,在集群中存在一个唯一的leader节点负责响应写入请求,其他节点只负责接收转发client的请求

  • Leader:响应写入请求,发起提案,超过半数Follower同意写入,写入成功
  • Follower:响应查询,将写入请求发给Leader,参与选举和写入投票
  • ObServer:响应请求,将写入请求发给Leader,不参与投票,只接收写入结果
(2)选主逻辑

Zookeeper筛选leader,想要成为leader,需要获得法定数据票数才能成功,即获得一半以上的票数才能成为leader。

判断的依据:

  • Epoch:leader的任期
  • ZXID:Zookeeper事物ID,越大表示数据越新
  • SID:集群中每个节点的唯一编号

比较策略:任期大的胜出,任期相同比较ZXID大的胜出,ZXID相同比较SID大的胜出

参照上图,一般情况下Epoch每个节点都是一样,这边不考虑Epoch不同的情况。

下面我们对照选主的源码看下

public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            
            Map recvset = new HashMap();

            
            Map outofelection = new HashMap();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
              	// 选举轮数加1
                logicalclock.incrementAndGet();
              	// 初始化投票信息,第一个参数如果有权限投票就是自己节点ID,第二个参数是当前节点处理的最大事务ID,第三个参数是任期值
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
          
          	// 发送选票信息,将信息发送到队列中
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            

          	// 当前节点没有停止并且是选主状态
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                
                if (n == null) {
                  	// 消息队列的消息都处理完了
                    if (manager.haveDelivered()) {
                      	// 发送最新的消息到消息队列
                        sendNotifications();
                    } else {
                      	// 与所有的server节点创建连接
                        manager.connectAll();
                    }

                    
                    notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);

                    
                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                            && self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }

                    LOG.info("Notification time out: {} ms", notTimeout);

                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    
                    switch (n.state) {
                    case LOOKING: // 选主状态
                        // 异常判断
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // 如果收到选票的选举轮数 > 当前节点的选举轮数
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch); // 更新当前的选举轮数
                            recvset.clear();  // 清理之前的选票记录
                           	// 根据我们上述描述的选主逻辑,分别先比较epoch、zxid、sid,然后更新选票
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                          	// 发送通知
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) { // 无效选票
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // 保存选票信息
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        // 计票比较
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        if (voteSet.hasAllQuorums()) {

                            // 判断接下来的投票是否和选的结果一致
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            
                            if (n == null) {
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;

                        
                    case FOLLOWING:
                        
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

有一个内部类:Messenger,里面有2个实现类:WorkReceiver和WorkSender。一个用来处理消息发送,一个用来处理消息接收。

(3)数据一致性保障

zookeeper有一个很有名的Zab协议,Zookeeper Atomic Broadcast。

zookeeper保证的不是强一致,而是顺序一致。

(4)数据模型

zookeeper的数据存储是树状结构存储数据,分为永久节点和临时节点。

  • DataNode
    • DataNode parent: 父节点的引用
    • byte data[]: 该节点存储数据
    • Long acl: acl控制权限
    • StatPersisted stat: 持久化节点状态
    • Set children: 自节点列表
  • DataTree
    • ConcurrentHashMap nodes: key是path,value是datanode
    • WatchManager dataWatches: 数据变更通知
    • WatchManager childWatches: 节点变更通知
    • String rootZookeeper: 根节点
    • Map ephemerals: 临时节点信息,key 是 session,value是path的集合
  • ZKDatabase
    • DataTree dataTree
    • ConcurrentHashMap sessionsWithTimeouts:客户端会话连接管理
    • FileTxnSnapLog snapLog: 事务日志

将zookeeper作为注册中心

  • 服务注册:创建临时Node
  • 服务发现:查询Node节点数据
  • 健康检查:临时节点
  • 信息订阅:Watch机制

zookeeper的劣势

  • zookeeper是顺序一致性,不保证读到最新数据
  • 选举过程中服务不可用
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316478.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号