栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【使用zookpeer】模拟 hadoop的 datenode与namenode 的master-slaves的 关系

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【使用zookpeer】模拟 hadoop的 datenode与namenode 的master-slaves的 关系

一:功能需求 总结

用到的知识点 :
1.序列化和反序列化
2.zookpeer监听机制
3.对象流完成 datanode的 信息的注册.

1.功能:

  1. 当 datanode上线时,namenode中可以感知到.
  2. 当datanode下线时,namenode中可以感知到.
    2.分析:
  3. 当datanode上线时,datanode在zk中创建的节点类型为 瞬时的顺序节点. 
    2. namenode临听节点的变化情况,一旦有一个datanode下线,则这个节点会删除,namenode会自动回调》
  4. 请用对象流完成 datanode的 信息的注册.
  5. datanode的节点信息: System中获取 properties.
二: 前提:

利用zk完成,请先在zk中创建一个 servers 节点.

三 .模拟代码实现 3.1 DataNodeServer类

DataNodeServer该类用于提供外界访问,我把他在maven中pom.xml中加入 指定的主类build 配置 ,这个配置 在jar包中 manidest 文件中加入Main-class指定,就可通过java -Defile.encoding = utf-8 -jar xxx.jar包执行了

public class DataNodeServer {

private String parentNode="/servers";//这是整个集群的根节点
private static ZkHelper zkHelper;
private static ZooKeeper zk;
private static Logger logger = Logger.getLogger(DataNodeServer.class);
private static Scanner sc = new Scanner(System.in);
private static String zkUrl;


private void getConnect() throws IOException, InterruptedException {
    if(zkUrl!=null){
        zkHelper.connectString = zkUrl;
    }
    zkHelper = new ZkHelper();
    zk = zkHelper.connect();
}

private byte[]  toByteArray(Object obj){
    byte[] bs = null;
    ByteArrayOutputStream bos = new ByteArrayOutputStream(); //将数据以byte[]写出(输出)到内存
      ObjectOutputStream oos = null;
    try{
          oos = new ObjectOutputStream(bos);//将可以序列化的对象序列化写出到bos中   obj必须实现java.io.Serializable
          oos.writeObject(obj);
          oos.flush();

          //obj中的数据变成 byte[] 存到内存
          bs = bos.toByteArray();
    }catch (IOException e){
        e.printStackTrace();
    }finally {
          try{
              if(oos!=null){
                  oos.close();
              }
              if(bos!=null){
                  bos.close();
              }
          }catch (IOException e){
              e.printStackTrace();
          }
    }
    return bs;
}


private void registerServer(){
    Properties p = getLocalHostInfo();
    byte[] data = toByteArray(p);  //将序列化的byte数组 赋给 data数组
    String ip = (String) p.get("ip");
    try {
        
        String resultPath = zk.create(parentNode+"/server_"+ip+"_",data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        logger.info("在服务器创建节点成功,路径为:"+resultPath);
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}


private static Properties getLocalHostInfo() {

      Runtime r = Runtime.getRuntime(); //jvm信息   : jvm内存
      Properties props = System.getProperties();  //操作系统信息
       InetAddress addr = null;

    try {
        addr = InetAddress.getLocalHost();//IP地址
    } catch (UnknownHostException e) {
        e.printStackTrace();
    }

    String ip = addr.getHostAddress();   //取地址


    props.setProperty("ip",ip);
    props.setProperty("hostname", addr.getHostName());
    props.setProperty("totalMem",r.freeMemory()+"");
    props.setProperty("availableProcessors",r.availableProcessors()+"");

    System.out.println("ip: " + ip);
    System.out.println("hostname: "+addr.getHostName());
    System.out.println("jvm可以使用的总内存: "+ r.totalMemory());
    System.out.println("JVM可以使用的剩余内存: "+r.freeMemory());
    System.out.println("JVM可以使用的处理器个数: "+r.availableProcessors());


    System.out.println("Java的运行环境版本:"+props.getProperty("java.version"));
    System.out.println("Java的运行环境供应商:"+props.getProperty("java.vendor"));
    System.out.println("Java供应商的URL:"+props.getProperty("java.vendor.url"));
    System.out.println("Java的安装路径:"+props.getProperty("java.home"));
    System.out.println("Java的虚拟机规范版本:"+props.getProperty("java.vm.specification.version"));
    System.out.println("Java的虚拟机规范供应商:"+props.getProperty("java.vm.specification.vendor"));
    System.out.println("Java的虚拟机规范名称:"+props.getProperty("java.vm.specification.name"));
    System.out.println("Java的虚拟机实现版本:"+props.getProperty("java.vm.version"));
    System.out.println("Java的虚拟机实现供应商:"+props.getProperty("java.vm.vendor"));
    System.out.println("Java的虚拟机实现名称:"+props.getProperty("java.vm.name"));
    System.out.println("Java运行时环境规范版本:"+props.getProperty("java.specification.version"));
    System.out.println("Java运行时环境规范供应商:"+props.getProperty("java.specification.vender"));
    System.out.println("Java运行时环境规范名称:"+props.getProperty("java.specification.name"));
    System.out.println("Java的类格式版本号:"+props.getProperty("java.class.version"));
    System.out.println("Java的类路径:"+props.getProperty("java.class.path"));
    System.out.println("加载库时搜索的路径列表:"+props.getProperty("java.library.path"));
    System.out.println("默认的临时文件路径:"+props.getProperty("java.io.tmpdir"));
    System.out.println("一个或多个扩展目录的路径:"+props.getProperty("java.ext.dirs"));
    System.out.println("操作系统的名称:"+props.getProperty("os.name"));
    System.out.println("操作系统的构架:"+props.getProperty("os.arch"));
    System.out.println("操作系统的版本:"+props.getProperty("os.version"));
    System.out.println("文件分隔符:"+props.getProperty("file.separator")); //在 unix 系统中是"/"
    System.out.println("路径分隔符:"+props.getProperty("path.separator")); //在 unix 系统中是":"
    System.out.println("行分隔符:"+props.getProperty("line.separator")); //在 unix 系统中是"/n"
    System.out.println("用户的账户名称:"+props.getProperty("user.name"));
    System.out.println("用户的主目录:"+props.getProperty("user.home"));
    System.out.println("用户的当前工作目录:"+props.getProperty("user.dir"));



    return props;
}




private void business(){
    System.out.println("请输入割圆次数: ");
    int n = sc.nextInt();
      double y = 1.0;
      for (int i = 0; i < n; i++) {
          double π   =  3 * Math.pow( 2 , i) * y;
        System.out.println("第"+i+"次切割,为正"+(6 + 6*i) + "边形,圆周率π = " + π);
        y = Math.sqrt( 2 - Math.sqrt( 4 - y*y));
    }
}


public static void main(String[] args) throws InterruptedException, IOException {
    if(args != null && args.length >0){
        zkUrl = args[0];
    }

    DataNodeServer server = new DataNodeServer();
    server.getConnect();
    server.registerServer(); //第一次datanode后注册到zk

    boolean flag = false;
    int choice = 2;
    while (!flag){
        System.out.println("请输入你要的操作:n1.执行计算任务  n2.退出n");
        choice = sc.nextInt();
        switch (choice){
            case 1:
                 server.business();
                 break;
            case 2:
                zkHelper.close();
                flag = true;
                sc.close();
                break;
            default:
                System.out.println("没有这个操作....");
        }
    }
    System.out.println("客户端退出");
}

}

3.2 NameNodeServer

该类用于监视datanodeserve的心跳,可以通过zookpeer的watch来监听上线节点数目,只需要实现Watcher接口再传入

public class NameNodeServer {

    private static ZkHelper zkHelper;
    private static ZooKeeper zk;
    private static Logger logger = Logger.getLogger(NameNodeServer.class);
    private String parentNode = "/servers";


    //初始化节点parentdate节点
    private void initMainNode()throws  Exception{
        zkHelper = new ZkHelper();
        zk = zkHelper.connect();
        if(zk==null || zk.getState()!= ZooKeeper.States.CONNECTED){
            logger.error("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");
            throw new Exception("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");
        }
        try{
            zk.exists(parentNode,true);
        }catch (Exception e){
            zk.create(parentNode,"this is yc datanode cluster".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    
    private void getServerList() throws KeeperException,InterruptedException{
        CountDownLatch countDownLatch = new CountDownLatch(Integer.MAX_VALUE);

         MyMatch nm = new MyMatch(zk,parentNode,countDownLatch);
        List servers = zk.getChildren(parentNode,nm);/
    public void closeZookpeer(){
        logger.info("关闭zookpeer的连接....");
        if(zk!=null && zk.getState() == ZooKeeper.States.CONNECTED){
            try {
                zk.close();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
//start-all.sh   stop.all.sh
    public static void main(String[] args) throws Exception {
        //1.实例化服务器
        NameNodeServer server = new NameNodeServer();
        //2.建立连接 NameNodeServer -> zk;
        //3.判断/servers是否存在 如果不存在 则创建 -》init
        server.initMainNode();;

        //绑定监听 /servers下的/子节点变化
        // * ls path [watch] ->getChildren
        /
class MyMatch implements Watcher {

    private ZooKeeper zk;
    private String path;
    private CountDownLatch countDownLatch;
    private Logger logger = Logger.getLogger(MyMatch.class);

    private void showChildNodeInfo(List children) {
        //1.循环子节点列表
        logger.info("当前在线的datanode有:" + children.size());
        logger.info("他们是:");
        logger.info(children); //[xx,xx,xxx]
        logger.info("各节点的详情:");
        for (String sonPath : children) {

            logger.info("*****" + sonPath + "*********");

            try {
                byte[] data = zk.getData(path + "/" + sonPath, false, null);//byte[]->反序列化
                Properties p = (Properties) deserilizable(data);
                logger.info(p);
                logger.info("**************");

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //2.根据子节点 路径 显示信息
        }
    }
序列化转为byte数组
    private Object deserilizable(byte[] data) {

        Object obj = null;
        ByteArrayInputStream bis = null;
        ObjectInputStream ois = null;
        try {
            bis = new ByteArrayInputStream(data);
            ois = new ObjectInputStream(bis);
            obj = ois.readObject();
        } catch (ClassNotFoundException | IOException e) {
            e.printStackTrace();
        } finally {
            if (ois != null) {
                try {
                    ois.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (bis != null) {
                try {
                    bis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return obj;
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == NodeChildrenChanged) {
            //实现子节点列表变化的代码
            logger.info("监听到了子节点的变化...");
            try {
                 //重新绑定
                List children = zk.getChildren(path,MyMatch.this);
                showChildNodeInfo(children);
            } catch (KeeperException | InterruptedException e) {

                e.printStackTrace();
                logger.error(e.getMessage());
            }
        } else if (event.getType() == NodeDataChanged) {
            try {
                Stat stat = new Stat();
           //画龙点睛之笔
                byte[] data = zk.getData(path, MyMatch.this, stat);//异常不能抛出
                String dataString = new String(data,"utf-8");
                logger.info("监听程序中获取节点:"+path+"更新后的数据为:"+dataString);
                logger.info("节点最新的信息stat为:");

                String info = YcZnodeUtil.printZnodeInfo(stat);
                logger.info(info);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.countDown();  //递减
        //重新设置countdown 的初始值 让他循环
        if (countDownLatch.getCount() == 1) {
            countDownLatch = new CountDownLatch(Integer.MAX_VALUE);
        }
        System.out.println("当前  connectedsignal为:   " + countDownLatch.getCount());


    }
    public MyMatch(ZooKeeper zk,String path,CountDownLatch countDownLatch){
        super();
        this.zk = zk;
        this.path = path;
        this.countDownLatch = countDownLatch;
    }

    public MyMatch(){
        super();
    }

    public void setZk(ZooKeeper zk){
        this.zk = zk;
    }
    public void setPath(String path){
        this.path = path;
    }
}


# pom文件配置


        

            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
             

            
              org.apache.maven.plugins
              maven-assembly-plugin
                
                  false
                    

                        jar-with-dependencies
                    
                  
                      

                          com.yc.Zookpeek.program2.NameNodeServer
                      
                  


             
                
                    
                        make-assembly

                        package
                        
                            assembly
                        
                    
                
         

        
    


    

        
            junit
            junit
            4.13.2
            test
        
        
            org.junit.jupiter
            junit-jupiter
            RELEASE
            compile
        


        
            org.apache.zookeeper
            zookeeper
            3.4.6
        


        
            org.projectlombok
            lombok
            1.18.20
            provided
        



    



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/871914.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号