ZooKeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
ZooKeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)的管理员。
ZooKeeper 是一个分布式的、开源的分布式应用程序的协调服务。
ZooKeeper 提供的主要功能包括:
配置管理
分布式锁
注册中心
安装步骤
# 先确保jdk环境已经安装 cd /usr/local # 把压缩包上传到linux系统,解压 tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz # 进入apache-zookeeper-3.6.2-bin目录 cd apache-zookeeper-3.6.2-bin
# 进入conf目录 cd conf # 把zoo_sample.cfg 改名为zoo.cfg mv zoo_sample.cfg zoo.cfg # 打开zoo.cfg文件, 修改dataDir属性 vi zoo.cfg
# 设置zoo.cfg放置数据的目录 dataDir=/usr/local/apache-zookeeper-3.6.2-bin/data # 保存退出
# 进入bin目录下 cd /usr/local/apache-zookeeper-3.6.2-bin/bin # 启动服务命令 ./zkServer.sh start
./zkServer.sh stop 停止服务命令 ./zkServer.sh status 查看服务状态03. ZooKeeper 命令操作(了解) 3.1 数据模型
ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
Unix的文件系统目录:
zookeeper目录结构:
这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
PERSISTENT 持久化节点EPHEMERAL 临时节点 :-ePERSISTENT_SEQUENTIAL 持久化顺序节点 :-sEPHEMERAL_SEQUENTIAL 临时顺序节点 :-es 3.2 服务端常用命令
我们可以通过zookeeper的客户端工具或者zookeeper的Api连接服务端:
首先来看zookeeper服务端命令,zookeeper服务端命令比较简单,如下:
# 启动 ZooKeeper 服务 ./zkServer.sh start # 查看 ZooKeeper 服务状态 ./zkServer.sh status # 停止 ZooKeeper 服务 ./zkServer.sh stop # 重启 ZooKeeper 服务 ./zkServer.sh restart3.3 客户端常用命令 3.3.1 连接和退出
# 连接zookeeper服务端, 可以不指定服务端地址,默认连接localhost:2181 ./zkCli.sh # 连接指定服务端地址 ./zkCli.sh –server localhost:2181 # 断开连接 quit3.3.2 查看目录
# 显示指定目录下节点 /代表根目录 ls /
说明:若存在dubbo节点说明用过它作为注册中心创建的信息
# 查看dubbo节点下的信息 ls /zookeeper3.3.3 创建节点
# 在根节点下创建app1节点, 并设置值为itheima create /app1 itheima
# 在根节点下创建app2节点, 且不设置值 create /app2 ""
说明:这是3.4版本的bug,创建节点必须给值,3.5后修复了,即不需要""
# 创建子节点 create /app2/app2-2 ""3.3.4 获取节点值
# 获取/app1节点的值 get /app13.3.5 设置节点值
# 相当于是修改/app1节点的值为itheima2 set /app1 itheima23.3.6 删除节点
# 删除单个节点,如果下面有子节点就删除不了 delete /app1 # 删除带有子节点的节点,有些版本是deleteall rmr /app23.3.7 临时节点和顺序节点
# 创建临时节点 /app1,如果断开连接,该节点就没了 create -e /app1 111
# 创建顺序节点, 会在app1后面加一堆的数字,方便进行排序 create -s /app1 111 create -s /app1 111 create -s /app1 1113.3.7 查看节点详细信息
# 先创建一个节点 create /app2 111 # 查看节点信息,再后面一些高版本主键废弃,使用ls -s /app2 ls2 /app2
说明:
czxid:节点被创建的事务IDctime: 创建时间mzxid: 最后一次被更新的事务IDmtime: 修改时间pzxid:子节点列表最后一次被更新的事务IDcversion:子节点的版本号dataversion:数据版本号aclversion:权限版本号ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0dataLength:节点存储的数据的长度numChildren:当前节点的子节点个数 04. ZooKeeper JavaAPI操作(了解) 4.1 Curator 介绍
Curator 是Apache ZooKeeper 的Java客户端库,可以更容易的使用ZooKeeper,包含几个包:
Curator client:用来替代ZooKeeper提供的类,它封装了底层的管理并提供了一些有用的工具。Curator framework:提供了高级的API来简化ZooKeeper的使用,增加了很多基于ZooKeeper的特性,帮助管理ZooKeeper的连接以及重试操作。Curator Recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等Curator Test:提供了基于ZooKeeper的单元测试工具。
Curator 项目的目标是简化 ZooKeeper 客户端的使用。常见的ZooKeeper Java API
zookeeperZkClientCurator Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。官网:http://curator.apache.org/ 4.2 Curator API 常用操作 4.2.1 建立连接
1)创建maven工程:curator-demo
2)添加依赖
4.0.0 cn.itcast curator-demo 1.0-SNAPSHOT org.apache.curator curator-recipes 4.2.0 org.apache.zookeeper zookeeper 3.4.6 junit junit 4.13 test
说明:若安装的是zookeeper3.5以上的版本。必须使用Curator4.0以上版本
3)编写测试类
package cn.itcast;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
public class CuratorTest {
@Test
public void testConnection(){
// 1.创建重试策略对象,指定重试策略,参数1-重试间隔时间(单位:毫秒),参数2-重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2);
// 2.创建连接客户端对象
Curatorframework client = CuratorframeworkFactory.builder()
.connectString("192.168.211.130:2181")
.retryPolicy(retryPolicy)
.namespace("itheima")
.build();
// 3.启动,建立连接
client.start();
}
}
4.2.2 添加节点
基本创建创建节点,有数据 (默认持久)设置节点类型:临时节点创建多级节点
package cn.itcast;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
private Curatorframework client;
@Before
public void testConnection(){
// 1.创建重试策略对象,指定重试策略,参数1-重试间隔时间(单位:毫秒),参数2-重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2);
// 2.创建连接客户端对象
client = CuratorframeworkFactory.builder()
.connectString("192.168.211.130:2181")
.retryPolicy(retryPolicy)
.namespace("itheima") // 指定根目录为itheima
.build();
// 3.启动,建立连接
client.start();
}
@Test
public void testCreate1() throws Exception{
// 创建节点时不设置数据,默认将客户端所在服务器的ip地址作为数据。
String path = client.create().forPath("/app1");
System.out.println(path);
}
@Test
public void testCreate2() throws Exception{
String path = client.create().forPath("/app2", "hehe".getBytes());
System.out.println(path);
}
@Test
public void testCreate3() throws Exception{
// PERSISTENT 持久化节点
// PERSISTENT_SEQUENTIAL 持久化顺序节点
// EPHEMERAL 临时节点
// EPHEMERAL_SEQUENTIAL 临时数据节点
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);
}
@Test
public void testCreate4() throws Exception{
// creatingParentsIfNeeded 如果父节点不存在则创建父节点
String path = client.create().creatingParentsIfNeeded().forPath("/app4/app4-1");
System.out.println(path);
}
@After
public void close() {
if (client != null) {
client.close();
}
}
}
4.2.3 查询节点
查询节点数据:get /itheima/app2
查询目录节点:ls /itheima/app4
查询节点状态信息:ls -s
@Test
public void testGet1() throws Exception{
// 相当于get /itheima/app2
byte[] bytes = client.getData().forPath("/app2");
System.out.println("结果 = " + bytes.toString());
}
@Test
public void testGet2() throws Exception{
// 相当于ls /itheima/app4,返回结果是所有子节点的名称
List children = client.getChildren().forPath("/app4");
for (String child : children) {
System.out.println(child);
}
}
@Test
public void testGet3() throws Exception{
Stat status = new Stat();
// 将 ls -s /app1 的结果数据放到status中
client.getData().storingStatIn(status).forPath("/app1");
// 打印节点创建时间
System.out.println(status.getCtime());
// 打印子节点个数
System.out.println(status.getNumChildren());
}
4.2.4 修改节点
@Test
public void testSetData() throws Exception{
client.setData().forPath("/app1", "goodBoy".getBytes());
}
@Test
public void testSetData2() throws Exception{
Stat status = new Stat();
// 将 ls -s /app1 的结果数据放到status中
client.getData().storingStatIn(status).forPath("/app1");
// 获取数据当前的版本号
int version = status.getVersion();
client.setData().withVersion(version).forPath("/app1", "goodBoy".getBytes());
}
4.2.5 删除节点
@Test
public void testDelete() throws Exception {
client.delete().forPath("/app1");
}
@Test
public void testDeleteAll() throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/app2");
}
05. Curator 事件监听
5.1 概述
ZooKeeper 允许客户端在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
NodeCache : 只是监听某一个特定的节点PathChildrenCache : 监控一个ZNode的子节点.TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合 5.2 NodeCache
NodeCache:给某一个节点注册事件监听器(本节点增删改)
@Test
public void NodeCache() throws Exception{
// 1. 创建监听器对象
final NodeCache nodeCache = new NodeCache(client, "/app1");
// 2. 绑定监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
// 当/app1节点发生改变,就会回调nodeChanged
public void nodeChanged() throws Exception {
System.out.println("节点改变了...");
// 获取改变后的数据
ChildData currentData = nodeCache.getCurrentData();
byte[] data = currentData.getData();
// 打印数据
System.out.println(new String(data));
}
});
// 3. 开启监听,如果参数为true开启监听时,如果设置为true,那么NodeCache在第一次启动的时候就会立刻在Zookeeper上读 // 取对应节点的数据内容,并保存在Cache中。
nodeCache.start(true);
// 不让线程结束
while(true){
}
}
5.3 PathChildrenCache
PathChildrenCache:监控某一个节点的子节点的变化(子节点增删改了)
@Test
public void pathChildrenCache() throws Exception {
// 1. 创建监听器对象,参数3-是否加载缓存数据
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
// 2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(Curatorframework client, PathChildrenCacheEvent event)
throws Exception {
// event={type=CHILD_ADDED, data=ChildData{path='/app2/p2', stat=..,data=null}}
// event={type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=..,data=null}}
// event={type=CHILD_REMOVED, data=ChildData{path='/app2/p2', stat=..,data=null}}
System.out.println("子节点变化了," + event);
// 获取type类型
PathChildrenCacheEvent.Type type = event.getType();
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println("获取变化后数据:" + new String(data));
}
}
});
// 3. 开启监听
pathChildrenCache.start();
while (true){
}
}
5.4 TreeCache
TreeCache : 可以监控整个树上的所有节点,相当于NodeCache+PathChildrenCache组合
@Test
public void treeCache() throws Exception {
// 1. 创建监听器对象
TreeCache treeCache = new TreeCache(client, "/app3");
// 2. 绑定监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception {
System.out.println("子节点变化了," + event);
// 获取type类型
TreeCacheEvent.Type type = event.getType();
// 监听修改节点的事件,获取修改后的值(不管自己还是儿子发生变化都监控到)
if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){
byte[] data = event.getData().getData();
System.out.println("获取变化后数据:" + new String(data));
}
}
});
// 3. 开启监听
treeCache.start();
while (true){
}
}
06. Zookeeper 分布式锁
6.1 概念
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
如图:
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
- 客户端准备获取锁时,在lock节点下创建临时顺序节点。然后获取lock下面的所有子节点,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
如图:
在Curator中有五种锁方案:
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)InterProcessMutex:分布式可重入排它锁InterProcessReadWriteLock:分布式读写锁(读读共享,读写互斥,写写互斥)InterProcessMultiLock:对多个对象加锁InterProcessSemaphoreV2:共享信号量 6.3.1 演示超卖
第一步:模拟售票
package cn.itcast;
// 真实项目中,就是一个控制器
public class Ticket12306 {
private int ticket = 10; // 数据库的票数,实际项目中存在数据库中
public void sellTicket() {
if (ticket > 0) {
// 打印当前线程的名,以及票数
System.out.println(Thread.currentThread().getName() + ":" + ticket);
ticket--;
}
}
}
第二步:模拟抢票
package cn.itcast;
public class LockMain {
public static void main(String[] args) {
final Ticket12306 ticket = new Ticket12306();
// 模拟同时20个线程进行抢票
for (int i = 0; i < 20; i++) {
Thread t = new Thread(new Runnable() {
public void run() {
// 买票
ticket.sellTicket();
}
}, "线程" + i);
t.start();
}
// 死循环不让主线程停止,否则所有的子线程都会停止
while (true) {
}
}
}
第三步:运行LockMain,开始抢票, 查看日志出现超卖现象
线程0:10 线程1:10 线程2:8 线程4:7 线程5:6 线程7:6 线程3:6 线程8:4 线程6:5 线程9:2 线程10:16.3.2 分布式锁
package cn.itcast;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 {
private int ticket = 10; // 数据库的票数
private Curatorframework client;
private InterProcessMutex lock;
// 构造函数中初始化客户端对象以及创建连接
public Ticket12306() {
// 1.创建重试策略对象,指定重试策略,参数1-重试间隔时间(单位:毫秒),参数2-重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2);
// 2.创建连接客户端对象
client = CuratorframeworkFactory.builder()
.connectString("192.168.211.130:2181")
.retryPolicy(retryPolicy)
.build();
// 3.启动,建立连接
client.start();
// 4.创建锁对象
lock = new InterProcessMutex(client, "/lock");
}
public void sellTicket(){
try {
// 尝试获取锁,10s获取不到就放弃,拿到锁就执行下面的代码,没拿到就阻塞在这个地方,并注册监听,监听删除事件
lock.acquire(10, TimeUnit.SECONDS);
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + ":" + ticket);
ticket--;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(lock!=null) {
// 释放锁
lock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行结果:
线程5:10 线程6:9 线程12:8 线程0:7 线程9:6 线程14:5 线程19:4 线程8:3 线程7:2 线程10:107. ZooKeeper 集群 7.1 集群介绍 1)选举 2)角色介绍 7.2 集群搭建(了解)
1)复制三份文件夹,分别为zookeeper-1、zookeeper-2、zookeeper-3
# 进入文件夹 cd /usr/local # 创建文件夹 zk-cluster mkdir zk-cluster # 删除旧数据,以免影响 cd apache-zookeeper-3.6.2-bin rm -rf data # 再回到usr/local目录下 cd /usr/local # 复制三份文件夹、分别为zookeeper-1、zookeeper-2、zookeeper-3 cp -r apache-zookeeper-3.6.2-bin zk-cluster/zookeeper-1 cp -r apache-zookeeper-3.6.2-bin zk-cluster/zookeeper-2 cp -r apache-zookeeper-3.6.2-bin zk-cluster/zookeeper-3
2)修改每个文件夹中的zoo.cfg配置文件
zookeeper-1配置:
vi /usr/local/zk-cluster/zookeeper-1/conf/zoo.cfg
# 设置数据目录 dataDir=/usr/local/zk-cluster/zookeeper-1/data # 设置端口(同一台服务器,端口不能冲突,实际项目中最好部署在多台服务器上) clientPort=2181
zookeeper-2配置:
vi /usr/local/zk-cluster/zookeeper-2/conf/zoo.cfg
# 设置数据目录 dataDir=/usr/local/zk-cluster/zookeeper-2/data # 设置端口(同一台服务器,端口不能冲突,实际项目中最好部署在多台服务器上) clientPort=2182
zookeeper-3配置:
vi /usr/local/zk-cluster/zookeeper-3/conf/zoo.cfg
# 设置数据目录 dataDir=/usr/local/zk-cluster/zookeeper-3/data # 设置端口(同一台服务器,端口不能冲突,实际项目中最好部署在多台服务器上) clientPort=2183
3)在每个服务节点中,创建data目录,并创建一个myid文件,这个文件记录自己serverId
# 写入一个内容到对应的文件中,文件不存在会自动创建 echo 1 > /usr/local/zk-cluster/zookeeper-1/data/myid echo 2 > /usr/local/zk-cluster/zookeeper-2/data/myid echo 3 > /usr/local/zk-cluster/zookeeper-3/data/myid
4)在每个服务节点的zoo.cfg文件中,配置所有服务节点的地址信息列表
vi /usr/local/zk-cluster/zookeeper-1/conf/zoo.cfg vi /usr/local/zk-cluster/zookeeper-2/conf/zoo.cfg vi /usr/local/zk-cluster/zookeeper-3/conf/zoo.cfg
每个配置文件中都添加内容如下:
server.1=192.168.211.130:2881:3881 server.2=192.168.211.130:2882:3882 server.3=192.168.211.130:2883:3883
5)分别启动每个节点
# 启动之前把之前单机的2181服务停掉,以免端口冲突 /usr/local/apache-zookeeper-3.6.2-bin/bin/zkServer.sh stop
/usr/local/zk-cluster/zookeeper-1/bin/zkServer.sh start /usr/local/zk-cluster/zookeeper-2/bin/zkServer.sh start /usr/local/zk-cluster/zookeeper-3/bin/zkServer.sh start
6)查看状态
/usr/local/zk-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zk-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zk-cluster/zookeeper-3/bin/zkServer.sh status



