栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper实现配置中心

Zookeeper实现配置中心

Zookeeper实现配置中心 一、环境准备

1 . 准备集群

搭建Zookeeper集群,我准备了四服务,分别是192.168.91.129,192.168.91.130, 192.168.91.131, 192.168.91.132,我本地用虚拟机开的四台机器,家里有条件的可以直接搞几台阿里云服务器,其实一台机器也能搞,推荐用三台服务器

2 . 导入依赖(客户端版本和Server端版本保持一致)

  
    org.apache.zookeeper
    zookeeper
    3.6.3
  
二、实现思路

1 . 创建Zookeeper对象(利用门栓保证zk可用)
2 . 判断配置中心的节点是否存在(使用回调的方式,需要传递 watch和statWatch),然后阻塞线程,直到获取配置完成
3 . 几种情况

  1. 第一个分支【没有配置文件节点】:如果没有配置文件节点,那需要在exists的Watch中对event时间Type中NodeCreated进行处理(一旦配置文件创建了就会回调这个地方),处理无非就是去get配置节点
  2. 第二个分支【有配置文件】:这个是正常流程用getData把节点数据读取回来(需要传递Watch监控后续更新)
  3. 第三个分支【配置文件更新】:getData的时候传递Watch对后续一些操作进行了监听,如果配置文件更新需要重新getData
  4. 第四个分支【配置节点删除】:这个地方有处理的方式比较宽泛,如果要求配置文件节点被删除了,配置一定也要同步删除,强一致要求配置和节点同步,那就需要清空配置文件,并且重新阻塞,重新getData,如果没有这种需求那这里就看着处理就行了(毕竟谁没事会删除配置文件呢)
三、代码实现

1 . Test测试

package org.example.configdemo2;

import org.apache.zookeeper.ZooKeeper;
import org.example.congfig.MyConf;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class TestMain {

    
    private ZooKeeper zk;

    
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Before
    public void connection() {
        try {
            zk = new ZooKeeper("192.168.91.129,192.168.91.130,192.168.91.131,192.168.91.132/configTestdemo", 1000, event -> {
                switch (event.getState()) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("zk connection success !!");
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                    default:
                }
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @After
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void show() {
        WatchAndCallback watchAndCal = new WatchAndCallback(zk);
        MyConf conf = watchAndCal.getConf();
        while (true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            String config = conf.getConfig();
            if(config == null || "".equals(config)){
                conf = watchAndCal.getConf();
            }
            System.out.println(config);
        }
    }
}

2 . WatchAndCallback类(一个工具类)

package org.example.configdemo2;

import lombok.Data;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.example.congfig.MyConf;

import java.util.concurrent.CountDownLatch;


@Data
public class WatchAndCallback implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    
    private ZooKeeper zk;

    
    private MyConf myConf = new MyConf();

    
    public static final String PATH_NODE = "/cnf";

    
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    
    public WatchAndCallback(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeDeleted:
                myConf.setConfig("");
                countDownLatch = new CountDownLatch(1);
                System.out.println("配置删除,阻塞等待... ...");
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeDataChanged:
                zk.getData(PATH_NODE, this, this, "get");
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
            default:
        }
    }

    public MyConf getConf() {
        zk.exists(PATH_NODE, this, this, "exists");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return myConf;
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // stat callback
        if (stat != null) {
            zk.getData(PATH_NODE, this, this, "get");
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (stat != null) {
            myConf.setConfig(new String(data));
            countDownLatch.countDown();
        }
    }
}

四、测试结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676915.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号