栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

zookeeper集群节点上注册监听器watch详解(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

zookeeper集群节点上注册监听器watch详解(二)

1.前面一篇文章讲到了在Linux环境上搭建集群  建立4个节点,zookeeper有一个十分重要的功能是注册监听器,通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。好比如当一台服务器启动的时候,我们在zookeeper上创建一个临时节点。通过监听这些临时节点,我们就可以知道目前有多少台服务器在线。当服务器关掉时zookeeper也会主动通知我们,这样我们就相当于实时了解当前服务器在线情况,方便协调服务器访问,监听节点的数据变化事件包括:

1.1、节点被创建;

1.2、节点上写入数据;

1.3、节点数据变化;

1.4、节点数据被删除;

1.5、节点本身被删除

2.下面看代码示例,首先添加maven依赖包,并启动好zookeeper:


    com.101tec
    zkclient
    0.10

3.连接zk的ClusterCoordinate.java方法如下:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.*;

public class ClusterCoordinate
{
    
    protected String ClusterName = null;
    
    protected String MyNodeName = null;
    
    protected String ConnectionString = null;
    
    protected int ConnectTimeout = 5000;
    
    protected ClusterCoordinateListener clusterCoordinateListener = null;

    private ZooKeeper zk;


    public ClusterCoordinate(String ClusterName, String MyNodeName, String ConnectionString, int ConnectTimeout, ClusterCoordinateListener clusterCoordinateListener)
    {
        this.ClusterName = ClusterName;
        this.MyNodeName = MyNodeName;
        this.ConnectionString = ConnectionString;
        this.ConnectTimeout = ConnectTimeout;
        this.clusterCoordinateListener = clusterCoordinateListener;
    }

    
    public int Connect()
    {
        try
        {

            //连接
            zk = new ZooKeeper(ConnectionString, ConnectTimeout, new Watcher()
            {
                @Override
                public void process(WatchedEvent watchedEvent)
                {

                    // zk 路径
                    String path = watchedEvent.getPath();
                    try
                    {
                        List children = zk.getChildren("/"+ClusterName, true);
                        clusterCoordinateListener.onClusterChange(children);
                        System.out.println("当前节点发生改变======================"+children+"=======变化节点================="+path);

                    }
                    catch (KeeperException e)
                    {
                        e.printStackTrace();
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            try
            {
                //判断父节点是否存在
                Stat ExistsParents = this.zk.exists("/" + ClusterName, true);
                if(null==ExistsParents)
                {
                    this.zk.create("/"+ClusterName, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                //判断子节点是否存在
                Stat ExistsChild = this.zk.exists("/" + ClusterName + "/" + MyNodeName, true);
                if(null==ExistsChild)
                {
                    this.zk.create("/"+ClusterName+"/"+MyNodeName, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                }
            }
            catch (KeeperException e)
            {
                e.printStackTrace();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }

        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        return 0;
    }

}

以上代码是基于ZooKeeper的事件监听----Watch机制  Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件。比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以实现:基于 zookeeper 实现分布式锁、集群管理等功能

Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息(watcher 是一次性的操作),可以通过循环监听去达到永久监听效果

以上代码回调中zk.getChildren("/"+ClusterName, true);方法起到持续监听节点的作用,如果此处不查询只能监听一次   

4.最后写一个main方法进行测试:

public static void main(String[] args) throws Exception
{
ClusterCoordinate c = new ClusterCoordinate("Test", "node1", "192.168.71.39:2881,192.168.71.39:2882,192.168.71.39:2883,192.168.71.39:2884", 3000, new ClusterCoordinateListener()
{

@Override
public void onClusterChange(List AliveNode)
{

//TODO  此处是监听后的节点   具体的处理业务可以在这里写    故以上写了个实现ClusterCoordinateListener 接口
System.out.println(AliveNode);
}
});

c. Connect();

Thread.sleep(20000000);
}

5.ClusterCoordinateListener.java接口

import java.util.List;


public interface ClusterCoordinateListener
{
    public int onClusterChange(List AliveNode);
}

6.最后看一下测试结果 当全部把服务器集群全部停掉后   立马就会返回DisConnected状态,然后会持续的进行重试连接  当服务器重启之后    就会连接恢复数据   当干掉其中一台时  首先会抛出异常无法连接当前父节点,随后会进行重新选举  然后监听其它的节点: 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/530801.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号