简介:本篇博客主要是使用Zookeeper的API对Zookeeper的增删查改,但是在递归实现节点的数据变化监控时,这里始终没有写好,希望有大神帮我指点迷津,感谢!
package com.lqs.api.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZookeeperDemo {
private String connectString;
private ZooKeeper zooKeeper;
private int sessionTimeout;
@Before
public void init() throws IOException {
connectString = "bdc112:2181,bdc113:2181,bdc114:2181";
sessionTimeout = 3000;
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// System.out.println(watchedEvent);
}
});
}
@After
public void close() throws InterruptedException {
zooKeeper.close();
}
@Test
public void ls() throws InterruptedException, KeeperException {
//通过客户端对象对Zookeeper进行各种操作
List children = zooKeeper.getChildren("/", false);
System.out.println(children);
}
@Test
public void lsAndWatch() throws InterruptedException, KeeperException {
final boolean[] flag = {false};
List children = zooKeeper.getChildren("/test", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent);
flag[0] = true;
System.exit(flag[0] ? 0 : 1);
try {
lsAndWatch();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
System.out.println(children);
//这里设置睡眠是需要监听,所以当前的线程不能结束
Thread.sleep(Long.MAX_VALUE);
}
@Test
public void create() throws InterruptedException, KeeperException {
//参数解读 1节点路径 2节点存储的数据
//3节点的权限(使用Ids选个OPEN即可) 4节点类型 短暂 持久 短暂带序号 持久带序号
//创建持久序列
String path = zooKeeper.create("/test12", "test11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
//创建临时节点
// String path = zooKeeper.create("/testLin", "test22".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//创建临时序列
// String path = zooKeeper.create("/test", "test12".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//一定要注意,创建临时节点的话,需要线程阻塞,否则看不到效果
//Thread.sleep(10000);
//创建持久化序列
// String path = zooKeeper.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
//创建持久化节点
// String path = zooKeeper.create("/test/test11", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void exist() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists("/test", false);
System.out.println(stat != null ? "文件存在..." : "文件不存在...");
}
@Test
public void get() throws InterruptedException, KeeperException {
//判断节点是否存在,在Zookeeper中
Stat stat = zooKeeper.exists("/test", false);
if (stat == null) {
System.out.println("当前节点不存在...");
} else {
byte[] data = zooKeeper.getData("/test", false, stat);
System.out.println(new String(data));
}
}
@Test
public void getAndWatch() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists("/test12", false);
if (stat == null) {
System.out.println("当前节点不存在...");
return;
}
final boolean[] flagOut = {true};
//这里监听的是我们获取子节点的数据有没有发生变化
byte[] data = zooKeeper.getData("/test12", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent);
flagOut[0] = true;
}
}, stat);
if (flagOut[0]) {
System.out.println(new String(data));
}
while (true) {
//睡眠时间为1秒,即1秒中调用一次
Thread.sleep(1000);
// flagOut[0] =false;
if (flagOut[0]) {
getAndWatch();
}
}
}
@Test
public void set() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists("/test", false);
if (stat == null) {
System.out.println("当前节点不存在...");
return;
}
Stat result = zooKeeper.setData("/test", "test".getBytes(), stat.getVersion());
System.out.println(result != null ? "设置节点值成功..." : "设置节点值失败...");
}
private Stat getStat(String path) throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists(path, false);
if (stat == null) {
System.out.println("当前节点不存在...");
exist();
}
return stat;
}
@Test
public void rm() throws InterruptedException, KeeperException {
Stat result = getStat("/test0000000006");
zooKeeper.delete("/test0000000006", result.getVersion());
}
public void deleteAll(String path) throws InterruptedException, KeeperException {
Stat stat = getStat(path);
List children = zooKeeper.getChildren(path, false);
for (String child : children) {
deleteAll(path + "/" + child);
}
// zooKeeper.delete(path,stat.getVersion());
//这里的version即stat里面的得到的节点的dataversion - znode数据变化号。如果不知道是多少,则传入-1即可
zooKeeper.delete(path, -1);
}
@Test
public void deleteAllTest() throws InterruptedException, KeeperException {
deleteAll("/test");
}
public void lsAndWatch(String path) throws InterruptedException, KeeperException {
List children = zooKeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent);
try {
lsAndWatch(path);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
System.out.println(children);
}
@Test
public void testWatch() throws InterruptedException, KeeperException {
lsAndWatch("/test");
Thread.sleep(Long.MAX_VALUE);
}
}



