Zookeeper学习
1.zookeeper概述
#定义
#zk是Apache Hadoop项目的一个子项目,用来管理使用;是一个分布式的开源的分布式应用程序协调服务
#功能
#1.分布式锁
#2.配置管理:作为配置中心
#3.集群管理:作为注册中心
#安装,自行搜索
2.zookeeper操作
#数据模型
1.树形目录服务,和Unix的文件系统很类似,拥有层次化结构
2.每个节点被称为ZNode,保存 数据 和 节点信息
3.节点可以拥有子节点,同时允许少量数据存储(1MB)
4.节点可以分为四大类:
PERSISTENT 持久化节点 #服务器关了也不会消失,需要手动删除
EPHEMERAL 临时节点 #创建时加-e
PERSISTENT_SEQUENTIAL 持久化顺序节点 #创建时加-s
EPHEMERAL_SEQUENTIAL 临时顺序节点 #创建时加-es
#常用命令
#Server端启动/查看/停止/重启:
./zkServer.sh start/status/stop/restart
#Client端
./zkCli.sh -server ip:端口(2181) #连接,默认本机不用后面参数
help #帮助
quit #退出
ls / #查看根节点 (默认dubbo,zookeeper两个节点)
ls /dubbo #查看指定节点下的子节点
create (-e/-s/-es) /name data #创建一个name节点,数据为data,data可为空,临时节点关闭当前会话,节点消失,创建顺序节点时,会所有节点名称后面会有编号,编号为全局唯一自增
get /node
set /node data
delete /node #获取/赋值/删除 ,节点不可重复
deleteall /node #节点有子节点时仍可删除 delete不可以
ls -s /nodepath #显示节点的详细信息
#JavaAPI操作:客户端有原生jJavaApi和ZkClient和Curator,前两者太复杂
#Curator: 官网http://curator.apache.org/ 可以查看和zk的版本兼容问题
#引入jar
org.apache.curator
curator-framework
4.0.0
org.apache.curator
curator-recipes
4.0.0
#常用操作:(详见下面代码)
1.建立连接 2.添加节点 3.删除节点 4.修改节点 5.查询节点 6.Watch事件监听 7.分布式锁的实现
3.
分布式锁
#概念:应用是分布式集群时,属于多JVM的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题,此时就需要一种解决跨机器的进程间数据同步问题
#zk分布式锁原理
#核心:当客户端要获取锁的时候创建节点,使用完之后删除该节点
1.客户端获取锁的时,在lock节点下创建**临时顺序**节点
2.然后获取lock下面的所有子节点,客户端获取所有的子节点之后,如果发现自己创建的子节点最小,则认为客户端获取到了锁,使用完之后,删除节点
3.如果发现自己创建的节点并非lock所有的子节点最小的,说明没有获取到锁,此时客户端需要找到比自己小的子节点,同时对其注册事件监听器,监听删除事件
4.如果发现比自己小的那个子节点被删除,则客户端的Watcher会收到相应的通知,此时在判断自己创建的节点是否为序号最小的,如果是,则获取锁,如果不是重复以上步骤。
//分布式锁代码demo
//五种锁方案:
InterProcessSemaphoreMutex//分布式排他锁
InterProcessMutex //分布式可重入排他锁
InterProcessReadWriteLock//分布式读写锁
InterProcessMultiLock //将多个锁作为单个实体管理容器,多个锁当成一个锁使用
InterProcessSemaphoreV2 //共享信号量
//client见下面两种方式
private InterProcessMutex lock = new InterProcessMutex(client,"/lmy");
lock.acquire(1,TimeUnit.SECONDS);//获取锁
//需要锁的业务
lock.release();//释放锁
4.集群
#介绍
#Leader选举:一个Leader多个follower
1.Serverid(服务器ID)编号越大,在算法中权重越大;
2.Zxid(数据ID)服务器中存放的最大数据ID值越大越新,权重越大
3.在Leader选举中,如果某台zk获得了超半数选票,则这个就成为Leader(顺序启动时,Leader为n/2+1位)
#搭建:
1.JDK环境和Zk压缩包
2.创建data目录,把conf下的zoo_sample.cfg改成zoo.cfg
3.配置集群:在每个data目录创建myid文件#echo 1 > ..../data/myid
4.修改配置文件,配置dataDir和clientPort和集群服务器列表
#server.1= 127.0.0.1:2881:3881 .....
#server.服务器ID=服务器IP:服务器通信端口:服务器间投票选举端口
5.启动各个zk服务 ./zkServer.sh start
#./zkServer.sh status 查看集群状态
#注意:集群多于半数宕机,整个集群会宕机;多于半数之后,其他节点也会复活
#zk集群角色:
1.leader角色: 处理事务请求和调度集群内部各个服务器
2.follower角色:处理非事务请求,转发事务请求给leader并参与leader选举投票
3.observer角色:处理客户端非事务请求,转发事务请求给leader
#zk集群多少台zk合适,台数多可靠性高但通信延时大
#经验:10个服务器: 3个; 20-->5;100-->11;200-->11
#=============配置文件详解==============
tickTime = 2000; #zk与客户端或其他服务端通信心跳时间,ms
initLimit =10; #Leader和Follower初始通信的最多心跳数
syncLimit = 5; #LF的同步时限,超过syncLimit*tickTime,认为follower宕机,移除它
dataDir: /tmp #zk的数据
clientPort=2181 #暴露的端口
5.算法问题
#zk怎么保证数据一致性 ZAB算法
1.Paxos算法:基于消息传递且具有高度容错的一致性算法(多个广播者或理解为leader)
2.ZAB算法:基于Paxos算法,但是只有一个leader
#CAP理论:一致性、可用性、分区容错性
1.zk保证的是CP即一致性和分区容错性。
2.进行leader选举时集群不可用。
ps:Curator代码demo
public class testZk {
private Curatorframework client;
@Before //在执行@Test执行前先执行
public void testConnect() {
//第一种方式
Curatorframework client1 = CuratorframeworkFactory.newClient("127.0.0.1:2181", 3000, 3000
, new ExponentialBackoffRetry(3000, 10));
client1.start();//开启连接
//第二种方式
client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(3000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(3000, 10))
.namespace("lmy")//命名空间,默认为lmy的根目录,没有子节点时,这个根节点也会消失
.build();
client.start();
}
@Test
public void testCreate() throws Exception {
//操作的节点必须以/开头
//默认持久化节点
client.create().forPath("/lmy");//默认数据为当前客户端的ip作为数据存储
client.create().forPath("/lmy", "hhh".getBytes());
//设置节点类型 临时节点。方法结束之前会存在,否则,临时节点会消失
client.create().withMode(CreateMode.EPHEMERAL).forPath("/lmy");
//创建多级节点:命令不可以创建多级节点
client.create().creatingParentsIfNeeded().forPath("/lmy");
//======================
//查询数据
byte[] bytes = client.getData().forPath("/lmy");
System.out.println(new String(bytes));
//查询子节点
List childNodes = client.getChildren().forPath("/lmy");
//指定命名空间,默认查询命名空间的节点
List cns = client.getChildren().forPath("/");
//查询节点状态信息
Stat status = new Stat();
byte[] info = client.getData().storingStatIn(status).forPath("/lmy");
System.out.println(status);
//================================
//修改数据 根据version版本修改
int version = status.getVersion();
client.setData().withVersion(version).forPath("/lmy", "hello".getBytes());
//================================
//删除节点
client.delete().forPath("/lmy");//删除单个
client.delete().deletingChildrenIfNeeded().forPath("/lmy");//删除多个
client.delete().guaranteed().forPath("/lmy");//确认必须删除成功
client.delete().guaranteed().inBackground(new BackgroundCallback() {
public void processResult(Curatorframework curatorframework, CuratorEvent curatorEvent) throws Exception {
System.out.println("删除成功后回调函数");
}
}).forPath("/lmy");
//==================================
//Watch事件监听:zk允许在指定节点上注册一些Watcher,并在特定事件发生后通知客户端
//Curator引入了Cache来实现对zk服务的监听、
//监听某一个节点:创建NodeCache对象: client 节点 是否压缩
final NodeCache nodeCache = new NodeCache(client, "/lmy", true);
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
//获取修改节点后数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//Lambda写法
nodeCache.getListenable().addListener(() -> System.out.println("节点变化了"));
//开启监听:如果为true 则开启监听时加载缓冲数据
nodeCache.start(true);
//监听某节点所有子节点 cachedata:是否缓存状态信息
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/lmy", true);
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了");
System.out.println(event);
PathChildrenCacheEvent.Type type = event.getType();
//判断类型是否为update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println(new String(data));//变更后数据
}
}
}
);
pathChildrenCache.start();
//监听所有节点
TreeCache treeCache = new TreeCache(client,"/lmy");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
}
@After
public void close() {
if (client != null) {
client.close();
}
}
}