1.前面一篇文章讲到了在Linux环境上搭建集群 建立4个节点,zookeeper有一个十分重要的功能是注册监听器,通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。好比如当一台服务器启动的时候,我们在zookeeper上创建一个临时节点。通过监听这些临时节点,我们就可以知道目前有多少台服务器在线。当服务器关掉时zookeeper也会主动通知我们,这样我们就相当于实时了解当前服务器在线情况,方便协调服务器访问,监听节点的数据变化事件包括:
1.1、节点被创建;
1.2、节点上写入数据;
1.3、节点数据变化;
1.4、节点数据被删除;
1.5、节点本身被删除
2.下面看代码示例,首先添加maven依赖包,并启动好zookeeper:
zkclient
3.连接zk的ClusterCoordinate.java方法如下:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.*;
public class ClusterCoordinate
{
protected String ClusterName = null;
protected String MyNodeName = null;
protected String ConnectionString = null;
protected int ConnectTimeout = 5000;
protected ClusterCoordinateListener clusterCoordinateListener = null;
private ZooKeeper zk;
public ClusterCoordinate(String ClusterName, String MyNodeName, String ConnectionString, int ConnectTimeout, ClusterCoordinateListener clusterCoordinateListener)
{
this.ClusterName = ClusterName;
this.MyNodeName = MyNodeName;
this.ConnectionString = ConnectionString;
this.ConnectTimeout = ConnectTimeout;
this.clusterCoordinateListener = clusterCoordinateListener;
}
public int Connect()
{
try
{
//连接
zk = new ZooKeeper(ConnectionString, ConnectTimeout, new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
// zk 路径
String path = watchedEvent.getPath();
try
{
List
clusterCoordinateListener.onClusterChange(children);
System.out.println("当前节点发生改变======================"+children+"=======变化节点================="+path);
}
catch (KeeperException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
});
try
{
//判断父节点是否存在
Stat ExistsParents = this.zk.exists("/" + ClusterName, true);
if(null==ExistsParents)
{
this.zk.create("/"+ClusterName, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//判断子节点是否存在
Stat ExistsChild = this.zk.exists("/" + ClusterName + "/" + MyNodeName, true);
if(null==ExistsChild)
{
this.zk.create("/"+ClusterName+"/"+MyNodeName, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
catch (KeeperException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
catch (IOException e)
{
e.printStackTrace();
}
return 0;
}
}
以上代码是基于ZooKeeper的事件监听----Watch机制 Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件。比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以实现:基于 zookeeper 实现分布式锁、集群管理等功能
Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息(watcher 是一次性的操作),可以通过循环监听去达到永久监听效果
以上代码回调中zk.getChildren("/"+ClusterName, true);方法起到持续监听节点的作用,如果此处不查询只能监听一次
4.最后写一个main方法进行测试:
public static void main(String[] args) throws Exception
{
ClusterCoordinate c = new ClusterCoordinate("Test", "node1", "192.168.71.39:2881,192.168.71.39:2882,192.168.71.39:2883,192.168.71.39:2884", 3000, new ClusterCoordinateListener()
{
@Override
public void onClusterChange(List
{
//TODO 此处是监听后的节点 具体的处理业务可以在这里写 故以上写了个实现ClusterCoordinateListener 接口
System.out.println(AliveNode);
}
});
c. Connect();
Thread.sleep(20000000);
}
5.ClusterCoordinateListener.java接口
import java.util.List;
public interface ClusterCoordinateListener
{
public int onClusterChange(List
}
6.最后看一下测试结果 当全部把服务器集群全部停掉后 立马就会返回DisConnected状态,然后会持续的进行重试连接 当服务器重启之后 就会连接恢复数据 当干掉其中一台时 首先会抛出异常无法连接当前父节点,随后会进行重新选举 然后监听其它的节点:



