在之前的博客中,博主已经介绍了Curator框架的Session、Znode以及ACL API:
- ZooKeeper : Curator框架重试策略和Session API介绍
- ZooKeeper : Curator框架Znode、ACL API介绍
本篇博客将介绍Curator框架的Watcher API,它简化了Java客户端原生Watcher API的使用,了解后者可以更好地理解前者的实现:
- ZooKeeper :Java客户端Watcher API介绍
博主使用的Curator框架版本是5.2.0,ZooKeeper版本是3.6.3。
org.apache.curator curator-recipes 5.2.0
5.2.0版本的Curator使用3.6.3版本的ZooKeeper,因此它们是兼容的。
Watcher是ZooKeeper中非常重要的特性, ZooKeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于ZooKeeper实现分布式锁、集群管理等功能。
Watcher特性:比如当节点数据发生变化的时候,ZooKeeper会产生一个Watcher事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。ZooKeeper的Watcher机制,可以分为三个过程:客户端注册Watcher、服务端处理Watcher和客户端回调。
一次性Watcher在Java客户端原生API中,调用getData、getChildren以及exists这三个方法都可以在节点上留下一次性Watcher,而这些Watcher的类型分别是Data、Children和Data,这些一次性Watcher类型在WatcherType枚举类中定义。
@Public
public static enum WatcherType {
Children(1),
Data(2),
Any(3);
private final int intValue;
private WatcherType(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return this.intValue;
}
public static Watcher.WatcherType fromInt(int intValue) {
switch(intValue) {
case 1:
return Children;
case 2:
return Data;
case 3:
return Any;
default:
throw new RuntimeException("Invalid integer value for conversion to WatcherType");
}
}
}
Java客户端原生API中getData、getChildren以及exists三个方法的一次性Watcher触发条件:
- getData:一次性Watcher将由在节点上设置数据或删除节点的成功操作触发。
- exists:一次性Watcher将由创建、删除节点或在节点上设置数据的成功操作触发。
- getChildren:删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发一次性Watcher。
测试类Application(实现了CuratorWatcher接口,因此Application的实例有Watcher的功能 ):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
public class Application implements CuratorWatcher {
private static final String SERVER_PROXY = "192.168.31.172:9000";
private static final int CONNECTION_TIMEOUT_MS = 40000;
private static final int SESSION_TIMEOUT_MS = 10000;
private static final String NAMESPACE = "MyNamespace";
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Curatorframework curator = CuratorframeworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorframeworkState.STARTED);
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
Thread.sleep(10000000);
}
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}
输出:
/father/son/grandson1 NodeCreated
Curator框架的checkExists方法对应于Java客户端的exists方法。对节点进行了两次操作(创建和修改节点数据),但Watcher只触发了一次。如果想要多次监听节点事件就需要多次添加这种一次性Watcher:
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1 NodeCreated /father/son/grandson1 NodeDataChanged /father/son/grandson1 NodeDeleted
很显然符合预期,想要多次监听事件,就必须多次添加一次性Watcher,而上面的程序由于没有再次添加一次性Watcher,节点的NodeDataChanged事件就不能监听,因为一次性Watcher在节点的NodeCreated事件触发时就被Zookeeper服务端移除了。Curator框架的getData方法对应于Java客户端的getData方法,它们都只能在节点存在的情况下给节点留下一次性Watcher,因此它们留下的一次性Watcher肯定不能监听该节点的创建事件,其他事件的监听与checkExists方法的一次性Watcher类似,这里不再赘述。
Application watcher = new Application();
curator.getData()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
Curator框架的getChildren方法对应于Java客户端的getChildren方法,给指定节点留下一次性Watcher的方式也是类似的,都是通过usingWatcher方法,删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发这个一次性Watcher(临时节点不能创建子节点)。
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1/test");
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1 NodeChildrenChanged /father/son/grandson1 NodeChildrenChanged /father/son/grandson1 NodeDeleted
在Curator框架中可以使用两种Watcher,Java客户端提供的Watcher接口和Curator框架提供的CuratorWatcher接口。而它们的方法定义是类似的:
package org.apache.curator.framework.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
public interface CuratorWatcher
{
public void process(WatchedEvent event) throws Exception;
}
package org.apache.zookeeper;
import org.apache.yetus.audience.InterfaceAudience.Public;
@Public
public interface Watcher {
void process(WatchedEvent var1);
...
}
在Curator框架中这两种Watcher的实现都可以使用。
T usingWatcher(Watcher watcher);
T usingWatcher(CuratorWatcher watcher);
当需要一直监听指定节点的事件时,一次性Watcher就不太方便了,这时就需要持久Watcher。
持久Watcher在Java客户端原生API中,调用addWatch方法可以在节点上添加持久Watcher(PERSISTENT和PERSISTENT_RECURSIVE),并且这些Watcher的类型是Any。在Curator框架中,可以如下所示给节点添加持久Watcher(通过.watchers().add()链式调用):
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.setData()
.forPath("/father/son/grandson1/test", "new test".getBytes());
输出:
/father/son/grandson1 NodeChildrenChanged
AddWatchMode.PERSISTENT类型的Watcher只会监听注册节点的相关事件(节点数据更改NodeDataChanged、子节点列表更改NodeChildrenChanged以及节点删除NodeDeleted等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。如果将AddWatchMode.PERSISTENT换成AddWatchMode.PERSISTENT_RECURSIVE:
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT_RECURSIVE)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1/test NodeCreated /father/son/grandson1/test NodeDataChanged
AddWatchMode.PERSISTENT_RECURSIVE类型的Watcher不仅监听注册节点的相关事件(节点数据更改NodeDataChanged和节点删除NodeDeleted等,而子节点列表更改NodeChildrenChanged其实就是子节点的节点创建NodeCreated和节点删除NodeDeleted,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。
AddWatchMode枚举类:
删除类型为Any的Watcher,也会一起删除类型为Children和Data的Watcher,而删除类型为Children或Data的Watcher,只能删除对应类型的Watcher。
删除指定节点类型为Data的所有Watcher:
Application watcher1 = new Application();
Application watcher2 = new Application();
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
curator.getChildren()
.usingWatcher(watcher1)
.forPath("/father/son/grandson1");
curator.getData()
.usingWatcher(watcher2)
.forPath("/father/son/grandson1");
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Data)
.forPath("/father/son/grandson1");
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
System.out.println(this);
}
输出:
/father/son/grandson1 DataWatchRemoved com.kaven.zookeeper.Application@5af5ec05
删除指定节点类型为Any的所有Watcher:
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1 ChildWatchRemoved com.kaven.zookeeper.Application@61613ed9 /father/son/grandson1 DataWatchRemoved com.kaven.zookeeper.Application@5af5ec05
删除指定节点类型为Any的指定Watcher:
curator.watchers()
.remove(watcher1)
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1 ChildWatchRemoved com.kaven.zookeeper.Application@5ebb066a
这些输出都符合预期。Curator框架Watcher API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



