栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ZooKeeper分布式配置——看这篇就够了

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ZooKeeper分布式配置——看这篇就够了

public void setCc(CountDownLatch cc) {

this.cc = cc;

}

@Override

public void process(WatchedEvent event) {

System.out.println(event.toString());

switch (event.getState()) {

case Unknown:

break;

case Disconnected:

break;

case NoSyncConnected:

break;

case SyncConnected:

System.out.println(“连接成功。。。。。”);

//连接成功后,执行countDown,此时便可以拿zk对象使用了

cc.countDown();

break;

case AuthFailed:

break;

case ConnectedReadOnly:

break;

case SaslAuthenticated:

break;

case Expired:

break;

case Closed:

break;

}

}

}

由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。

当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback

import org.apache.zookeeper.AsyncCallback;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

ZooKeeper zk ;

MyConfig conf ;

CountDownLatch cc = new CountDownLatch(1);

public MyConfig getConf() {

return conf;

}

public void setConf(MyConfig conf) {

this.conf = conf;

}

public ZooKeeper getZk() {

return zk;

}

public void setZk(ZooKeeper zk) {

this.zk = zk;

}

public void aWait(){

//exists的异步实现版本

zk.exists(ZKConstants.ZK_NODE,this,this ,“exists watch”);

try {

cc.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void processResult(int rc, String path, Object ctx, Stat stat) {

if(stat != null){

//getData的异步实现版本

zk.getData(ZKConstants.ZK_NODE,this,this,“status”);

}

}

@Override

public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {

if(data != null ){

String s = new String(data);

conf.setConf(s);

cc.countDown();

}

}

@Override

public void process(WatchedEvent event) {

switch (event.getType()) {

case None:

break;

case NodeCreated:

//当一个node被创建后,获取node

//getData中又会触发StatCallback的回调processResult

zk.getData(ZKConstants.ZK_NODE,this,this,“sdfs”);

break;

case NodeDeleted:

//节点删除

conf.setConf(“”);

//重新开启CountDownLatch

cc = new CountDownLatch(1);

break;

case NodeDataChanged:

//节点数据被改变了

//触发DataCallback的回调

zk.getData(ZKConstants.ZK_NODE,this,this,“sdfs”);

break;

//子节点发生变化的时候

case NodeChildrenChanged:

break;

}

}

}

当前面准备好了之后,我们可以编写测试用例了:

ZKUtils 工具类

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZKUtils {

private static ZooKeeper zk;

//192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到zk集群的连接,

// 那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/

//当然我们要保证/mxn这个节点在ZK上是存在的

private static String address =“192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn”;

private static DefaultWatch watch = new DefaultWatch();

private static CountDownLatch init = new CountDownLatch(1);

public static ZooKeeper getZK(){

try {

//因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作

zk = new ZooKeeper(address,1000,watch);

watch.setCc(init);

init.await();

} catch (Exception e) {

e.printStackTrace();

}

return zk;

}

}

测试类:

import org.apache.zookeeper.ZooKeeper;

import org.junit.Before;

import org.junit.Test;

public class TestConfig {

ZooKeeper zk;

@Before

public void conn(){

zk = ZKUtils.getZK();

}

public void close(){

try {

zk.close();

}catch (Exception e){

e.printStackTrace();

}

}

@Test

public void getConf(){

WatchCallBack watchCallBack = new WatchCallBack();

watchCallBack.setZk(zk);

MyConfig myConfig = new MyConfig();

watchCallBack.setConf(myConfig);

//阻塞等待

watchCallBack.aWait();

while(true){

if(myConfig.getConf().equals(“”)){

System.out.println(“zk node 节点丢失了 …”);

watchCallBack.aWait();

}else{

System.out.println(myConfig.getConf());

}

//

try {

//每隔500毫秒打印一次

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

[](()运行测试


首先我们要知道,因为我们连接IP的时候加上了 /mxn这个目录结构,所以我们在服务器初始状态就必须要有这个节点:

集群初始状态:

[zk: localhost:2181(CONNECTED) 7] ls /

[mxn, zookeeper]

我们启动程序看看

连接成功

ZooKeeper 下 /mxn 现在也是空

[zk: localhost:2181(CONNECTED) 9] ls /mxn

[]

[zk: localhost:2181(CONNECTED) 10]

现在我们来创建一个/mxn/myZNode节点数据

[zk: localhost:2181(CONNECTED) 10] create /mxn/myZNode “muxiaonong666”

Created /mxn/myZNode

可以看到,创建完成之后,程序马上给出响应,打印出了我配置的值,muxiaonong666

此时,再设置/mxn/myZNode的值为 muxiaonong6969

啪,很快啊!!!我们就可以看到值瞬间改变了

![在这里插入图片描述](https://img-blog.csdnimg.cn/91089266962b42a289a5eb2994b1f5 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 ea.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA54mn5bCP5Yac,size_20,color_FFFFFF,t_70,g_se,x_16)

这个时候我们如果删除/mxn/myZNode节点,会发生什么呢,前面我们已经写了watch,如果Znode被删除了,,watch and callback执行

case NodeDeleted:

//节点删除

conf.setConf(“”);

//重新开启CountDownLatch

cc = new CountDownLatch(1);

break;

if(myConfig.getConf().equals(“”)){

System.out.println(“zk node 节点丢失了 …”);

此时应该阻塞住,等待着node重新创建

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863298.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号