zookeeper工具类
简单实用、废话不说、直接上代码!
public class CuratorZookeeperUtils {
private static Log log = LogFactory.getLog(CuratorZookeeperUtils.class);
public static Curatorframework initClient(ZKConfig zkConfig) {
String connectString = "";
for (String tempConnect : zkConfig.getConnectString()) {
connectString += tempConnect + ",";
}
// 为了实现不同的ZooKeeper也无语之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个ZooKeeper根路径,那么该客户端对ZooKeeper上数据节点的任何操作,都是基于该相对目录进行的
Curatorframework client = CuratorframeworkFactory
.builder()
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMs()).sessionTimeoutMs(zkConfig.getSessionTimeoutMs())
.connectString(connectString.substring(0, connectString.length() - 1))
.namespace(zkConfig.getNamespace())
.retryPolicy(new RetryNTimes(zkConfig.getRetryCount(), zkConfig.getSleepMsBetweenRetries())).build();
client.start();
return client;
}
public static List nodesList(Curatorframework client, String parentPath) throws Exception {
Stat stat = client.checkExists().forPath(parentPath);
if (stat == null) {
return null;
}
List paths = client.getChildren().forPath(parentPath);
return paths;
}
public static Boolean createNode(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
client.create().creatingParentsIfNeeded().forPath(path);
}
return true;
}
public static Boolean createNode(Curatorframework client, String path, String value) throws Exception { byte[] data = getByteDataNode(client, path);
if (data != null) {
if (Arrays.equals(data, value.getBytes())) {
return true;
} else {
delNodeAndChild(client, path);
}
}
client.create().creatingParentsIfNeeded().forPath(path, value.getBytes());
return true;
}
public static String getDataNode(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
byte[] datas = client.getData().forPath(path);
return new String(datas, "UTF-8");
}
public static byte[] getByteDataNode(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
byte[] data = client.getData().forPath(path);
return data;
}
public static Boolean setDataNode(Curatorframework client, String path, String message) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.setData().withVersion(stat.getVersion()).forPath(path, message.getBytes());
return true;
}
public static Boolean delNodeAndChild(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().deletingChildrenIfNeeded().forPath(path);
return true;
}
public static Boolean delCurrentNode(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().forPath(path);
return true;
}
public static Boolean guarantDelNodeAndChild(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return null;
}
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
return true;
}
public static Boolean checkNode(Curatorframework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
return false;
}
return true;
}
public static void setListenter(Curatorframework client, ZookeeperOpService curatorZookeeperOp, String path)
throws Exception {
ExecutorService pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue());
// 设置监听器和处理过程
if (StringHelper.isEmpty(path)) {
path = "/";
}
if (!path.startsWith("/")) {
path = "/" + path;
}
delNodeAndChild(client,path);
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
curatorCache.listenable()
.addListener(CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {
@Override
public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
switch (event.getType()) {
case NODE_ADDED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.addNode(data);
break;
case NODE_REMOVED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.delNode(data);
break;
case NODE_UPDATED:
log.info("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
curatorZookeeperOp.updNode(data);
break;
default:
break;
}
} else {
log.info("data is null : " + event.getType());
}
}
}).build());
// 开始监听
curatorCache.start();
}
}