栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

09、Hadoop框架Zookeeper Java API

09、Hadoop框架Zookeeper Java API

文章目录

Hadoop框架Zookeeper Java API

引入zookeeper依赖测试连接

1、新建连接2、创建临时节点3、运行测试 ZKJavaAPI

名词解析创建永久节点创建临时节点获取节点数据修改数据删除节点事件完整代码

Hadoop框架Zookeeper Java API 引入zookeeper依赖

  去Maven官网引入Zookeeper依赖。

  选择3.4.6版本,复制到IDEA的pom文件里


	org.apache.zookeeper
	zookeeper
	3.4.6

  新建ZOOKEEPER包

  新建ZKJavaAPI

测试连接 1、新建连接

  这里需要抛出异常

        // 1、新建连接
        ZooKeeper zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
2、创建临时节点

  这里需要抛出异常

        zk.create("/test1"
                ,"abcdefg".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.PERSISTENT
        );
3、运行测试
package com.liangzai.ZOOKEEPER;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

public class ZKJavaAPI {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // 1、新建连接
        ZooKeeper zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );

        // 2、创建临时节点
        zk.create("/test1"
                ,"abcdefg".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL
        );
    }
}
控制台输出结果:
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Process finished with exit code 0

去ZK里查看运行结果

#启动ZK
zkCli.sh -server node1:2181

#ZK Shell
ls /
get /test1

运行结果:

注意我这里并没有zk.close();
因为创建的是临时节点,断开了就会被删除
这里是运行成功的。
下面我将详细介绍。

ZKJavaAPI 名词解析

ZooDefs.Ids :控制所创建的ZNODE的权限OPEN_ACL_UNSAFE :完全开放的ACL,任何连接的客户端都可以操作该属性znodeCREATOR_ALL_ACL :只有创建者才有ACL权限READ_ACL_UNSAFE :只能读取ACLCreateMode :创建的ZNODE的类型PERSISTENT :永久创建,连接断开不会删除EPHEMERAL :创建临时节点,连接断开即删除 创建永久节点

    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }

    @Test
    // 创建永久节点
    public void createPersistentZNODE() throws InterruptedException, KeeperException {
        
        zk.create(
                "/test3"
                , "def".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.PERSISTENT
        );
    }

    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
ls /
get /test3

运行结果:

可见@After注解关闭了ZK连接
test3被创建了
PERSISTENT :永久创建,连接断开不会删除

创建临时节点
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }
    
    @Test
    // 创建临时节点
    public void createEPHEMERALZNODE() throws InterruptedException, KeeperException {
        zk.create(
                "/test2"
                , "abc".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL
        );
    }
    
    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
ls /

运行结果:

可见后面@After注解里将ZK连接关闭了
EPHEMERAL : 创建临时节点,连接断开即删除
所以test2并没有被创建

获取节点数据
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }

    @Test
    // 获取节点数据
    public void getZNODE() throws InterruptedException, KeeperException {
        byte[] data = zk.getData("/test1", null, new Stat());
        // 字节数组转String,这里toString没有意义
        System.out.println(new String(data));
    }
    
    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
get /test1

运行结果:

这里的watcher就是我们NameNode的故障转移机制

修改数据
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }

    @Test
    // 修改数据
    public void setZNODE() throws InterruptedException, KeeperException {
        zk.setData("/test1","liangzai".getBytes(),1);
    }

    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
get /test1

运行结果:

刚刚我们test1里面的数据是bacdef
运行后改成了liangzai

删除节点
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }

    @Test
    // 删除节点
    public void deleteZNODE() throws InterruptedException, KeeperException {
        zk.delete("/test3", 0);
    }
    
    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
ls /

运行结果:

这里通过ls / 查看后
test3被删除了

事件
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }
    
    @Test
    // 事件
    public void triggerWatcher() throws InterruptedException, KeeperException {
        zk.exists("/test", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("/test 发生了变化");
                System.out.println(event.getPath());
                System.out.println(event.getState());
                System.out.println(event.getType());
                System.out.println(event.getWrapper());
            }
        });
        
    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
set /test node1

运行结果:

/test 发生了变化
/test
SyncConnected
NodeDataChanged
3,3,'/test

事件方便我们去监控NameNode的故障转移机制

完整代码
package com.liangzai.ZOOKEEPER;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class ZKJavaAPI {
    ZooKeeper zk;

    @Before
    // 创建连接
    public void init() throws IOException {
        zk = new ZooKeeper(
                "master:2181,node1:2181,node2:2181"
                , 100000
                , null
        );
    }

    @Test
    // 创建临时节点
    public void createEPHEMERALZNODE() throws InterruptedException, KeeperException {
        zk.create(
                "/test2"
                , "abc".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL
        );
    }

    @Test
    // 创建永久节点
    public void createPersistentZNODE() throws InterruptedException, KeeperException {
        
        zk.create(
                "/test3"
                , "def".getBytes()
                , ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.PERSISTENT
        );
    }

    @Test
    // 获取节点数据
    public void getZNODE() throws InterruptedException, KeeperException {
        byte[] data = zk.getData("/test1", null, new Stat());
        // 字节数组转String,这里toString没有意义
        System.out.println(new String(data));
    }

    @Test
    // 修改数据
    public void setZNODE() throws InterruptedException, KeeperException {
        zk.setData("/test1", "liangzai".getBytes(), 1);
    }

    @Test
    // 删除节点
    public void deleteZNODE() throws InterruptedException, KeeperException {
        zk.delete("/test3", 0);
    }

    @Test
    // 事件
    public void triggerWatcher() throws InterruptedException, KeeperException {
        zk.exists("/test", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("/test 发生了变化");
                System.out.println(event.getPath());
                System.out.println(event.getState());
                System.out.println(event.getType());
                System.out.println(event.getWrapper());
            }
        });

        // 加入死循环,不会被退出 方便在控制台输出
        while (true) {

        }
    }

    @After
    // 关闭连接
    public void closed() throws InterruptedException {
        zk.close();
    }
}

到底啦!觉得靓仔的文章对你学习Hadoop有所帮助的话,一波三连吧!q(≧▽≦q)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/733950.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号