栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper 源码分析

Zookeeper 源码分析

Zookeeper 源码分析
  • 算法基础
    • 拜占庭将军问题
    • Paxos 算法
      • Paxos算法描述:
      • Paxos算法流程
    • ZAB 协议
      • ZAB算法
      • Zab协议内容
        • 消息广播
        • 崩溃恢复
          • 崩溃恢复——Leader选举
          • 崩溃恢复——数据恢复
    • CAP
      • CAP理论概述
      • 用CAP理论来分析ZooKeeper
  • 源码详解
    • 辅助源码
      • 持久化源码
      • 序列化源码
    • ZK 服务端初始化源码解析
      • ZK服务端启动脚本分析
      • ZK服务端启动入口
      • 解析参数zoo.cfg和myid
      • 过期快照删除
      • 初始化通信组件
    • ZK 服务端加载数据源码解析
      • 冷启动数据恢复快照数据
      • 冷启动数据恢复编辑日志
      • 冷启动数据恢复快照数据
      • 冷启动数据恢复编辑日志
    • ZK 选举源码解析
      • Zookeeper选举机制——第一次启动
      • Zookeeper选举机制——非第一次启动
      • ZK选举源码解析
      • 选举准备
      • 选举执行
    • Follower 和 Leader 状态同步源码
      • Leader.lead()等待接收follower的状态同步申请
      • Follower.lead()查找并连接Leader
      • Leader.lead()创建LearnerHandler
      • Follower.lead()创建registerWithLeader
      • Leader.lead()接收Follwer状态,根据同步方式发送同步消息
      • Follower.lead()应答Leader同步结果
      • Leader.lead()应答Follower
    • 服务端 Leader 启动
    • 服务端 Follower 启动
    • 客户端启动
      • 客户端初始化源码解析
      • 创建ZookeeperMain
      • 初始化监听器
      • 解析连接地址
      • 创建通信
      • 执行run()

算法基础 拜占庭将军问题

拜占庭将军问题是一个协议问题, 拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。 问题是这些将军在地理上是分隔开来的, 并且将军中存在叛徒。 叛徒可以任意行动以达到以下目标: 欺骗某些将军采取进攻行动; 促成一个不是所有将军都同意的决定, 如当将军们不希望进攻时促成进攻行动; 或者迷惑某些将军, 使他们无法做出决定。 如果叛徒达到了这些目的之一, 则任何攻击行动的结果都是注定要失败的, 只有完全达成一致的努力才能获得胜利。

Paxos 算法

Paxos 算法: 一种基于消息传递且具有高度容错特性的一致性算法

Paxos 算法解决的问题: 就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性

Paxos算法描述:

在一个Paxos系统中,首先将所有节点划分为 Proposer(提议者), Acceptor(接受者) ,和 Learner(学习者) 。(注意:每个节点都可以身兼数职)

一个完整的Paxos算法流程分为三个阶段:

  • Prepare 准备阶段

Proposer向多个Acceptor发出Propose请求Promise(承诺)
Acceptor针对收到的Propose请求进行Promise(承诺)

  • Accept 接受阶段

Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
Acceptor针对收到的Propose请求进行Accept处理

  • Learn 学习阶段:

Proposer将形成的决议发送给所有Learners

Paxos算法流程
  1. Prepare : Proposer 生成全局唯一且递增的 Proposal ID , 向所有 Acceptor 发送 Propose 请求, 这里无需携带提案内容, 只携带 Proposal ID 即可

  2. Promise : Acceptor 收到 Propose 请求后, 做出 " 两个承诺, 一个应答 "

不再接受 Proposal ID <= 当前请求的 Propose 请求
不再接受Proposal ID < 当前请求的 Accept 请求
不违背以前做出的承诺下, 回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID, 没有则返回空值

  1. Propose : Proposer 收到多数 Acceptor 的 Promise 应答后, 从应答中选择 Proposal ID 最大的提案的Value, 作为本次要发起的提案。 如果所有应答的提案Value均为空值, 则可以自己随意决定提案 Value。 然后携带当前 Proposal ID, 向所有 Acceptor 发送 Propose 请求

  2. Accept : Acceptor 收到 Propose 请求后, 在不违背自己之前做出的承诺下, 接受并持久化当前 Proposal ID 和提案 Value

  3. Learn : Proposer 收到多数 Acceptor 的 Accept 后, 决议形成, 将形成的决议发送给所有Learner

下面我们针对上述描述做三种情况的推演举例:为了简化流程,我们这里不设置 Learner

情况1:

有A1, A2, A3, A4, A5 5位议员,就税率问题进行决议

情况2:

Paxos 算法缺陷:在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况

情况3:

现在我们假设在A1提出提案的同时, A5决定将税率定为20%

  • A1, A5同时发起Propose(序号分别为1, 2)
  • A2承诺A1, A4承诺A5, A3行为成为关键
  • 情况2: A3先收到A1消息,承诺A1。之后立刻收到A5消息,承诺A5。
  • A1发起Proposal(1, 10%),无足够响应, A1重新Propose (序号3), A3再次承诺A1。
  • A5发起Proposal(2, 20%),无足够相应。 A5重新Propose (序号4), A3再次承诺A5。

造成这种情况的原因是系统中有一个以上的 Proposer,多个 Proposers 相互争夺 Acceptor,造成迟迟无法达成一致的情况。 针对这种情况,一种改进的 Paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。 这样,一次 Paxos 流程中只有一个 Proposer,不会出现活锁的情况,此时只会出现例子中第一种情况

ZAB 协议 ZAB算法

Zab 借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议, Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后 Leader 客户端将数据同步到其他 Follower 节点。 即 Zookeeper 只有一个 Leader 可以发起提案。

Zab协议内容

Zab 协议包括两种基本的模式: 消息广播、 崩溃恢复

消息广播

ZAB协议针对事务请求的处理过程 类似于一个两阶段提交过程
(1) 广播事务阶段
(2) 广播提交操作
这两阶段提交模型如下, 有可能因 为Leader宕机带来数据不一致, 比如
( 1 ) Leader 发 起 一 个 事 务
Proposal1 后 就 宕 机 , Follower 都 没 有 Proposal1
( 2) Leader收到半数ACK宕机, 没来得及向Follower发送Commit

(1) 客户端发起一个写操作请求。
(2) Leader服务器将客户端的请求转化为事务Proposal 提案, 同时为每个Proposal 分配一个全局的ID, 即zxid。
(3) Leader服务器为每个Follower服务器分配一个单独的队列, 然后将需要广播的 Proposal依次放到队列中去, 并且根据FIFO策略进行消息发送。
(4) Follower接收到Proposal后, 会首先将其以事务日志的方式写入本地磁盘中, 写入成功后向Leader反馈一个Ack响应消息。
(5) Leader接收到超过半数以上Follower的Ack响应消息后, 即认为消息发送成功, 可以发送commit消息。
(6) Leader向所有Follower广播commit消息, 同时自身也会完成事务提交。 Follower 接收到commit消息后, 会将上一条事务提交。

(7) Zookeeper采用Zab协议的核心, 就是只要有一台服务器提交了Proposal, 就要确保所有的服务器最终都能正确提交Proposal。

崩溃恢复

崩溃恢复主要包括两部分: Leader 选举和数据恢复

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式

假设两种服务器异常情况:

  • 假设一个事务在 Leader 提出之后, Leader 挂了。

  • Zab协议崩溃恢复要求满足以下两个要求:

确保已经被Leader提交的提案Proposal, 必须最终被所有的Follower服务器提交。 (已经产生的提案, Follower必须执行)

确保丢弃已经被Leader提出的, 但是没有被提交的Proposal。 (丢弃胎死腹中的提案)

