栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

zookeeper-2.原理知识,paxos、zab、角色功能、API开发基础

zookeeper-2.原理知识,paxos、zab、角色功能、API开发基础

zookeeper

ZooKeeper是提供分布式“协调”的,而不是提供分布式服务的。

在以往的分布式系统中,最典型的集群模式是 master/slave 模式(主备模式),我们把所有能够处理写操作的机器成为Master机器,把所有通过异步复制方式获取最新数据,并提供度服务的机器称为Slave机器
而在 ZooKeeper中,这些概念被颠覆了。它没有引入 master/slave 的概念 ,而是引入了 leader、follower、observer三种角色。
leader服务器为客户端提供读和写服务
follower、observer 都能提供读服务,区别在于 observer 不参与 leader 的选举过程,也不参与写操作的“过半写成功”策略。
因此,observer 可以在不影响写性能的情况下,提升集群的读性能。

paxos

什么是paxos,Paxos算法是基于消息传递且具有高度容错特性的一致性算法,是目前公认的解决分布式一致性问题最有效的算法之一。
在paxos 理论上作者指出基于没有拜占庭将军问题才能成立!国外说话跟咱们就是不一样啊,还拜占庭,没有拜占庭将军就是网络是可靠的,不会被入侵破坏
拜占庭将军问题(Byzantine failures),是由莱斯利·兰伯特提出的点对点通信中的基本问题。含义是在存在消息丢失的不可靠信道上试图通过消息传递的方式达到一致性是不可能的

关于这个算法网上讲解有很多

比较不错的豆瓣链接
比较不错的维基链接
只有干货没有故事的链接

作者这里抄一下豆瓣的链接举例,人家写的比我好

Paxos描述了这样一个场景,有一个叫做Paxos的小岛(Island)上面住了一批居民,岛上面所有的事情由一些特殊的人决定,他们叫做议员(Senator)。议员的总数(Senator Count)是确定的,不能更改。岛上每次环境事务的变更都需要通过一个提议(Proposal),每个提议都有一个编号(PID),这个编号是一直增长的,不能倒退。每个提议都需要超过半数((Senator Count)/2 +1)的议员同意才能生效。每个议员只会同意大于当前编号的提议,包括已生效的和未生效的。如果议员收到小于等于当前编号的提议,他会拒绝,并告知对方:你的提议已经有人提过了。这里的当前编号是每个议员在自己记事本上面记录的编号,他不断更新这个编号。整个议会不能保证所有议员记事本上的编号总是相同的。现在议会有一个目标:保证所有的议员对于提议都能达成一致的看法。

好,现在议会开始运作,所有议员一开始记事本上面记录的编号都是0。有一个议员发了一个提议:将电费设定为1元/度。他首先看了一下记事本,嗯,当前提议编号是0,那么我的这个提议的编号就是1,于是他给所有议员发消息:1号提议,设定电费1元/度。其他议员收到消息以后查了一下记事本,哦,当前提议编号是0,这个提议可接受,于是他记录下这个提议并回复:我接受你的1号提议,同时他在记事本上记录:当前提议编号为1。发起提议的议员收到了超过半数的回复,立即给所有人发通知:1号提议生效!收到的议员会修改他的记事本,将1好提议由记录改成正式的法令,当有人问他电费为多少时,他会查看法令并告诉对方:1元/度。

现在看冲突的解决:假设总共有三个议员S1-S3,S1和S2同时发起了一个提议:1号提议,设定电费。S1想设为1元/度, S2想设为2元/度。结果S3先收到了S1的提议,于是他做了和前面同样的操作。紧接着他又收到了S2的提议,结果他一查记事本,咦,这个提议的编号小于等于我的当前编号1,于是他拒绝了这个提议:对不起,这个提议先前提过了。于是S2的提议被拒绝,S1正式发布了提议: 1号提议生效。S2向S1或者S3打听并更新了1号法令的内容,然后他可以选择继续发起2号提议。

