| 命令 | 说明 |
|---|---|
| help | 显示所有操作命令 |
| ls path | 查看当前节点的子节点(可监听) -w 监听子节点变化 -s 附加次级信息 -R 递归查看 |
| create | 创建普通节点 -s 含有序列 -e 临时(重启或者超时消失) -c 容器节点,当该容器中没有节时,超时后被删除(60s) |
| get path | 获得指定路径下节点的值(可监听) -w 监听节点内容的变化 -s 附加次级信息 |
| set | 设置节点的具体信息 |
| stat | 查看节点状态 |
| delete | 删除节点 -v 跟一个版本号,根据版本号删除(乐观锁删除) |
| deleteall | 递归删除节点 |
持久(Persistent)节点:客户端和服务端断开连接后,创建的节点不删除
- 持久化目录节点:客户端与Zookeeper断开连接后,该节点依然存在
- 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节名称进行顺序编号
短暂(Ephemeral)节点:客户端和服务端断开连接后,创建的节点将会删除
- 临时目录节点:客户端与Zookeeper断开连接后,该节点删除
- 临时顺序编号目录节点:客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
1.3 节点数据详情创建节点时设置顺序标识,节点名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
| 详情 | 描述 |
|---|---|
| czxid | 创建节点的事务 zxid |
| ctime | 节点被创建的毫秒数(1970年开始) |
| mzxid | 节点最后跟新的事务zxid |
| mtime | 节点最后修改的毫秒数(1970年开始) |
| pzxid | 节点最后更新的子节点zxid |
| cversion | 子节点变化号,子节点修改次数 |
| dataversion | 节点数据变化号 |
| aclVersion | 节点访问控制列表的变化号 |
| ephemeralOwner | 如果是临时节点,这个是节点所有者的 session id,如果不是临时节点则是0 |
| dataLength | 节点的数据长度 |
| numChildren | 节点的子节点数量 |
zookeeper 中的节点包含四个部分:data、acl、stat、child
data:保存数据
acl:权限,定义了什么样的用户能操作这个节点,且能够进行怎样的操作
- c:ceate,创建权限,允许再该节点下创建节点
- w:write,更新权限,允许更新该节点的数据
- r:read,读取权限,允许读取该节点的内容以及子节点的列表信息
- d:delete,删除权限,允许删除该节点的子节点
- a:admin,管理者权限,允许对该节点进行acl权限设置
stat:描述当前节点的元数据
child:当前节点的子节点
1.5 节点权限注册当前会话的账号和密码:
addauth digest <账号>:<密码>
创建节点并设置权限:
create /<节点名称> <节点内容> auth:<账号>:<密码>:<权限>2. 客户端API操作 2.1 项目准备
1️⃣ 创建 Maven 项目
2️⃣ 导入依赖
2.2 创建客户端junit junit 4.13.1 test org.apache.zookeeper zookeeper 3.5.7 org.apache.logging.log4j log4j-core 2.6.2
private String connectString = "node1:2181,node2:2181,node3:2181";
private int sessionTimeOut = 2000;
private ZooKeeper zooKeeper;
@Before
public void init() throws IOException {
// connectString:连接的主机及端口号
// sessionTimeOut: 超时时间(单位秒)
// Watcher: 节点监听器
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
2.3 创建节点
@Test
public void create() throws InterruptedException, KeeperException, IOException {
// 创建节点的地址
// 节点的数据内容
// 节点等级
// 节点类型
String nodeCreated = zooKeeper.create("/mouse",
"Jack".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
使用命令行测试:
2.4 判断节点是否存在@Test
public void exist() throws IOException, InterruptedException, KeeperException {
if(zooKeeper == null){
System.out.println("zooKeeper 为空,正在新建连接!----------------");
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
Stat stat = zooKeeper.exists("/mouse", false);
System.out.println(stat==null ? "该节点不存在" : "该节点存在");
}
IDEA输出测试:
2.5 监听节点@Test
public void getChildren() throws IOException, InterruptedException, KeeperException {
if(zooKeeper == null){
System.out.println("zooKeeper 为空,正在新建连接!----------------");
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("---------------------");
List childrens = null;
try {
childrens = zooKeeper.getChildren("/", true);
for (String children : childrens) {
System.out.println(children);
}
System.out.println("---------------------");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(Long.MAX_VALUE);
}
测试:
3. Curator 客户端操作 3.1 Curator 简介Curator 是 Netflix 公司开源的一套 Zookeeper 客户端框架,目前是 Apache 的顶级项目。Curator 是对 Zookeeper 支持最好的客户端框架,与 Zookeeper 提供的原生客户端相比, Curator 的抽象层次更高,简化了 Zookeeper 客户端的开发量,包括连接重连、反复注册 wathcer 和 NodeExistsException 异常等。Curator 封装了大部分 Zookeeper 的功能,比如 Leader 选举、分布式锁等。
Curator 包的组成:
- curator-framework:,高级的 API 封装极大地简化了 ZooKeeper 的使用
- curator-client:提供客户端操作
- curator-recipes:封装了一些高级特性(如 Cache 事件监听、Leader 选举、分布式锁、分布式计数、分布式 Barrier 等)
引入 Curator 依赖:
3.2 配置文件org.apache.curator curator-framework 2.12.0 org.apache.curator curator-client 2.12.0 org.apache.curator curator-recipes 2.12.0
application.properties
curator.retryCount=5 curator.elapsedTimeMs=5000 curator.connectString=node1:2181,node2:2181,node3:2181 curator.sessionTimeoutMs=60000 curator.connectionTimeoutMs=5000
WrapperZookeeper.java
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZookeeper {
private int retryCount;
private int elapsedTimeMs;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
}
3.3 注册连接
CuratorConfig.java
@Configuration
public class CuratorConfig {
@Autowired
WrapperZookeeper wrapperZookeeper;
@Bean(initMethod = "start")
public Curatorframework curatorframework() {
return CuratorframeworkFactory.newClient(
wrapperZookeeper.getConnectString(),
wrapperZookeeper.getSessionTimeoutMs(),
wrapperZookeeper.getConnectionTimeoutMs(),
new RetryNTimes(wrapperZookeeper.getRetryCount(), wrapperZookeeper.getElapsedTimeMs())
);
}
}
3.4 客户端操作
BootZkClientApplicationTests
@Slf4j
@SpringBootTest
public class BootZkClientApplicationTests {
@Autowired
Curatorframework curatorframework;
@Test
void createNode() throws Exception {
// 添加持久节点
String path = curatorframework.create().forPath("/animal");
// 添加临时序号节点
String path1 = curatorframework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/animal", "cat".getBytes());
System.out.println(String.format("curator create node :%s successfully", path));
System.in.read();
}
// 获取节点数据
@Test
void testGetData() throws Exception {
byte[] bytes = curatorframework.getData().forPath("/animal");
System.out.println(new String(bytes));
}
// 修改节点数据
@Test
void testSetData() throws Exception {
curatorframework.setData().forPath("/animal", "change!".getBytes());
byte[] bytes = curatorframework.getData().forPath("animal");
System.out.println(new String(bytes));
}
// 父节点下创建子节点
@Test
public void testCreateWithParent() throws Exception {
String pathWithParent = "/animal/mouse";
String path = curatorframework.create().creatingParentsIfNeeded().forPath(pathWithParent);
System.out.println(String.format("curator create node :%s successfully.", path));
}
// 删除节点,如果有子节点也一并删除
@Test
void testDeleteNode() throws Exception {
curatorframework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/animal");
}
// 监听节点 ============================================================
@Test
void addNodeListener() throws Exception {
NodeCache cache = new NodeCache(curatorframework, "/animal");
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
log.info("{} path nodeChanged!", "/animal");
printNodeData();
}
});
cache.start();
System.in.read();
}
// 打印节点数据
void printNodeData() throws Exception {
byte[] bytes = curatorframework.getData().forPath("/animal");
log.info("data: {}", new String(bytes));
}
}
❤️ END ❤️



