
ZooKeeper Study Notes, Part 2: Basic API Usage

Grey

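Preparation

Set up a zk cluster; see ZooKeeper Study Notes, Part 1: Cluster Setup.

Make sure the project can reach every node of the cluster.

Create a new Maven project based on JDK 1.8.

Configure the dependency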

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>

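Note: the zookeeper dependency version should match the ZooKeeper version installed on the cluster.

zk configuration class

Create ZookeeperConfig.java as a utility class for obtaining a zk client instance. The code is as follows: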

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperConfig {
    private static final String ADDRESS = "192.168.205.145:2181,192.168.205.146:2181,192.168.205.147:2181,192.168.205.148:2181";
    private static ZooKeeper zk;
    static CountDownLatch latch;

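    public static ZooKeeper create() {
        latch = new CountDownLatch(1);
        try {
            // The constructor returns immediately; the session is established asynchronously.
            zk = new ZooKeeper(ADDRESS, 3000, new DefaultWatch());
            // Block until DefaultWatch sees the SyncConnected state.
            latch.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }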

    public static void close() {
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class DefaultWatch implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        }
    }
}

Calling

ZooKeeper zookeeper = ZookeeperConfig.create();

returns a zk client.
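
One thing to keep in mind with create(): latch.await() has no timeout, so if none of the nodes in ADDRESS is reachable the call blocks indefinitely. Below is a minimal sketch of a bounded wait that uses only the JDK. The class name BoundedConnectWait, the method awaitConnected, and the background thread standing in for ZooKeeper's event thread are assumptions made for illustration; none of them appear in the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: shows how to wait on a latch with an upper bound.
final class BoundedConnectWait {

    // Returns true if the latch was released within timeoutMillis,
    // false on timeout or if the waiting thread was interrupted.
    static boolean awaitConnected(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Case 1: a background thread plays the role of the watcher receiving SyncConnected.
        CountDownLatch connected = new CountDownLatch(1);
        new Thread(connected::countDown).start();
        System.out.println("connected in time: " + awaitConnected(connected, 2000));

        // Case 2: nobody counts down, as if the cluster were unreachable.
        CountDownLatch neverConnected = new CountDownLatch(1);
        System.out.println("connected in time: " + awaitConnected(neverConnected, 100));
    }
}
```

Applied to ZookeeperConfig.create(), the same idea would mean swapping latch.await() for the timed variant and treating a false result as a connection failure rather than returning a client that never connected.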

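Basic usage

I wrapped a few set/get methods in simple helpers: a plain getData, a getData that also registers a watcher, and a getData whose follow-up logic runs in a callback. The code is simple: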

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.zookeeper.CreateMode.EPHEMERAL;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;

public class ReactiveClient {
    public static final String ADDRESS = "192.168.205.145:2181,192.168.205.146:2181,192.168.205.147:2181,192.168.205.148:2181";
    private static final ZooKeeper CLIENT = ZookeeperConfig.create();

    public static void main(String[] args) {
        getData();
        getDataWithWatcher();
        getDataAndCallback();
        pending(10000);
    }

    private static void getDataAndCallback() {
        System.out.println("get data and callback");
        String path = "/abc";
        String data = "Hello";
        createOrUpdate(path, data);
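        CLIENT.getData(path, false, (rc, path1, ctx, data1, stat) -> {
            // rc is the result code; data1 is only populated when the read succeeded.
            if (rc == KeeperException.Code.OK.intValue()) {
                System.out.println("call back get data: " + new String(data1, UTF_8));
            } else {
                System.out.println("call back failed: " + KeeperException.Code.get(rc));
            }
        }, "abc");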
    }


    private static void getDataWithWatcher() {
        System.out.println("---create and get data with watcher---");
        String path = "/abc";
        String data = "Hello";
        createOrUpdate(path, data);
        Stat stat = new Stat();
        try {
            Stat finalStat = stat;
            byte[] data1 = CLIENT.getData(path, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // System.out.println("get data event: " + event);
                    try {
                        byte[] data2 = CLIENT.getData(path, this, finalStat);
                        System.out.println("get data from event : " + new String(data2));
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, stat);
            System.out.println(new String(data1));
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        String newData = "World";
        try {
            // Trigger the watcher callback
            stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
            stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
            stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void getData() {
        String path = "/abc";
        String data = "Hello";
        createOrUpdate(path, data);
        String result = getData(path);
        System.out.println(result);
        createOrUpdate(path, "world");
        result = getData(path);
        System.out.println(result);
    }

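    public static void pending(long millis) {
        try {
            // Keep the process alive so the asynchronous watcher and callback can fire, then close the session.
            Thread.sleep(millis);
            ZookeeperConfig.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }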


    public static String getData(String path) {
        try {
            return new String(CLIENT.getData(path, false, new Stat()));
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void createOrUpdate(String path, String data) {
        try {
            Stat exists = CLIENT.exists(path, false);
            if (null != exists) {
                CLIENT.setData(path, data.getBytes(UTF_8), exists.getVersion());
                return;
            }
            // Create the node (ephemeral: it is removed when the session ends)
            CLIENT.create(path, data.getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
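
getDataWithWatcher relies on two ZooKeeper behaviors. A watch set through getData fires once and has to be registered again, which is why the watcher passes `this` back into getData. And setData only succeeds when the supplied version matches the node's current data version, with -1 skipping the check. The sketch below simulates those two rules with the JDK alone so they can be observed without a cluster. SimulatedNode and its methods are hypothetical names; this is not how ZooKeeper is implemented, and it throws IllegalStateException where the real client would throw KeeperException.BadVersionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a single znode.
final class SimulatedNode {
    private String data;
    private int version = 0;
    // Watches registered since the last change; detached when they fire.
    private List<Consumer<String>> watches = new ArrayList<>();

    SimulatedNode(String data) {
        this.data = data;
    }

    int getVersion() {
        return version;
    }

    // Reads the data and, if watch is non-null, registers it for the NEXT change only.
    String getData(Consumer<String> watch) {
        if (watch != null) {
            watches.add(watch);
        }
        return data;
    }

    // Writes only if expectedVersion matches (or is -1); returns the new version.
    int setData(String newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData;
        version++;
        // One-shot: detach the current watches before notifying them.
        List<Consumer<String>> toFire = watches;
        watches = new ArrayList<>();
        for (Consumer<String> w : toFire) {
            w.accept(newData);
        }
        return version;
    }

    public static void main(String[] args) {
        SimulatedNode node = new SimulatedNode("Hello");
        List<String> seen = new ArrayList<>();

        // Registered once and never re-registered, so it sees only the first change.
        node.getData(seen::add);
        int v = node.setData("World", node.getVersion());
        node.setData("Again", v);
        System.out.println("one-shot watch saw: " + seen);

        try {
            node.setData("Stale", 0); // version 0 is out of date by now
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real getDataWithWatcher the watcher re-registers itself on every event, so it can observe more than one change. Because that re-registration races with the three back-to-back setData calls, it may still print fewer than three updates.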

Source code

GitHub