好,我觉得Paxos的精华就这么多内容。现在让我们来对号入座,看看在ZK Server里面Paxos是如何得以贯彻实施的。

小岛(Island)——ZK Server Cluster 我们客户端

议员(Senator)——ZK Server 服务端

提议(Proposal)——ZNode Change(Create/Delete/SetData…) 各种api

提议编号(PID)——Zxid(ZooKeeper Transaction Id) 事务id

正式法令——所有ZNode及其数据

貌似关键的概念都能一一对应上,但是等一下,Paxos岛上的议员应该是人人平等的吧,而ZK Server好像有一个Leader的概念。没错,其实Leader的概念也应该属于Paxos范畴的。如果议员人人平等,在某种情况下会由于提议的冲突而产生一个“活锁”(所谓活锁我的理解是大家都没有死,都在动,但是一直解决不了冲突问题)。Paxos的作者Lamport在他的文章”The Part-Time Parliament“中阐述了这个问题并给出了解决方案——在所有议员中设立一个总统,只有总统有权发出提议,如果议员有自己的提议,必须发给总统并由总统来提出。好,我们又多了一个角色:总统。

总统——ZK Server Leader

又一个问题产生了,总统怎么选出来的?

现在我们假设总统已经选好了,下面看看ZK Server是怎么实施的。

情况一:
屁民甲(Client)到某个议员(ZK Server)那里询问(Get)某条法令的情况(ZNode的数据),议员毫不犹豫的拿出他的记事本(local storage),查阅法令并告诉他结果,同时声明:我的数据不一定是最新的。你想要最新的数据?没问题,等着,等我找总统Sync一下再告诉你。

情况二:
屁民乙(Client)到某个议员(ZK Server)那里要求政府归还欠他的一万元钱,议员让他在办公室等着,自己将问题反映给了总统,总统询问所有议员的意见,多数议员表示欠屁民的钱一定要还,于是总统发表声明,从国库中拿出一万元还债,国库总资产由100万变成99万。屁民乙拿到钱回去了(Client函数返回)。 客户端对数据增删改查,访问到追随者 然后在给leader,然后leader再发起投票,然后在减去1W

情况三:
总统突然挂了,议员接二连三的发现联系不上总统,于是各自发表声明,推选新的总统,总统大选期间政府停业,拒绝屁民的请求

呵呵,到此为止吧,当然还有很多其他的情况,但这些情况总是能在Paxos的算法中找到原型并加以解决。这也正是我们认为Paxos是Zookeeper的灵魂的原因。当然ZK Server还有很多属于自己特性的东西:Session, Watcher,Version等等等等,需要我们花更多的时间去研究和学习。

讲解下这个图片 一开始客户端 client 去follower 跟随着 去创建(1).create,这个时候follower 是不能做主的,所以去找头部 leader 创建请求(2),leader 创建(3). 事务id zxid ,(4-1) leader去通知他的小弟们是否可以创建,小弟并发挥是否统一,小弟有一半以上同意,代表创建成功,然后leader,在把创建成功的消息给他的小弟们

Zab协议原理:对Paxos的简化

Zab协议要求每个 Leader 都要经历三个阶段:发现,同步,广播。

发现:要求zookeeper集群必须选举出一个 Leader 进程,同时 Leader 会维护一个 Follower 可用客户端列表。将来客户端可以和这些 Follower节点进行通信。

同步:Leader 要负责将本身的数据与 Follower 完成同步,做到多副本存储。这样也是提现了CAP中的高可用和分区容错。Follower将队列中未处理完的请求消费完成后,写入本地事务日志中。

广播:Leader 可以接受客户端新的事务Proposal请求,将新的Proposal请求广播给所有的 Follower。

leader挂掉或重启集群

选择myid 最大的那个做leader

watch


在 ZooKeeper 中,引入了 Watcher 机制来实现分布式通知的功能.

ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知,来实现分布式通知功能。

