Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务。从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据,接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
技术架构:
Spring Boot 2.6.3、Zookeeper 3.4.6、JDK 1.8
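导入依赖
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
</dependency>
application.yml配置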
# Settings bound to ZookeeperProperties via the "zoo.keeper" prefix.
# The nesting below is required: without it every key is a top-level
# property and nothing is bound under zoo.keeper.
zoo:
  keeper:
    # Feature switch
    enabled: true
    # ZooKeeper server address (host:port)
    server: 192.168.116.100:2181
    # Namespace for this application's nodes
    namespace: lx
    # Credentials for digest-based access control (presumably user:password -- confirm)
    digest: smile:111111
    # Session timeout, in milliseconds
    sessionTimeoutMs: 3000
    # Connection timeout, in milliseconds
    connectionTimeoutMs: 60000
    # Maximum number of retries
    maxRetries: 10
    # Initial sleep time between retries, in milliseconds
    baseSleepTimeMs: 1000
Zookeeper配置
package com.example.zk.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Holder for the ZooKeeper settings declared under the {@code zoo.keeper}
 * prefix of the application configuration.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; the class is
 * registered as a Spring component so it can be injected where needed.
 */
@Component
@ConfigurationProperties(prefix = "zoo.keeper")
@Data
public class ZookeeperProperties {

    /** Feature switch ({@code zoo.keeper.enabled}). */
    private String enabled;

    /** ZooKeeper server address ({@code zoo.keeper.server}). */
    private String server;

    /** Namespace ({@code zoo.keeper.namespace}). */
    private String namespace;

    /** Credentials used for digest access control ({@code zoo.keeper.digest}). */
    private String digest;

    /** Session timeout in milliseconds. */
    private int sessionTimeoutMs;

    /** Connection timeout in milliseconds. */
    private int connectionTimeoutMs;

    /** Maximum number of retries. */
    private int maxRetries;

    /** Initial sleep time between retries, in milliseconds. */
    private int baseSleepTimeMs;
}
Zookeeper 初始化
package com.example.zk.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;

/**
 * Creates and starts the shared Curator client when the Spring context is
 * initialized, and exposes it through static accessors.
 *
 * <p>NOTE(review): {@code ZookeeperProperties.enabled} and
 * {@code ZookeeperProperties.namespace} are bound from configuration but are
 * not consulted here; the client is always created and no namespace is
 * applied. Confirm whether that is intended before wiring them in, since
 * applying a namespace changes where nodes are stored.
 */
@Slf4j
@Configuration
public class ZookeeperConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    /** Shared client; {@code null} until {@link #init()} has run. */
    private static CuratorFramework client = null;

    /**
     * Builds the Curator client from {@link ZookeeperProperties} and starts it.
     */
    @PostConstruct
    public void init() {
        // Retry policy: base sleep time and retry count come from configuration.
        RetryPolicy policy = new ExponentialBackoffRetry(
                zookeeperProperties.getBaseSleepTimeMs(),
                zookeeperProperties.getMaxRetries());

        // Encode the credentials explicitly as UTF-8 so the bytes sent do not
        // depend on the JVM's default charset.
        byte[] digestAuth = zookeeperProperties.getDigest().getBytes(StandardCharsets.UTF_8);

        // Create the client through the factory builder.
        client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperProperties.getServer())
                .authorization("digest", digestAuth)
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                .retryPolicy(policy)
                .build();

        // Open the connection.
        client.start();
        log.info("zookeeper 初始化完成...");
    }

    /**
     * Closes the shared client when the Spring context shuts down so the
     * ZooKeeper session is released instead of being left to time out.
     */
    @PreDestroy
    public void destroy() {
        closeClient();
    }

    /**
     * Returns the shared Curator client.
     *
     * @return the client, or {@code null} if {@link #init()} has not run yet
     */
    public static CuratorFramework getClient() {
        return client;
    }

    /**
     * Closes the shared client if one has been created.
     */
    public static void closeClient() {
        if (client != null) {
            client.close();
        }
    }
}
Zookeeper节点数据监听
package com.example.zk.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Registers ZooKeeper listeners once the application has started: a
 * {@link NodeCache} on {@code /lanxi} for data changes of that node, and a
 * {@link PathChildrenCache} on {@code /llx} for changes to its children.
 * Listener callbacks run on a dedicated two-thread pool.
 */
@Component
@Slf4j
public class ZookListening implements ApplicationRunner {

    /**
     * Creates both caches, attaches their listeners and starts them.
     *
     * @param args application arguments (unused)
     * @throws Exception if a cache fails to start
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CuratorFramework client = ZookeeperConfig.getClient();

        // Listeners are attached before start() so that no event emitted
        // during start-up can be missed.
        NodeCache nodeCache = new NodeCache(client, "/lanxi", false);
        nodeCache.getListenable().addListener(() -> onNodeChanged(nodeCache), pool);
        nodeCache.start(true);

        PathChildrenCache childrenCache = new PathChildrenCache(client, "/llx", true);
        childrenCache.getListenable().addListener((curator, event) -> onChildEvent(event), pool);
        childrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

    /** Logs the new content of the watched node, or its absence. */
    private void onNodeChanged(NodeCache nodeCache) {
        // getCurrentData() is null when the node has been deleted or does not exist.
        ChildData current = nodeCache.getCurrentData();
        if (current == null) {
            log.info("Node is deleted or does not exist");
            return;
        }
        byte[] payload = current.getData();
        log.info("Node data is changed, new data: {}",
                payload == null ? null : new String(payload, StandardCharsets.UTF_8));
    }

    /** Logs a child event of the watched path. */
    private void onChildEvent(PathChildrenCacheEvent event) {
        // Only child events carry ChildData; connection-state and INITIALIZED
        // events have none, so the path must not be dereferenced blindly.
        ChildData data = event.getData();
        String path = data == null ? null : data.getPath();
        switch (event.getType()) {
            case CHILD_ADDED:
                log.info("CHILD_ADDED: {}", path);
                break;
            case CHILD_REMOVED:
                log.info("CHILD_REMOVED: {}", path);
                break;
            case CHILD_UPDATED:
                log.info("CHILD_UPDATED: {}", path);
                break;
            default:
                // No path available: report the event type instead.
                log.info("default: {}", path != null ? path : event.getType());
                break;
        }
    }
}
Zookeeper操作Service类
package com.example.zk.service;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import java.util.List;
/**
 * Basic node operations on ZooKeeper.
 */
public interface ZookeeperService {

    /**
     * Checks whether a node exists.
     *
     * @param path node path
     * @return {@code true} if the node exists
     */
    boolean isExistNode(final String path);

    /**
     * Creates a node.
     *
     * @param mode create mode of the node
     * @param path node path
     */
    void createNode(CreateMode mode, String path);

    /**
     * Sets the data of a node.
     *
     * @param path     node path
     * @param nodeData data to store
     */
    void setNodeData(String path, String nodeData);

    /**
     * Creates a node together with its data.
     *
     * @param mode     create mode of the node
     * @param path     node path
     * @param nodeData data to store
     */
    void createNodeAndData(CreateMode mode, String path, String nodeData);

    /**
     * Reads the data of a node.
     *
     * @param path node path
     * @return the node data
     */
    String getNodeData(String path);

    /**
     * Lists the children of a node.
     *
     * @param path node path
     * @return the child node names
     */
    List<String> getNodeChild(String path);

    /**
     * Deletes a node.
     *
     * @param path      node path
     * @param recursive whether children are deleted as well
     */
    void deleteNode(String path, Boolean recursive);

    /**
     * Obtains a read/write lock for a path.
     *
     * @param path lock path
     * @return the read/write lock
     */
    InterProcessReadWriteLock getReadWriteLock(String path);
}
package com.example.zk.service.impl;
import com.example.zk.config.ZookeeperConfig;
import com.example.zk.service.ZookeeperService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
public class ZookeeperServiceImpl implements ZookeeperService {
@Override
public boolean isExistNode(String path) {
Curatorframework client = ZookeeperConfig.getClient();
client.sync() ;
try {
Stat stat = client.checkExists().forPath(path);
return client.checkExists().forPath(path) != null;
} catch (Exception e) {
log.error("isExistNode error...", e);
e.printStackTrace();
}
return false;
}
@Override
public void createNode(CreateMode mode, String path) {
Curatorframework client = ZookeeperConfig.getClient() ;
try {
// 递归创建所需父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
log.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public void setNodeData(String path, String nodeData) {
Curatorframework client = ZookeeperConfig.getClient() ;
try {
// 设置节点数据
client.setData().forPath(path, nodeData.getBytes("UTF-8"));
} catch (Exception e) {
log.error("setNodeData error...", e);
e.printStackTrace();
}
}
@Override
public void createNodeAndData(CreateMode mode, String path, String nodeData) {
Curatorframework client = ZookeeperConfig.getClient() ;
try {
// 创建节点,关联数据
client.create().creatingParentsIfNeeded().withMode(mode)
.forPath(path,nodeData.getBytes("UTF-8"));
} catch (Exception e) {
log.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public String getNodeData(String path) {
Curatorframework client = ZookeeperConfig.getClient() ;
try {
// 数据读取和转换
byte[] dataByte = client.getData().forPath(path) ;
String data = new String(dataByte,"UTF-8") ;
if (!StringUtils.isEmpty(data)){
return data ;
}
}catch (Exception e) {
log.error("getNodeData error...", e);
e.printStackTrace();
}
return null;
}
@Override
public List getNodeChild(String path) {
Curatorframework client = ZookeeperConfig.getClient() ;
List nodeChildDataList = new ArrayList<>();
try {
// 节点下数据集
nodeChildDataList = client.getChildren().forPath(path);
} catch (Exception e) {
log.error("getNodeChild error...", e);
e.printStackTrace();
}
return nodeChildDataList;
}
@Override
public void deleteNode(String path, Boolean recursive) {
Curatorframework client = ZookeeperConfig.getClient() ;
try {
if(recursive) {
// 递归删除节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
// 删除单个节点
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
log.error("deleteNode error...", e);
e.printStackTrace();
}
}
@Override
public InterProcessReadWriteLock getReadWriteLock(String path) {
Curatorframework client = ZookeeperConfig.getClient() ;
// 写锁互斥、读写互斥
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock ;
}
}
Zookeeper操作Controller类
package com.example.zk.web;
import com.example.zk.service.ZookeeperService;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static org.apache.zookeeper.CreateMode.PERSISTENT;
/**
 * HTTP endpoints that expose {@link ZookeeperService} operations.
 *
 * <p>NOTE(review): every operation, including the ones that create, modify
 * and delete nodes, is mapped to GET, and no access control is visible in
 * this class. The write endpoints always answer {@code "success"} once the
 * service call returns.
 */
@RestController
public class ZookeeperApi {

    @Autowired
    private ZookeeperService zookeeperService;

    /**
     * Reads the data of a node.
     *
     * @param path node path
     * @return the value returned by the service
     */
    @GetMapping("/getNodeData")
    public String getNodeData(String path) {
        return zookeeperService.getNodeData(path);
    }

    /**
     * Checks whether a node exists.
     *
     * @param path node path
     * @return the value returned by the service
     */
    @GetMapping("/isExistNode")
    public boolean isExistNode(final String path) {
        return zookeeperService.isExistNode(path);
    }

    /**
     * Creates a persistent node.
     *
     * @param path node path
     * @return the literal {@code "success"}
     */
    @GetMapping("/createNode")
    public String createNode(String path) {
        zookeeperService.createNode(CreateMode.PERSISTENT, path);
        return "success";
    }

    /**
     * Sets the data of a node.
     *
     * @param path     node path
     * @param nodeData data to store
     * @return the literal {@code "success"}
     */
    @GetMapping("/setNodeData")
    public String setNodeData(String path, String nodeData) {
        zookeeperService.setNodeData(path, nodeData);
        return "success";
    }

    /**
     * Creates a persistent node together with its data.
     *
     * @param path     node path
     * @param nodeData data to store
     * @return the literal {@code "success"}
     */
    @GetMapping("/createNodeAndData")
    public String createNodeAndData(String path, String nodeData) {
        zookeeperService.createNodeAndData(CreateMode.PERSISTENT, path, nodeData);
        return "success";
    }

    /**
     * Lists the children of a node.
     *
     * @param path node path
     * @return the child node names returned by the service
     */
    @GetMapping("/getNodeChild")
    public List<String> getNodeChild(String path) {
        return zookeeperService.getNodeChild(path);
    }

    /**
     * Deletes a node.
     *
     * @param path      node path
     * @param recursive whether children are deleted as well
     * @return the literal {@code "success"}
     */
    @GetMapping("/deleteNode")
    public String deleteNode(String path, Boolean recursive) {
        zookeeperService.deleteNode(path, recursive);
        return "success";
    }
}
接口测试
http://127.0.0.1:8088/createNode?path=/llx/222



