public void setCc(CountDownLatch cc) {
this.cc = cc;
}
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println(“连接成功。。。。。”);
//连接成功后,执行countDown,此时便可以拿zk对象使用了
cc.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
}
由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。
当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
ZooKeeper zk ;
MyConfig conf ;
CountDownLatch cc = new CountDownLatch(1);
public MyConfig getConf() {
return conf;
}
public void setConf(MyConfig conf) {
this.conf = conf;
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public void aWait(){
//exists的异步实现版本
zk.exists(ZKConstants.ZK_NODE,this,this ,“exists watch”);
try {
cc.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat != null){
//getData的异步实现版本
zk.getData(ZKConstants.ZK_NODE,this,this,“status”);
}
}
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(data != null ){
String s = new String(data);
conf.setConf(s);
cc.countDown();
}
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
//当一个node被创建后,获取node
//getData中又会触发StatCallback的回调processResult
zk.getData(ZKConstants.ZK_NODE,this,this,“sdfs”);
break;
case NodeDeleted:
//节点删除
conf.setConf(“”);
//重新开启CountDownLatch
cc = new CountDownLatch(1);
break;
case NodeDataChanged:
//节点数据被改变了
//触发DataCallback的回调
zk.getData(ZKConstants.ZK_NODE,this,this,“sdfs”);
break;
//子节点发生变化的时候
case NodeChildrenChanged:
break;
}
}
}
当前面准备好了之后,我们可以编写测试用例了:
ZKUtils 工具类
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class ZKUtils {
private static ZooKeeper zk;
//192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到zk集群的连接,
// 那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/
//当然我们要保证/mxn这个节点在ZK上是存在的
private static String address =“192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn”;
private static DefaultWatch watch = new DefaultWatch();
private static CountDownLatch init = new CountDownLatch(1);
public static ZooKeeper getZK(){
try {
//因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作
zk = new ZooKeeper(address,1000,watch);
watch.setCc(init);
init.await();
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
}
测试类:
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;
public class TestConfig {
ZooKeeper zk;
@Before
public void conn(){
zk = ZKUtils.getZK();
}
public void close(){
try {
zk.close();
}catch (Exception e){
e.printStackTrace();
}
}
@Test
public void getConf(){
WatchCallBack watchCallBack = new WatchCallBack();
watchCallBack.setZk(zk);
MyConfig myConfig = new MyConfig();
watchCallBack.setConf(myConfig);
//阻塞等待
watchCallBack.aWait();
while(true){
if(myConfig.getConf().equals(“”)){
System.out.println(“zk node 节点丢失了 …”);
watchCallBack.aWait();
}else{
System.out.println(myConfig.getConf());
}
//
try {
//每隔500毫秒打印一次
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
[](()运行测试
首先我们要知道,因为我们连接IP的时候加上了 /mxn这个目录结构,所以我们在服务器初始状态就必须要有这个节点:
集群初始状态:
[zk: localhost:2181(CONNECTED) 7] ls /
[mxn, zookeeper]
我们启动程序看看
连接成功
ZooKeeper 下 /mxn 现在也是空
[zk: localhost:2181(CONNECTED) 9] ls /mxn
[]
[zk: localhost:2181(CONNECTED) 10]
现在我们来创建一个/mxn/myZNode节点数据
[zk: localhost:2181(CONNECTED) 10] create /mxn/myZNode “muxiaonong666”
Created /mxn/myZNode
可以看到,创建完成之后,程序马上给出响应,打印出了我配置的值,muxiaonong666
此时,再设置/mxn/myZNode的值为 muxiaonong6969
啪,很快啊!!!我们就可以看到值瞬间改变了

这个时候我们如果删除/mxn/myZNode节点,会发生什么呢,前面我们已经写了watch,如果Znode被删除了,,watch and callback执行
case NodeDeleted:
//节点删除
conf.setConf(“”);
//重新开启CountDownLatch
cc = new CountDownLatch(1);
break;
if(myConfig.getConf().equals(“”)){
System.out.println(“zk node 节点丢失了 …”);
此时应该阻塞住,等待着node重新创建