崩溃恢复——Leader选举

Leader选举: 根据上述要求, Zab协议需要保证选举出来的Leader需要满足以下条件:

  • 新选举出来的 Leader 不能包含未提交的 Proposal 。 即新 Leader 必须都是已经提交了 Proposal 的 Follower 服务器节点
  • 新选举的 Leader 节点中含有最大的 zxid 。 这样做的好处是可以避免Leader 服务器检查 Proposal 的提交和丢弃工作
崩溃恢复——数据恢复

Zab如何数据同步:

(1) 完成 Leader 选举后, 在正式开始工作之前(接收事务请求, 然后提出新的 Proposal ) , Leader 服务器会首先确认事务日志中的所有的Proposal 是否已经被集群中过半的服务器 Commit

2) Leader服务器需要确保所有的Follower服务器能够接收到每一条事务的Proposal, 并且能将所有已经提交的事务Proposal 应用到内存数据中。 等到Follower将所有尚未同步的事务Proposal都从Leader服务器上同步过, 并且应用到内存数据中以后, Leader才会把该Follower加入到真正可用的Follower列表中

CAP CAP理论概述

一个分布式系统不可能同时满足以下三种

  • 一致性(C : Consistency)
  • 可用性(A : Available)
  • 分区容错性( P : Partition Tolerance)

这三个基本需求, 最多只能同时满足其中的两项, 因为P是必须的, 因此往往选择就在 CP 或者 AP 中

  • 一致性( C : Consistency)

在分布式环境中, 一致性是指数据在多个副本之间是否能够保持数据一致的特性。 在一致性的需求下, 当一个系统在数据一致的状态下执行更新操作后, 应该保证系统的数据仍然处于一致的状态

  • 可用性(A: Available)

可用性是指系统提供的服务必须一直处于可用的状态, 对于用户的每一个操作请求总是能够在有限的时间内返回结果

  • 分区容错性( P : Partition Tolerance)

分布式系统在遇到任何网络分区故障的时候, 仍然需要能够保证对外提供满足一致性和可用性的服务, 除非是整个网络环境都发生了故障

用CAP理论来分析ZooKeeper

ZooKeeper 保证的是 CP

  • ZooKeeper 不能保证每次服务请求的可用性。 (注:在极端环境下, ZooKeeper 可能会丢弃一些请求, 消费者程序需要重新请求才能获得结果) 。 所以说, ZooKeeper 不能保证服务可用性

  • 进行Leader选举时集群都是不可用

源码详解 辅助源码 持久化源码

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中

在 org.apache.zookeeper.server.persistence 包下的相关类都是序列化相关的代码

快照

public interface SnapShot {
	// 反序列化方法
	long deserialize(DataTree dt, Map sessions) throws IOException;
	
	// 序列化方法
	void serialize(DataTree dt, Map sessions, File name) throws IOException;
	
	
	File findMostRecentSnapshot() throws IOException;
	
	// 释放资源
	void close() throws IOException;
}
public interface TxnLog {
	// 设置服务状态
	void setServerStats(ServerStats serverStats);
	// 滚动日志
	void rollLog() throws IOException;
	// 追加
	boolean append(TxnHeader hdr, Record r) throws IOException;
	// 读取数据
	TxnIterator read(long zxid) throws IOException;
	// 获取最后一个 zxid
	long getLastLoggedZxid() throws IOException;
	// 删除日志
	boolean truncate(long zxid) throws IOException;
	// 获取 DbId
	long getDbId() throws IOException;
	// 提交
	void commit() throws IOException;
	// 日志同步时间
	long getTxnLogSyncElapsedTime();
	// 关闭日志
	void close() throws IOException;
	// 读取日志的接口
	public interface TxnIterator {
		// 获取头信息
		TxnHeader getHeader();
		// 获取传输的内容
		Record getTxn();
		// 下一条记录
		boolean next() throws IOException;
		// 关闭资源
		void close() throws IOException;
		// 获取存储的大小
		long getStorageSize() throws IOException;
	}
}

