在上一篇博客中,博主给大家介绍了Java客户端的Session、ACL以及Znode API:
- ZooKeeper :Java客户端Session、ACL、Znode API介绍
这一篇博客,博主将会介绍Watcher API的使用。
先创建一个maven项目:
pom.xml如下所示:
4.0.0 com.kaven zookeeper 1.0-SNAPSHOT 8 8 org.apache.zookeeper zookeeper 3.6.3
ZooKeeper依赖包的版本最好要和ZooKeeper服务端的版本一致。
WatcherWatcher是ZooKeeper中非常重要的特性, ZooKeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于ZooKeeper实现分布式锁、集群管理等功能。
Watcher特性:比如当节点数据发生变化的时候,ZooKeeper会产生一个Watcher事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。ZooKeeper的Watcher机制,可以分为三个过程:客户端注册Watcher、服务端处理Watcher和客户端回调。
添加Watcher在重要概念 & 客户端命令介绍这篇博客中,博主介绍了ZooKeeper客户端的addWatch命令:
- addWatch:客户端在节点上添加监听,有两种模式PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。
addWatch命令对应Java客户端的addWatch方法:
// 使用给定的模式向给定的ZNode添加Watcher // 此方法只能设置AddWatchMode中的模式 void addWatch(String basePath, Watcher watcher, AddWatchMode mode) // 使用给定的模式向给定的ZNode添加Watcher // 此方法只能设置AddWatchMode中的模式 // 使用了默认的Watcher(创建ZooKeeper实例时传入的Watcher),其他方法类似 void addWatch(String basePath, AddWatchMode mode) // addWatch(String, Watcher, AddWatchMode)异步版本 void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) // addWatch(String, AddWatchMode)异步版本 void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx)
AddWatchMode枚举类:
public enum AddWatchMode {
// 在给定的路径上设置一个在触发时不会被移除的Watcher(即它保持活动状态直到被移除)
// 该Watcher对data和child两类事件进行触发
// 该Watcher的行为就像在给定路径的ZNode上放置一个exists() Watcher和一个getChildren() Watcher一样
// 要移除该Watcher,需要使用removeWatches()和WatcherType.Any
PERSISTENT(ZooDefs.AddWatchModes.persistent),
// 在给定的路径上设置一个Watcher
// a) 触发时不会被移除(即它保持活动状态直到被移除)
// b) 不仅适用于注册路径,而且递归地适用于所有子路径
// 该Watcher对data和child两类事件进行触发
// 该Watcher的行为就像在给定路径的ZNode上放置一个exists() Watcher和一个getChildren() Watcher一样
// 要删除该Watcher,需要使用removeWatches()和WatcherType.Any
// 注意:当有递归监听时,性能会略有下降,因为必须检查ZNode路径的所有子路径以进行事件监听
PERSISTENT_RECURSIVE(ZooDefs.AddWatchModes.persistentRecursive)
;
public int getMode() {
return mode;
}
private final int mode;
AddWatchMode(int mode) {
this.mode = mode;
}
}
Application类(实现了Watcher接口,通过process方法,可以监听WatchedEvent的触发):
package com.kaven.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.apache.zookeeper.server.watch.WatcherMode;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class Application implements Watcher {
private static CountDownLatch latch;
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int TIMEOUT = 40000;
private static long time;
private String watcherName;
protected Application(String watcherName) {
this.watcherName = watcherName;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
Watcher nodeWatcher = new Application("nodeWatcher");
String nodeMessage = "nodeWatcher success";
latch = new CountDownLatch(1);
AtomicBoolean isOk = new AtomicBoolean(false);
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
latch.await();
if(isOk.get()) {
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.setData("/itkaven", "new data".getBytes(), -1);
zk.create("/itkaven/son1", "son1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getData("/itkaven/son1", true, null);
zk.create("/itkaven/son1/grandson1", "grandson1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.setData("/itkaven/son1", "new son1".getBytes(), -1);
zk.setData("/itkaven/son1", "new son2".getBytes(), -1);
zk.setData("/itkaven/son1/grandson1", "new grandson1".getBytes(), -1);
zk.delete("/itkaven/son1/grandson1", -1);
zk.delete("/itkaven/son1", -1);
zk.delete("/itkaven", -1);
}
Thread.sleep(1000000);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------WatchedEvent------------------");
System.out.println(this.watcherName);
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getState().name());
System.out.println(watchedEvent.getPath());
System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
time = System.currentTimeMillis();
System.out.println("-----------------WatchedEvent------------------");
if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
latch.countDown();
}
}
}
输出:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13720 -----------------WatchedEvent------------------ ConNECTED Connection complete! -----------------create------------------ 0 /itkaven /itkaven success 11 -----------------create------------------ -----------------addWatch------------------ 0 /itkaven nodeWatcher success -----------------addWatch------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDataChanged SyncConnected /itkaven time use(ms):31 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeChildrenChanged SyncConnected /itkaven time use(ms):3 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ connectWatcher NodeDataChanged SyncConnected /itkaven/son1 time use(ms):8 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeChildrenChanged SyncConnected /itkaven time use(ms):13 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDeleted SyncConnected /itkaven time use(ms):4 -----------------WatchedEvent------------------
AddWatchMode.PERSISTENT类型的Watcher只会监听注册节点的相关事件(节点数据更改NodeDataChanged、子节点列表更改NodeChildrenChanged以及节点删除NodeDeleted等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。
ZooKeeper API调用成功后可以在ZooKeeper服务端的数据节点上留下Watcher(如果只是通过布尔参数boolean watch来判断是否留下Watcher,而不是传入Watcher实例参数,则是使用默认Watcher实例来进行监听事件的触发回调,上面程序的输出也正好如此)。 其他成功的ZooKeeper API调用可以触发这些Watcher。 一旦Watcher被触发,一个事件将被传递给Watcher的处理方法(以回调的形式,如process方法)。 每个Watcher只能触发一次,如上面的zk.getData("/itkaven/son1", true, null),在/itkaven/son1节点上留下了Watcher,但对/itkaven/son1节点的数据更改了两次,该Watcher只触发了一次,因为只能触发一次,和通过addWatch方法添加到节点上的Watcher不一样,因为这些Watcher触发时不会被移除。
// 方法定义 byte[] getData(String path, boolean watch, Stat stat) byte[] getData(final String path, Watcher watcher, Stat stat)
EventType枚举类(事件类型):
enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4),
DataWatchRemoved(5),
ChildWatchRemoved(6),
PersistentWatchRemoved (7);
}
- None:连接建立事件。
- NodeCreated:节点创建事件。
- NodeDeleted:节点删除事件。
- NodeDataChanged:节点数据更改事件。
- NodeChildrenChanged:子节点列表更改事件。
- DataWatchRemoved:节点的数据监听被移除事件。
- ChildWatchRemoved:节点的子节点列表监听被移除事件。
- PersistentWatchRemoved:节点的持久监听被移除事件。
如果将AddWatchMode.PERSISTENT换成AddWatchMode.PERSISTENT_RECURSIVE:
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
输出也会改变:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13763 -----------------WatchedEvent------------------ ConNECTED Connection complete! -----------------create------------------ 0 /itkaven /itkaven success 11 -----------------create------------------ -----------------addWatch------------------ 0 /itkaven nodeWatcher success -----------------addWatch------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDataChanged SyncConnected /itkaven time use(ms):23 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeCreated SyncConnected /itkaven/son1 time use(ms):3 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeCreated SyncConnected /itkaven/son1/grandson1 time use(ms):4 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ connectWatcher NodeDataChanged SyncConnected /itkaven/son1 time use(ms):2 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDataChanged SyncConnected /itkaven/son1 time use(ms):0 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDataChanged SyncConnected /itkaven/son1 time use(ms):2 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDataChanged SyncConnected /itkaven/son1/grandson1 time use(ms):3 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDeleted SyncConnected /itkaven/son1/grandson1 time use(ms):3 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDeleted SyncConnected /itkaven/son1 time use(ms):2 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher NodeDeleted SyncConnected /itkaven time use(ms):4 -----------------WatchedEvent------------------
AddWatchMode.PERSISTENT_RECURSIVE类型的Watcher不仅监听注册节点的相关事件(节点数据更改NodeDataChanged和节点删除NodeDeleted等,而子节点列表更改NodeChildrenChanged其实就是子节点的节点创建NodeCreated和节点删除NodeDeleted,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。
注册默认Watcher调用ZooKeeper的某些API时,如果只是通过布尔参数boolean watch来判断是否给指定节点留下Watcher,而不是传入Watcher实例参数,则是使用默认Watcher实例来进行监听事件的触发回调(在创建ZooKeeper实例时被传入),register方法(synchronized修饰)用于注册默认Watcher,会覆盖构建期间指定的Watcher。
// 指定连接的默认Watcher(覆盖构建期间指定的Watcher)
public synchronized void register(Watcher watcher) {
watchManager.defaultWatcher = watcher;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.create("/itkaven/son1", "son1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.register(new Application("new watcher1"));
zk.register(new Application("new watcher2"));
zk.getData("/itkaven/son1", true, null);
zk.register(new Application("new watcher3"));
zk.setData("/itkaven/son1", "new son1".getBytes(), -1);
Thread.sleep(1000000);
}
输出:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13699 -----------------WatchedEvent------------------ ConNECTED Connection complete! -----------------create------------------ 0 /itkaven /itkaven success 11 -----------------create------------------ -----------------WatchedEvent------------------ new watcher2 NodeDataChanged SyncConnected /itkaven/son1 time use(ms):25 -----------------WatchedEvent------------------
由输出结果可知,register方法可以被多次调用(重复覆盖),并且ZooKeeper API会选择方法调用之前最新的默认Watcher,如getData方法(其他方法类似):
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
volatile修饰符保证了defaultWatcher属性的可见性,因此,只需要保证register方法的原子性即可,而register方法被synchronized修饰(具有原子性)。
protected volatile Watcher defaultWatcher;删除Watcher
删除Watcher有以下方法(同步版本与异步版本):
// 对于给定的ZNode路径(path参数),删除指定类型(watcherType参数)的指定Watcher(watcher参数,因此watcher参数不能为空) void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local) // 异步版本 void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local, VoidCallback cb,Object ctx) // 对于给定的ZNode路径(path参数),删除指定类型(watcherType参数)的所有Watcher(没有Watcher的限制) void removeAllWatches(String path, WatcherType watcherType, boolean local) // 异步版本 void removeAllWatches(String path, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx)
- path:节点的路径。
- watcher:一个具体的Watcher。
- watcherType:要移除的Watcher类型。
- local:没有服务端连接时,是否可以在本地移除Watcher。
- VoidCallback:异步回调实例。
WatcherType枚举类:
enum WatcherType {
Children(1),
Data(2),
Any(3);
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
Watcher nodeWatcher = new Application("nodeWatcher");
String nodeMessage = "nodeWatcher success";
latch = new CountDownLatch(1);
AtomicBoolean isOk = new AtomicBoolean(false);
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
latch.await();
if(isOk.get()) {
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.getData("/itkaven", true, null);
zk.removeWatches("/itkaven", connectWatcher, WatcherType.Data, false);
zk.getChildren("/itkaven", true);
zk.removeAllWatches("/itkaven", WatcherType.Children, false);
zk.getChildren("/itkaven", nodeWatcher);
zk.exists("/itkaven", nodeWatcher);
zk.removeAllWatches("/itkaven", WatcherType.Any, false);
}
Thread.sleep(1000000);
}
输出:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13732 -----------------WatchedEvent------------------ ConNECTED Connection complete! -----------------create------------------ 0 /itkaven /itkaven success 11 -----------------create------------------ -----------------addWatch------------------ 0 /itkaven nodeWatcher success -----------------addWatch------------------ -----------------WatchedEvent------------------ connectWatcher DataWatchRemoved SyncConnected /itkaven time use(ms):23 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ connectWatcher ChildWatchRemoved SyncConnected /itkaven time use(ms):2 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher PersistentWatchRemoved SyncConnected /itkaven time use(ms):3 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher DataWatchRemoved SyncConnected /itkaven time use(ms):1 -----------------WatchedEvent------------------ -----------------WatchedEvent------------------ nodeWatcher ChildWatchRemoved SyncConnected /itkaven time use(ms):0 -----------------WatchedEvent------------------
getData、getChildren以及exists三个方法都可以在节点上留下一次性Watcher,而这些Watcher的类型分别是Data、Children和Data,而通过addWatch方法可以在节点上添加持久Watcher(PERSISTENT和PERSISTENT_RECURSIVE),并且这些Watcher的类型是Any。由输出结果可知,删除类型为Any的Watcher,也可以一起删除类型为Children和Data的Watcher,而删除类型为Children或Data的Watcher,只能删除对应类型的Watcher。
在Java客户端Session、ACL、Znode API介绍这篇博客中介绍了这些方法的特性:
- getData:方法返回给定路径的节点的数据和状态信息(类似ZooKeeper客户端的get -s命令)。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。watch将由在节点上设置数据或删除节点的成功操作触发,该方法也存在异步版本。
- exists:方法返回给定路径的节点的状态信息(类似ZooKeeper客户端的stat命令)。如果不存在这样的节点,则返回null。如果watch为 true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 watch将由创建、删除节点或在节点上设置数据的成功操作触发,该方法也存在异步版本。
- getChildren:返回给定路径的节点的子节点列表。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发watch。返回的子节点列表未排序,该方法存在异步版本。
回顾前面博主对AddWatchMode枚举类的介绍:
不过博主觉得Java客户端的Watcher API部分,设计的有点混乱。Java客户端Watcher API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



