在之前的博客中,博主介绍了Curator框架的重试策略和Session API,并且对namespace进行了原理分析:
- ZooKeeper : Curator框架重试策略和Session API介绍
- ZooKeeper : Curator框架namespace原理分析
博主使用的Curator框架版本是5.2.0,ZooKeeper版本是3.6.3(5.2.0版本的Curator使用3.6.3版本的ZooKeeper,因此它们是兼容的)。
Znodeorg.apache.curator curator-recipes 5.2.0
在ZooKeeper中,Znode是存储数据、事件监听以及权限控制的主要对象,ZooKeeper的数据模型是一棵树,由斜杠/分割,类似Linux的文件系统,如下图所示(图来自ZooKeeper官网):
Znode有如下类型:
- 持久、临时:持久是默认的Znode类型,临时Znode相较于持久Znode来说就是它会随着客户端会话结束而被删除,通常可以用在一些特定的场景,如分布式锁释放、健康检查等。
- 持久顺序、临时顺序:在上面两种Znode类型的基础上,ZooKeeper会自动在这两种Znode类型的节点名后加一个数字后缀(保证唯一),这数字后缀的应用场景可以实现诸如分布式队列,分布式公平锁等。
- 容器:容器Znode类型是 3.5 以后新增的节点类型,容器Znode和持久Znode类似,但是区别是服务端启动后,会有一个单独的线程去扫描所有的容器Znode,当发现容器Znode的子节点数量为0时,会自动删除该容器Znode(删除时机,留到以后分析ZooKeeper源码时再进行介绍)。
- TTL、顺序TTL:这两种Znode类型重点是TTL(time to live,存活时间,单位为秒) ,当该节点没有子节点并且超过了指定的TTL时间后就会被自动删除,和容器Znode类似,只是容器Znode没有超时时间,使用TTL Znode需要配置extendedTypesEnabled=true,不然创建TTL Znode时会收到Unimplemented的报错。
每个Znode除了可以存储数据外,其本身还存储了数据节点相关的一些状态信息。Znode的状态信息如下表所示:
| 状态信息 | 描述 |
|---|---|
| cZxid | 该节点被创建时的事务id,ZooKeeper中的每个改变都会产生一个全局唯一的zxid,通过它可确定更新操作的先后顺序 |
| ctime | 该节点被创建的时间 |
| mZxid | 该节点最后一次更新的事务id |
| mtime | 该节点最后一次更新的时间 |
| pZxid | 该节点的子节点列表最后一次修改的事务id,只有子节点列表变更才会更新pZxid,子节点内容变更不会更新 |
| cversion | 子节点版本号,该节点的子节点变化时值就会增加1 |
| dataVersion | 节点数据版本号,节点创建时为0,每更新一次节点数据(不管内容有无变化)该版本号的值增加1 |
| aclVersion | 节点的ACL版本号 |
| ephemeralOwner | 创建该临时Znode的session id,如果是持久Znode,则ephemeralOwner为0 |
| dataLength | 节点数据的长度 |
| numChildren | 当前节点的子节点个数 |
在ZooKeeper提供的客户端中,使用stat命令可以查看Znode的状态信息(如下图所示,根Znode的状态信息):
- ZooKeeper :重要概念 & 客户端命令介绍
创建Znode需要使用create方法,但Curator框架提供的create方法,比Java原生客户端提供的create方法更方便、强大和易用,可以通过Builder的方式来组装需要创建的Znode,并且当需要创建的节点的父节点不存在时,可以先创建其父节点(不是默认行为,需要通过方法指定)。
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
public class Application{
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int CONNECTION_TIMEOUT_MS = 40000;
private static final int SESSION_TIMEOUT_MS = 10000;
private static final String NAMESPACE = "namespace";
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Curatorframework curator = CuratorframeworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorframeworkState.STARTED);
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
})
.forPath("/father/son/grandson", "data".getBytes());
Thread.sleep(10000000);
}
}
192.168.1.184:9000是ZooKeeper服务端的套接字(用于客户端的连接,博主修改了客户端连接的默认端口),create方法会返回CreateBuilder接口实现类的一个实例,用于通过Builder的方式来组装需要创建的Znode,如下图所示(很显然其他操作也是如此):
调用creatingParentsIfNeeded方法后,如果需要创建的节点的父节点不存在时(默认会报错),会以持久节点类型创建其不存在的父节点,而调用creatingParentContainersIfNeeded方法后,会以容器节点类型创建其不存在的父节点。调用withMode方法指定需要创建的节点的类型,所有的节点类型在CreateMode枚举类中定义:
package org.apache.zookeeper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Public
public enum CreateMode {
PERSISTENT(0, false, false, false, false),
PERSISTENT_SEQUENTIAL(2, false, true, false, false),
EPHEMERAL(1, true, false, false, false),
EPHEMERAL_SEQUENTIAL(3, true, true, false, false),
CONTAINER(4, false, false, true, false),
PERSISTENT_WITH_TTL(5, false, false, false, true),
PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);
private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);
private boolean ephemeral;
private boolean sequential;
private final boolean isContainer;
private int flag;
private boolean isTTL;
CreateMode(int flag, boolean ephemeral, boolean sequential, boolean isContainer, boolean isTTL) {
this.flag = flag;
this.ephemeral = ephemeral;
this.sequential = sequential;
this.isContainer = isContainer;
this.isTTL = isTTL;
}
...
}
调用inBackground方法会指定该操作以异步的方式进行,当异步操作完成后需要调用回调方法,因此需要将回调实例传入inBackground方法中,这里使用lambda表达式表示该回调实例,它其实应该是BackgroundCallback接口实现类的实例,因为该接口只有一个未实现的方法,因此可以通过lambda表达式表示,类似函数式接口。
public interface BackgroundCallback
{
public void processResult(Curatorframework client, CuratorEvent event) throws Exception;
}
而调用forPath方法,就是指定该操作的节点路径,并且还可以指定该节点存储的数据(也可以不指定)。所有节点都创建成功。
创建TTL节点,需要添加配置(extendedTypesEnabled=true):
添加配置后再重启ZooKeeper服务即可。
/usr/local/apache-zookeeper-3.6.3-bin/bin/zkServer.sh restart
CountDownLatch latch = new CountDownLatch(1);
Stat stat = new Stat();
curator.create()
.orSetData()
.withTtl(10000)
.storingStatIn(stat)
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
latch.countDown();
})
.forPath("/father/son/grandson", "data".getBytes());
latch.await();
System.out.println(stat);
- orSetData:如果节点存在,就设置节点数据。
- withTtl:指定TTL节点的TTL时间,必须是创建TTL节点。
- storingStatIn:将节点的状态信息存储于传入的Stat实例中(以填充的形式)。
- CountDownLatch :线程协调工具,因为上面创建节点的操作是异步的,因此需要等回调方法被调用后(创建完成,不管创建失败还是成功),才能输出stat实例,不然stat实例还未被填充(Java并发编程一CountDownLatch、CyclicBarrier、Semaphore初使用)。
而这个TTL节点只能存活10秒钟,因为它没有子节点。
删除节点需要调用delete方法,如果需要删除的节点存在子节点,可以先删除其存在的子节点(不是默认行为,需要通过方法指定)。
CountDownLatch latch = new CountDownLatch(1);
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
latch.countDown();
})
.forPath("/father/son/grandson", "data".getBytes());
latch.await();
Stat stat = curator.setData()
.forPath("/father", "father".getBytes());
System.out.println(stat.getVersion());
byte[] data = curator.getData()
.storingStatIn(stat)
.forPath("/father");
System.out.println(new String(data));
System.out.println(stat.getVersion());
curator.delete()
.guaranteed()
.deletingChildrenIfNeeded()
.withVersion(0)
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
})
.forPath("/father");
- setData:设置节点存储的数据。
- getData:获取节点存储的数据。
- guaranteed:解决了在服务端操作可能成功,但在响应返回给客户端之前发生连接失败的边缘情况。
- deletingChildrenIfNeeded:如果需要删除的节点存在子节点,可以先删除其存在的子节点。
- withVersion:指定删除节点的版本号,如果与ZooKeeper服务端该节点的版本号不匹配,则删除操作会失败。
存在的子节点也不会被删除。
正确的版本号,删除操作会成功。
.withVersion(stat.getVersion())获取子节点列表
获取子节点列表需要使用getChildren方法。
curator.getChildren()
.forPath("/")
.forEach(System.out::println);
curator.getChildren()
.forPath("/father")
.forEach(System.out::println);
输出如下图所示:
Curator框架基本上使用了ZooKeeper提供的ACL定义,只是改变了使用方式,ZooKeeper的Java客户端对ACL的定义体现在Perms接口(授权权限)和Id类(授权策略和授权对象)中:
@InterfaceAudience.Public
public interface Perms {
int READ = 1 << 0;
int WRITE = 1 << 1;
int CREATE = 1 << 2;
int DELETE = 1 << 3;
int ADMIN = 1 << 4;
int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
}
在Perms接口中授权权限被定义成一个整数,每个授权权限的二进制表示只有一个位置为1,并且这个位置各不相同,因此,可以通过按位或的方式实现授权权限的叠加。
| 权限值(只看后5位) | 权限 | 描述 |
|---|---|---|
| 00001 | READ | 可以读取节点数据及显示子节点列表 |
| 00010 | WRITE | 可以设置节点数据 |
| 00100 | CREATE | 可以创建子节点 |
| 01000 | DELETE | 可以删除子节点(仅直接子节点) |
| 10000 | ADMIN | 可以设置节点访问控制列表权限 |
@Public
public class Id implements Record {
private String scheme;
private String id;
...
}
授权策略(Scheme):
- world:开放模式,world表示任意客户端都可以访问(默认设置)。
- ip:限定客户端IP防问。
- auth:只有在会话中通过了认证才可以访问(通过addauth命令)。
- digest:与auth类似,区别在于auth用明文密码,而digest用SHA1+base64加密后的密码(通过addauth命令,实际场景中digest更常见)。
授权对象(ID)就是指定的授权策略(Scheme)的内容,比如world:anyone中的anyone、ip:192.168.1.189中的192.168.1.189、auth:username:password中的username:password(明文密码)、digest:username:password_digest中的username:password_digest(用SHA1+base64加密后的密码)。
ZooKeeper的Java客户端中内置了一些ACL定义(Ids接口中):
@InterfaceAudience.Public
public interface Ids {
Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
Id AUTH_IDS = new Id("auth", "");
@SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
ArrayList OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
@SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
ArrayList CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
@SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
ArrayList READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
}
授权IP地址为192.168.1.100的客户端READ和WRITE权限可以定义如下:
new ACL(Perms.READ | Perms.WRITE, new Id("ip", "192.168.1.100"));
其他ACL定义以此类推。
客户端添加验证客户端添加验证其实很简单,在创建Curatorframework实例时,通过authorization方法指定:
Curatorframework curator = CuratorframeworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.authorization("digest", "kaven:itkaven".getBytes())
.build();
设置ACL
通过withACL方法,可以指定节点的ACL。
ACL acl1 = new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE,
new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL ^ ZooDefs.Perms.DELETE,
new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")));
CountDownLatch latch = new CountDownLatch(1);
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(Collections.singletonList(acl2), true)
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
latch.countDown();
})
.forPath("/father/son/grandson", "data".getBytes());
latch.await();
curator.delete()
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
})
.forPath("/father/son/grandson");
这两个ACL实例是一样的功能(使用二进制的按位或和异或的操作,得到的结果是一样的)。
ACL acl1 = new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE,
new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL ^ ZooDefs.Perms.DELETE,
new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")));
withACL方法的第二个参数如果为true(可以不指定,默认为false,博主只是为了演示,一般情况下不使用),则指定的ACL也应用于创建的父节点(即不存在的父节点),因此删除/father/son/grandson节点返回NOAUTH(没有权限),因为/father/son节点也具有指定的ACL,因此不能删除它的子节点(没有DELETE权限)。
而创建子节点的权限是有的(同理,其他具有权限的操作也可以顺利完成)。
curator.create()
.inBackground((client, event) -> {
int resultCode = event.getResultCode();
System.out.println(KeeperException.Code.get(resultCode).name());
})
.forPath("/father/son/grandson_");
还可以通过setACL方法(也可以指定节点的版本号,这里就不演示了)来设置节点的ACL,而getACL方法用来获取节点的ACL。
curator.setACL()
.withACL(Collections.singletonList(acl1))
.forPath("/father/son/grandson");
System.out.println(curator.getACL()
.forPath("/father/son/grandson"));
输出如下图所示:
23表示10111(二进制表示的权限)。
Curator框架Znode和ACL API就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



