1.pom 文件
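<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>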
2.zk 操作
package com.vince.xq.kafka;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**
 * Manual integration tests demonstrating basic ZooKeeper operations, first through the
 * native {@link ZooKeeper} client and then through Apache Curator.
 *
 * <p>Every test talks to a real server at {@code 127.0.0.1:2181}; nothing is mocked. Each
 * test opens its own client and always closes it, so a failing operation does not leak a
 * session or its background threads.
 */
public class ZKTest {

    /** Address of the ZooKeeper server used by every test. */
    private static final String CONNECT_STRING = "127.0.0.1:2181";

    /** Session timeout requested from the server, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 4000;

    /** Upper bound on how long a test waits for the initial connection, in milliseconds. */
    private static final long CONNECT_WAIT_MS = 10000L;

    /**
     * Opens a native ZooKeeper client and blocks until it reports {@code SyncConnected}.
     *
     * <p>The {@link ZooKeeper} constructor returns before the session is established, so a
     * latch released by the default watcher is used to wait for the connection.
     *
     * @return a connected client; the caller is responsible for closing it
     * @throws IllegalStateException if the connection is not established within
     *     {@link #CONNECT_WAIT_MS}
     * @throws Exception if the client cannot be created or the wait is interrupted
     */
    private static ZooKeeper connect() throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // The server acknowledged the session: the connection is usable.
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    connected.countDown();
                }
            }
        });

        boolean ready = false;
        try {
            ready = connected.await(CONNECT_WAIT_MS, TimeUnit.MILLISECONDS);
        } finally {
            // Do not leak the half-open client when the wait fails or is interrupted.
            if (!ready) {
                zooKeeper.close();
            }
        }
        if (!ready) {
            throw new IllegalStateException(
                    "Timed out after " + CONNECT_WAIT_MS + " ms connecting to " + CONNECT_STRING);
        }
        return zooKeeper;
    }

    /**
     * Builds (but does not start) a Curator client for the test server, retrying failed
     * operations with exponential backoff (1000 ms base sleep, at most 3 retries).
     *
     * @return an unstarted client; the caller must start and close it
     */
    private static CuratorFramework newCuratorClient() {
        return CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("")
                .build();
    }

    /**
     * Connects, deletes {@code /CONFIG/timeout} regardless of its version, and prints the
     * client state.
     *
     * @throws Exception if the connection or the delete fails (for example when the node
     *     does not exist)
     */
    @Test
    public void testZkConnect() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            // Version -1 matches any version of the node.
            zooKeeper.delete("/CONFIG/timeout", -1);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Creates the persistent node {@code /watcher} with data {@code "123"} and an open ACL,
     * then prints the created path and the client state.
     *
     * @throws Exception if the connection or the create fails
     */
    @Test
    public void testZkCreateNode() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            String path = "/watcher";
            String nodePath = zooKeeper.create(
                    path,
                    "123".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
            System.out.println(nodePath);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Deletes the node {@code /watcher} regardless of its version and prints the client state.
     *
     * @throws Exception if the connection or the delete fails
     */
    @Test
    public void testZkDeleteNode() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            // Version -1 matches any version of the node.
            zooKeeper.delete("/watcher", -1);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Reads {@code /CONFIG/timeout}, printing its data, its {@link Stat}, and the client state.
     *
     * @throws Exception if the connection or the read fails
     */
    @Test
    public void getZkNodeData() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            Stat stat = new Stat();
            // getData returns the node's payload and fills `stat` with the node's metadata;
            // `true` leaves a watch on the node using the client's default watcher.
            byte[] data = zooKeeper.getData("/CONFIG/timeout", true, stat);
            System.out.println(new String(data, StandardCharsets.UTF_8));
            System.out.println(stat);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Reads the data of {@code /runoob} through Curator and prints it.
     *
     * @throws Exception if the read fails
     */
    @Test
    public void getZkConnect() throws Exception {
        try (CuratorFramework curatorFramework = newCuratorClient()) {
            curatorFramework.start();
            Stat stat = new Stat();
            // Query the node data; the node's metadata is stored into `stat`.
            byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/runoob");
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /**
     * Overwrites the data of {@code /watcher} with {@code "123"} through Curator.
     *
     * @throws Exception if the write fails
     */
    @Test
    public void setData() throws Exception {
        try (CuratorFramework curatorFramework = newCuratorClient()) {
            curatorFramework.start();
            // Update the node data.
            curatorFramework.setData().forPath("/watcher", "123".getBytes(StandardCharsets.UTF_8));
        }
    }
}
参考:
https://segmentfault.com/a/1190000012262940



