- 前言:
- 1 zookeeper概述
- 1.1 作用
- 1.2 特点
- 1.3 数据结构
- 1.4 应用场景
- 2 zookeeper安装
- 2.1 本地模式安装与配置
- 2.2 集群模式安装与配置
- 3 zookeeper选举机制
- 4 zookeeper实操
- 4.1 客户端命令行操作
- 4.1.1 命令行语法:
- 4.1.2 节点操作
- 4.1.3 监听器
- 4.2 客户端API操作
- 4.3 服务器动态上下监听案例
- 4.3.1 需求
- 4.3.2 服务器端代码
- 4.3.3 客户端代码
- 4.4 分布式锁案例
前言:
zookeeper是分布式应用程序协调服务,负责协调其他的大数据框架。学习要求:会用即可。因此本文主要简单阐述一下zookeeper是怎么用的,基本的原理是啥。
1 zookeeper概述 1.1 作用zookeeper是一个基于观察者设计模式的分布式服务管理框架,负责存储和管理集群中几点的数据,接受观察者的注册,一旦这些数据状态发生改变,Zookeeper就将负责通知已经在zookeeper中注册的那些观察者做出相应的反映。
zookeeper = 文件系统 + 通知机制
zookeeper也是一个分布式的程序,具有多个节点,具有如下特点:
- 有一个领导者(Leader),多个跟随者(follower)组成的集群
- 集群中只要有半数以上节点活着,zookeeper就能正常服务,所以zookeeper适合安装奇数台服务器。
- 全局数据一致:每一个server爆粗那一份相同的副本,Client无论连接到哪一台Server,拿到的数据都是一致的。
- 更新请求顺序执行,来自同一个Client的更新请求按照其发送顺序依次执行
- 数据更新原子性,一个数据更新要么成功,要么失败
- 实时性,在一定时间范围内,Client能够读到最新的数据。
zookeeper数据模型以节点树的数据结构构建,每个节点称为一个znode,每一个znode默认存储1MB的数据(可以看到数据量很小,只可以存储一些配置信息)。
-
统一命名服务
-
统一配置管理(之前hadoop的学习使用的是分发功能)
-
统一集群管理
-
服务器节点动态上下线
-
软负载均衡(节点间的负载均衡)
官网:https://zookeeper.apache.org/
下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
本文是基于3.5.7来进行安装的。
安装步骤:
1)下载、上传、解压、安装JDK
2)修改配置:
- 将安装目录下conf/zoo_sample.cfg改名为zoo.cfg
- 修改zoo.cfg文件的dataDir为自己创建的非tmp目录(默认值在/tmp目录中,linux系统隔一段时间会删除这些文件),修改后结果类似于下图。
3)zookeeper初步了解:
查看zookeeper的目录结构:
- bin:框架相关命令操作:启动、停止,客户端、服务端
- conf: 配置文件
- docs文档
- lib 配置文档的依赖
查看bin目录下的文件
用的多的有:
- 集群启动:zkServer.sh start
- 集群关闭:zkServer.sh stop
- 查看状态:zkServer.sh status
- 客户端启动:zkCli.sh
- 客户端退出:quit
- 查看进程是否启动:jps
第2步中修改了zookeeper配置文件,其主要参数除了dataDir之外还有:
其具体含义为:
- tickTime = 2000:zookeeper服务器与客户端心跳时间,单位毫秒
- initLimit = 10:初始化通信的时间,最多不能超过的时间(10*心跳时间),超过的话,通信失败
- syncLimit = 5:建立好连接后,下次的通信时间如果超过(5*心跳时间),通信失败
- dataDir:保存zookeeper的数据,默认是tmp会被系统定期清除
- clientPort =2181:客户端的连接端口,一般不需要修改
与hadoop需要在配置文件中指定谁是namenode、secondary NameNode、resourceMannager不同,zookeeper它能够自己选出leader,所以不需要进行相关配置。但是需要自己配置每台机器的myid(用来唯一标识每一台机器的ID,每台机器不能重复),并需要在zoo.cfg文件中配置一下,告诉所有机器zookeeper集群本应该在哪几台机器上运行。
接下来进入安装步骤:
1)按照本地模式安装与配置安装一台机器
2)在存储目录dataDir中新建myid文件,并在其中写入惟一的编号。
3)将安装目录和存储目录分发到你要配置的机器上,然后分别为他们修改myid。
4)需要重新配置zoo.cfg文件:增加如下配置,之后分发个各个集群:
server.1=hadoop100:2888:3888 server.3=hadoop102:2888:3888 server.4=hadoop103:2888:3888
当前主要配置编号的参数是server.A=B:C:D
- A:标识第几台服务器(myid)。zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
- B:标识服务器映射地址。
- C:标识服务器 Follower 与集群中的 Leader 服务器交换信息的端口。
- D:主要是用来选举,如果Leader 服务器挂了。这个端口就是用来执行选举时服务器相互通信的端口,通过这个端口进行重新选举leader。
5)制作批量启动、停止、查看状态的脚本
#!/bin/bash
case $1 in
"start"){
for i in hadoop100 hadoop102 hadoop103
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop100 hadoop102 hadoop103
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop100 hadoop102 hadoop103
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac
做一下说明:这个脚本能用前提是配置了主机名称映射和ssh免密登录和可执行权限。运行启动脚本,查看状态,得到如下图:
解释一下:hadoop100配置的myid = 1,hadoop102配置的myid = 3,hadoop103配置的myid = 4。为啥hadoop2是leader下面揭晓。
这个PPT讲的很清楚了:
唯一解释一下的是ZXID:
Zxid 是一个 64 位的数字,其中低 32位是一个简单的单调递增的计数器,针对客户端每一个事 务请求,计数器加 1;而高 32 位则代表Leader 周期 epoch 的编号。
每个当选产生一个新的 Leader服务器,就会从这个 Leader服务器上取出其本地日志中最大事务的 ZXID,并从中读取 epoch值,然后加 1,以此作为新的 epoch,并将低 32 位从 0 开始计数。
Zxid(Transaction id)类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal(提议)
ID。为了保证顺序性,该 zkid 必须单调递增。
依据上面选举的原理,就不难理解为啥hadoop102是leader了而不是hadoop103。初始化谁是leader与启动顺序和myid都有关系。
4 zookeeper实操 4.1 客户端命令行操作客户端是对节点进行操作,先总结语法和能实现的功能,再分别介绍节点的操作和监听器的原理。
4.1.1 命令行语法:| 命令 | 功能 |
|---|---|
| help | 显示所有操作命令 |
| ls path | 使用 ls 命令来查看当前 znode 的子节点 [可监听] ;-w 监听子节点变化;-s 附加次级信息 |
| create | 普通创建;-s 含有序列;-e 临时(重启或者超时消失) |
| get path | 获得节点的值 [可监听] ,-w 监听节点内容变化-s 附加次级信息 |
| set | 设置节点的具体值 |
| stat | 查看节点状态 |
| delete | 删除节点 |
| deleteall | 递归删除节点 |
实操:
1)首先开启客户端zkCli.sh
注意:客户端默认启动的是本地客户端,如果要启动专门的服务器,启动客户端的时候后缀要加上./zkCli.sh -server 服务器名:2181(eg:./zkCli.sh -server hadoop102:2181)
2)查看根目录包含的内容
3)查看节点的详细数据
其中各项的含义是:
| 名称 | 表述 |
|---|---|
| czxid | 创建节点的事务 zxid,每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。 |
| ctime | znode 被创建的毫秒数(从 1970 年开始) |
| mzxid | znode 最后更新的事务 zxid |
| mtime | znode 最后修改的毫秒数(从 1970 年开始) |
| pZxid | znode 最后更新的子节点 zxid |
| cversion | znode 子节点变化号,znode 子节点修改次数 |
| dataversion | znode 数据变化号 |
| aclVersion | znode 访问控制列表的变化号 |
| ephemeralOwner | 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0 |
| dataLength | znode 的数据长度 |
| numChildren | znode 子节点数量 |
zookeeper节点主要可以分别为:
- 持久有序号:客户端与zookeeper断开后该节点任然存在,zookeeper给节点顺序编号。
- 持久无序号:定义与上面类似
- 短暂有序号:定义与上面类似
- 短暂无序号:定义与上面类似
(1)节点创建
分别对应的创建方法:
- 持久有序号:create -s /sanguo "diaochan"
- 持久无序号:create /sanguo "diaochan"
- 短暂有序号:create -s -e /sanguo "diaochan"
- 短暂无序号:create -e /sanguo "diaochan"
注意创建节点时要进行赋值,创建有序号的需要加-s,创建短暂的需要加-e
(2)节点值的获取与修改
- 节点值获取:get -s /sanguo感觉get -s与ls -s差不多。
- 节点值修改:set /sanguo "liubei"
(2)节点的删除与查看
- 节点删除:delete /sanguo
- 递归删除:deleteall /sanguo
- 查看状态:stat /sanguo
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变 节点删除子目录节点增加删除)时,ZooKeeper 会通知客户端 监听机制保证ZooKeeper 保存的任何的数 据的任何改变都能快速的响应到监听了该节点的应用程序 .
大致可以分为两种:节点值变化监听与节点路径变化监听。
(1)节点值变化监听
(2)节点路径变化监听
通过get -w节点值进行监听节点值的变化
通过ls -w进行路径监听
前置工作
导包:
junit junit RELEASE org.apache.logging.log4j log4j-core 2.8.2 org.apache.zookeeper zookeeper 3.5.7
在资源文件夹下建立log4j.properties文件
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d%p[%c]-%m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
建立一个类,并写初始化init()的内容
package www.hhh.cn;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class zkClient {
String conecting = "hadoop100:2181,hadoop102:2181,hadoop103:2181";
int sessionTimeout = 2000;
ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(conecting, sessionTimeout, new Watcher() {
//监听控制器
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zkClient.create("/hhh","dataZkClient".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
(1)创建文件
短短一句话
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zkClient.create("/hhh","dataZkClient".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
参数解释:
- 第一个参数是路径
- 第二个参数是数据,要求是字节类型,需要用getBytes()
- 第三个参数是权限,Ids.OPEN_ACL_UNSAFE允许所有人进行访问
- 第四个参数是创建节点的类型
节点类型对应着之前原理讲解的四个节点的类型。创建结束之后打开服务器使用下面的命令查看,果然有创建的节点。
(2)监听子节点
package www.hhh.cn;
import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class zkClient {
String conecting = "hadoop100:2181,hadoop102:2181,hadoop103:2181";
int sessionTimeout = 2000;
ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(conecting, sessionTimeout, new Watcher() {
//监听控制器
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("----------watcher------------");
List children = null;
try {
children = zkClient.getChildren("/", true);
for(String child : children){
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---------------------------");
}
});
}
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zkClient.create("/hhh","dataZkClient".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void getChildren() throws InterruptedException, KeeperException {
List children = zkClient.getChildren("/", true);
Thread.sleep(Long.MAX_VALUE);
}
}
首先看getChildren这个函数:获取子节点第一个是路径,第二个是监听函数,如果使用true,则会使用初始化函数中重写的监听函数
该怎么理解监听呢?注册一次生效一次,还需要在进行注册
希望通过延迟函数延迟程序的结束,继续监听Thread.sleep(Long.MAX_VALUE)就可以阻塞线程,然后将注册过程放在监听函数中就可以注册一次生效一次,之后在注册。如果目录下文件发生改变,就会重新调用这个watcher函数。
注意的几点是:该目录下新建子节点不会被监听到。
(3)判断节点是否存在
exits函数,返回的是状态信息,通过状态信息判断是否还在
第一个参数是路径
第二个参数是是否监听
@Test
public void exist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/manongyanjiuseng", false);
System.out.println(stat==null? "not exist " : "exist");
}
(4)客户端向服务端写数据流程
- 发送给leader的时候
通俗解释:客户端给服务器的leader发送写请求,写完数据后给手下发送写请求,手下写完发送给leader,超过半票以上都写了则发回给客户端。之后leader在给其他手下让他们写,写完在发数据给leader - 发送给follower的时候
通俗解释:客户端给手下发送写的请求,手下给leader发送写的请求,写完后,给手下发送写的请求,手下写完后给leader发送确认,超过半票,leader确认后,发给刻划断,之后leader在发送写请求给其他手下
- 服务器上线的时候其实就是服务器启动时去注册信息(创建的都是临时节点)
- 客户端获取到当前在线的服务器列表后
- 服务器节点下线后给集群管理
- 集群管理服务器节点的下件时间通知给客户端
- 客户端通过获取服务器列表重选选择服务器
先创建目录create /servers "servers"
4.3.2 服务器端代码 4.3.3 客户端代码 4.4 分布式锁案例