API调用

zk使用那个版本,导入的jar一定是那个版本 ,就是这么个尿性

 
            org.apache.zookeeper
            zookeeper
            3.6.3
  

简单示例
提示一下 为什么链接的是2181,因为是zookeeper默认的,在重申一遍之前配置文件的几个端口
service.N =YYY: A:B
N:代表服务器编号(也就是myid里面的值)
YYY:服务器地址
A:表示 Flower 跟 Leader的通信端口,简称服务端内部通信的端口(默认2888)
B:表示 是选举端口(默认是3888)

package org.example;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class App {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        // zk 是有 session 概念的,没有连接池的概念。
        // 每一个连接,使用的是一个独立的 session。

        // watch:观察,回调。watch 的注册只发生在读类型调用,如 get,exists方法等。
        // Watcher分为两类:
        // 第一类:new 的时候传入的 Watcher,是session级别的,与path,node没有关系
        // 第二类:在调用 getData 的时候传入 Watcher
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 设置临时节点,断开后数据保留3000ms
        ZooKeeper zk = new ZooKeeper("192.168.1.129:2181,192.168.1.130:2181,192.168.1.130,192.168.1.131", 3000, new Watcher() {
        //Watch 的回调方法!
            @Override
            public void process(WatchedEvent watchedEvent) {//回调方法
                Event.KeeperState state = watchedEvent.getState();
                Event.EventType type = watchedEvent.getType();
                String path = watchedEvent.getPath();
                System.out.println(watchedEvent.toString());
                switch (state) {//alt+enter自动生成 switch内容
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("success connected!!");
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }
                switch (type) {
                    case None:
                        break;
                    case NodeCreated:
                        break;
                    case NodeDeleted:
                        break;
                    case NodeDataChanged:
                        break;
                    case NodeChildrenChanged:
                        break;
                    case DataWatchRemoved:
                        break;
                    case ChildWatchRemoved:
                        break;
                    case PersistentWatchRemoved:
                        break;
                }
            }
        });
        countDownLatch.await();
        ZooKeeper.States state = zk.getState();
        switch (state) {
            case CONNECTING:
                System.out.println("CONNECTING");// new对象之后,还没有真正准备好,是异步进行连接的。需要添加countDownLatch。
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                System.out.println("CONNECTED");
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }

        // 增删改查
        // 1、创建目录
        String pathName = zk.create("/foo", "old data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // 2、取节点数据
        Stat stat = new Stat();
        byte[] data = zk.getData("/foo", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("[In getData, watchedEvent is ]" + watchedEvent.toString());//这个回调是一次性的
                try {
                    zk.getData("/foo", this, stat);//继续把watch放进来,后面还能回调。如果将这里的this改为true的话,使用的是默认watch,也就是new对象时的watch
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println("The old data is: " + new String(data));

        // 3、修改数据
        Stat stat1 = zk.setData("/foo", "new data1".getBytes(), 0);
        Stat stat2 = zk.setData("/foo", "new data2".getBytes(), stat1.getVersion());

        // 异步回调方法,这里不会在等待数据的时候阻塞
        System.out.println("----- 异步回调方法-之前 -----");
        zk.getData("/foo", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("执行异步回调,取到数据:" + new String(bytes));
                System.out.println(o.toString());
            }
        }, "abc");
        System.out.println("----- 异步回调方法-之后-----");

        Thread.sleep(1000000);
    }
}

输出

WatchedEvent state:SyncConnected type:None path:null
success connected!!
ConNECTED
The old data is: olddata
[In getData, watchedEvent is ]WatchedEvent state:SyncConnected type:NodeDataChanged path:/foo
[In getData, watchedEvent is ]WatchedEvent state:SyncConnected type:NodeDataChanged path:/foo
----- 异步回调方法-之前 -----
----- 异步回调方法-之后-----
执行异步回调,取到数据:new data2
abc

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/747404.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号