依赖
org.apache.curator curator-recipes 5.2.0 org.junit.jupiter junit-jupiter-api 5.8.2 test
public class TestCuratorService {
private String zookeeperConnectionString = "127.0.0.1:2181";
private Curatorframework client;
@Before
public void initClient1() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorframeworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
}
public void initClient2() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
client = CuratorframeworkFactory.builder()
//zookeeper 的连接地址
.connectString(zookeeperConnectionString)
.connectionTimeoutMs(10000)
.sessionTimeoutMs(10000)
.retryPolicy(retry).build();
client.start();
}
@Test
public void dataIsExists() throws Exception {
String path = "/test/aaaa";
Stat stat = client.checkExists()
.usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听回调");
}
})
.forPath(path);
System.out.println(stat);
}
@Test
public void createPath() throws Exception {
String path = "/test/aaaa";
String res = client.create()
// 递归创建path
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, "path数据内容".getBytes());
System.out.println(res);
}
@Test
public void deletePath() throws Exception {
String path = "/test/aaaa";
client.delete()
// 强制删除
.guaranteed()
// 递归删除
.deletingChildrenIfNeeded()
//.withVersion(-1)
.forPath(path);
}
@Test
public void getPath() throws Exception {
String path = "/test";
CountDownLatch countDownLatch = new CountDownLatch(1);
List childPaths = client.getChildren()
.usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听回调");
}
})
// inBackground会导致forPath的返回为null
.inBackground(new BackgroundCallback() {
@Override
public void processResult(Curatorframework curatorframework, CuratorEvent curatorEvent) {
System.out.println("异步操作");
List pathList = curatorframework.getChildren().forPath(path);
System.out.println(pathList);
countDownLatch.countDown();
}
})
.forPath(path);
countDownLatch.await();
}
@Test
public void getData() throws Exception {
String path = "/test";
byte[] bytes = client.getData()
.usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听回调");
}
})
.forPath(path);
String data = new String(bytes);
System.out.println(data);
}
@Test
public void updateData() throws Exception {
String path = "/test";
client.setData().forPath(path, "数据内容".getBytes());
}
@Test
public void transaction() throws Exception {
//创建事务
CuratorOp curatorOp = client.transactionOp().delete().forPath("/test/aaa");
//执行事务
List results = client.transaction().forOperations(curatorOp);
}
@Test
public void zookeeperLock() throws Exception {
String lockPath = "/test/lock";
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 加锁
if (lock.acquire(5, TimeUnit.SECONDS)) {
try {
// 业务操作
} finally {
// 解锁
lock.release();
}
}
}
}



