栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

六、使用javaApi操作zookeeper

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

六、使用javaApi操作zookeeper

文章目录
  • 先行步骤
  • 创建测试用例
    • 创建全局变量
    • 编写初始化步骤
    • 获取节点api
    • 判断节点是否存在api
    • 完整代码
  • 进阶操作
    • 监控节点
      • 先行步骤
      • 客户端代码
      • 服务端代码

先行步骤

springboot项目中引入如下依赖

		
			org.apache.zookeeper
			zookeeper
			3.6.2
		
		
			junit
			junit
			test
		
创建测试用例 创建全局变量
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDemoApplicationTest.class);

    private static String connectString =
            "192.168.1.148:2181,192.168.1.60:2181,192.168.1.125:2181";

    private static int sessionTimeout = 500000000;
    private ZooKeeper zkClient = null;
编写初始化步骤
@Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                    // 收到事件通知后的回调函数(用户的业务逻辑)
                logger.info(watchedEvent.getType() + "--"
                        + watchedEvent.getPath());

                try {
                    List children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        logger.info("***************************"+child);
                    }
                } catch (Exception e) {
                   logger.error("zk监听回调失败"+e);
                }
            }
        });
    }
获取节点api
@Test
    public void getChildren(){
        try {
            List children = zkClient.getChildren("/", true);
            for (String child : children) {
                logger.info(child);
            }
            //延时 保证这段代码不会被销毁 当有回调事件时就会唤醒这段代码
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            logger.error("zk监听回调失败"+e);
        }
    }
判断节点是否存在api
 @Test
    public void  exist() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/", false);
        logger.info("当前节点是否存在 "+exists==null?"不存在":"存在");
    }
完整代码
package com.example.zookeeperDemo;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


public class ZookeeperDemoApplicationTest {


    private static final Logger logger = LoggerFactory.getLogger(ZookeeperDemoApplicationTest.class);

    private static String connectString =
            "192.168.1.148:2181";

    private static int sessionTimeout = 500000000;
    private ZooKeeper zkClient = null;

    @Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                    // 收到事件通知后的回调函数(用户的业务逻辑)
                logger.info(watchedEvent.getType() + "--"
                        + watchedEvent.getPath());

                try {
                    List children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        logger.info("***************************"+child);
                    }
                } catch (Exception e) {
                   logger.error("zk监听回调失败"+e);
                }
            }
        });
    }

    @Test
    public void getChildren(){
        try {
            List children = zkClient.getChildren("/", true);
            for (String child : children) {
                logger.info(child);
            }
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            logger.error("zk监听回调失败"+e);
        }
    }

    @Test
    public void  exist() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/", false);
        logger.info("当前节点是否存在 "+exists==null?"不存在":"存在");
    }
}

进阶操作

如下图所示,zookeeper常用于服务器上下线监控,当有服务器上线时,服务器就会向zookeeper注册临时节点,当服务器下线时这个节点就会消失。
对此,客户端只需对这个节点进行监控即可根据服务器动态上下线的各种情况做出相应调整。

监控节点 先行步骤

创建servers,各个服务器上线时都得在这个节点下创建节点。

 create /servers
客户端代码
package com.example.zookeeperDemo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class ZkClient {
    private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);

    private static String connectString =
            "ip:2181";
    private static int sessionTimeout = 100000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";


    public void initZK() throws Exception {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    listenNode();
                } catch (Exception e) {
                    logger.error("监听节点变化出错 "+e);
                }
            }
        });
    }


    public void listenNode() throws Exception {
        List children = zk.getChildren(parentNode, true);

        List nodes=new ArrayList<>();

        for (String child : children) {
            byte[] data = zk.getData(parentNode + "/" + child, false, null);
            nodes.add(new String(data));
        }

        logger.info("***********当前节点内容 {}***************",nodes);
    }


    public void work() throws Exception {
        logger.info("****************************client is working ******************************* ");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        ZkClient zkClient=new ZkClient();
        zkClient.initZK();
        zkClient.listenNode();

        zkClient.work();
    }
}

服务端代码
package com.example.zookeeperDemo;

import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkServer {

    private static final Logger logger = LoggerFactory.getLogger(ZkServer.class);

    private static String connectString =
            "ip:2181";
    private static int sessionTimeout = 100000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";


    public void initZK() throws Exception {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    public void registerServer(String hostName) throws Exception {
        String result = zk.create(parentNode + "/" + hostName, hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("******************{} 创建结果 {}*************************************", hostName, result);
    }


    public void work(String hostName) throws Exception {
        logger.info("****************************{} is working ******************************* ", hostName);
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        ZkServer server = new ZkServer();
        server.initZK();
        String hostName = "server1";
        server.registerServer(hostName);
        server.work(hostName);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/301111.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号