栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用Zookeeper API 对Zookeeper的增删查改操作

使用Zookeeper API 对Zookeeper的增删查改操作

简介:本篇博客主要是使用Zookeeper的API对Zookeeper的增删查改,但是在递归实现节点的数据变化监控时,这里始终没有写好,希望有大神帮我指点迷津,感谢!

package com.lqs.api.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;



public class ZookeeperDemo {

    private String connectString;
    private ZooKeeper zooKeeper;
    private int sessionTimeout;

    
    @Before
    public void init() throws IOException {
        connectString = "bdc112:2181,bdc113:2181,bdc114:2181";
        sessionTimeout = 3000;

        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
//                System.out.println(watchedEvent);
            }
        });
    }

    
    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    
    @Test
    public void ls() throws InterruptedException, KeeperException {
        //通过客户端对象对Zookeeper进行各种操作
        List children = zooKeeper.getChildren("/", false);
        System.out.println(children);
    }

    
    @Test
    public void lsAndWatch() throws InterruptedException, KeeperException {

        final boolean[] flag = {false};

        List children = zooKeeper.getChildren("/test", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent);
                flag[0] = true;
                System.exit(flag[0] ? 0 : 1);
                try {
                    lsAndWatch();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });

        System.out.println(children);
        //这里设置睡眠是需要监听,所以当前的线程不能结束
        Thread.sleep(Long.MAX_VALUE);

    }

    
    @Test
    public void create() throws InterruptedException, KeeperException {

        //参数解读 1节点路径  2节点存储的数据
        //3节点的权限(使用Ids选个OPEN即可) 4节点类型 短暂 持久 短暂带序号 持久带序号

        

        //创建持久序列
        String path = zooKeeper.create("/test12", "test11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);

        //创建临时节点
//        String path = zooKeeper.create("/testLin", "test22".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        //创建临时序列
//        String path = zooKeeper.create("/test", "test12".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        //一定要注意,创建临时节点的话,需要线程阻塞,否则看不到效果
        //Thread.sleep(10000);


        //创建持久化序列
//        String path = zooKeeper.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

        //创建持久化节点
//        String path = zooKeeper.create("/test/test11", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


    }

    
    @Test
    public void exist() throws InterruptedException, KeeperException {

        Stat stat = zooKeeper.exists("/test", false);
        System.out.println(stat != null ? "文件存在..." : "文件不存在...");

    }

    
    @Test
    public void get() throws InterruptedException, KeeperException {

        //判断节点是否存在,在Zookeeper中
        Stat stat = zooKeeper.exists("/test", false);

        if (stat == null) {
            System.out.println("当前节点不存在...");
        } else {
            byte[] data = zooKeeper.getData("/test", false, stat);
            System.out.println(new String(data));
        }

    }

    
    @Test
    public void getAndWatch() throws InterruptedException, KeeperException {

        Stat stat = zooKeeper.exists("/test12", false);

        if (stat == null) {
            System.out.println("当前节点不存在...");
            return;
        }

        final boolean[] flagOut = {true};

        //这里监听的是我们获取子节点的数据有没有发生变化
        byte[] data = zooKeeper.getData("/test12", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent);
                flagOut[0] = true;
            }
        }, stat);


        if (flagOut[0]) {
            System.out.println(new String(data));
        }

        
        while (true) {
            //睡眠时间为1秒,即1秒中调用一次
            Thread.sleep(1000);
//            flagOut[0] =false;
            if (flagOut[0]) {
                getAndWatch();
            }

        }


    }

    
    @Test
    public void set() throws InterruptedException, KeeperException {

        Stat stat = zooKeeper.exists("/test", false);

        if (stat == null) {
            System.out.println("当前节点不存在...");
            return;
        }

        Stat result = zooKeeper.setData("/test", "test".getBytes(), stat.getVersion());

        System.out.println(result != null ? "设置节点值成功..." : "设置节点值失败...");
    }

    private Stat getStat(String path) throws InterruptedException, KeeperException {
        Stat stat = zooKeeper.exists(path, false);

        if (stat == null) {
            System.out.println("当前节点不存在...");
            exist();
        }

        return stat;
    }

    
    @Test
    public void rm() throws InterruptedException, KeeperException {

        Stat result = getStat("/test0000000006");

        zooKeeper.delete("/test0000000006", result.getVersion());
    }

    
    public void deleteAll(String path) throws InterruptedException, KeeperException {
        Stat stat = getStat(path);
        List children = zooKeeper.getChildren(path, false);

        for (String child : children) {

            deleteAll(path + "/" + child);
        }

//        zooKeeper.delete(path,stat.getVersion());
        //这里的version即stat里面的得到的节点的dataversion - znode数据变化号。如果不知道是多少,则传入-1即可
        zooKeeper.delete(path, -1);
    }


    @Test
    public void deleteAllTest() throws InterruptedException, KeeperException {

        deleteAll("/test");

    }

    
    public void lsAndWatch(String path) throws InterruptedException, KeeperException {

        List children = zooKeeper.getChildren(path, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent);
                try {
                    lsAndWatch(path);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });

        System.out.println(children);

    }

    @Test
    public void testWatch() throws InterruptedException, KeeperException {
        lsAndWatch("/test");
        Thread.sleep(Long.MAX_VALUE);
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677238.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号