处理持久化的核心类

序列化源码

zookeeper-jute 代码是关于 Zookeeper 序列化相关源码

序列化和反序列化方法

public interface Record {
	// 序列化方法
	public void serialize(OutputArchive archive, String tag) throws IOException;
	
	// 反序列化方法
	public void deserialize(InputArchive archive, String tag) throws IOException;
}

迭代

public interface Index {
	// 结束
	public boolean done();
	// 下一个
	public void incr();
}

序列化支持的数据类型

public interface OutputArchive {
	public void writeByte(byte b, String tag) throws IOException;
	public void writeBool(boolean b, String tag) throws IOException;
	public void writeInt(int i, String tag) throws IOException;
	public void writeLong(long l, String tag) throws IOException;
	public void writeFloat(float f, String tag) throws IOException;
	public void writeDouble(double d, String tag) throws IOException;
	public void writeString(String s, String tag) throws IOException;
	public void writeBuffer(byte buf[], String tag) throws IOException;
	public void writeRecord(Record r, String tag) throws IOException;
	public void startRecord(Record r, String tag) throws IOException;
	public void endRecord(Record r, String tag) throws IOException;
	public void startVector(List v, String tag) throws IOException;
	public void endVector(List v, String tag) throws IOException;
	public void startMap(TreeMap v, String tag) throws IOException;
	public void endMap(TreeMap v, String tag) throws IOException;
}

反序列化支持的数据类型

public interface InputArchive {
	public byte readByte(String tag) throws IOException;
	public boolean readBool(String tag) throws IOException;
	public int readInt(String tag) throws IOException;
	public long readLong(String tag) throws IOException;
	public float readFloat(String tag) throws IOException;
	public double readDouble(String tag) throws IOException;
	public String readString(String tag) throws IOException;
	public byte[] readBuffer(String tag) throws IOException;
	public void readRecord(Record r, String tag) throws IOException;
	public void startRecord(String tag) throws IOException;
	public void endRecord(String tag) throws IOException;
	public Index startVector(String tag) throws IOException;
	public void endVector(String tag) throws IOException;
	public Index startMap(String tag) throws IOException;
	public void endMap(String tag) throws IOException;
}
ZK 服务端初始化源码解析 ZK服务端启动脚本分析

Zookeeper 服务的启动命令是 zkServer.sh start

ZK服务端启动入口

ctrl + n,查找 QuorumPeerMain

解析参数zoo.cfg和myid

QuorumPeerConfig.java

过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

初始化通信组件 ZK 服务端加载数据源码解析 冷启动数据恢复快照数据
  • zk 中的数据模型,是一棵树, DataTree,每个节点,叫做 DataNode
  • zk 集群中的 DataTree 时刻保持状态同步
  • Zookeeper 集群中每个 zk 节点中,数据在内存和磁盘中都有一份完整的数据

内存数据: DataTree
磁盘数据:快照文件 + 编辑日志

冷启动数据恢复编辑日志

冷启动数据恢复快照数据 冷启动数据恢复编辑日志 ZK 选举源码解析 Zookeeper选举机制——第一次启动

Zookeeper选举机制——非第一次启动

ZK选举源码解析

选举准备 选举执行

Follower 和 Leader 状态同步源码

Leader.lead()等待接收follower的状态同步申请 Follower.lead()查找并连接Leader Leader.lead()创建LearnerHandler Follower.lead()创建registerWithLeader Leader.lead()接收Follwer状态,根据同步方式发送同步消息 Follower.lead()应答Leader同步结果 Leader.lead()应答Follower 服务端 Leader 启动

服务端 Follower 启动

客户端启动 客户端初始化源码解析

创建ZookeeperMain 初始化监听器 解析连接地址 创建通信 执行run()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604181.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号