Hadoop框架Zookeeper Java API
引入zookeeper依赖测试连接
1、新建连接2、创建临时节点3、运行测试 ZKJavaAPI
名词解析创建永久节点创建临时节点获取节点数据修改数据删除节点事件完整代码
Hadoop框架Zookeeper Java API 引入zookeeper依赖去Maven官网引入Zookeeper依赖。
选择3.4.6版本,复制到IDEA的pom文件里
org.apache.zookeeper zookeeper 3.4.6
新建ZOOKEEPER包
新建ZKJavaAPI
测试连接 1、新建连接这里需要抛出异常
// 1、新建连接
ZooKeeper zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
2、创建临时节点
这里需要抛出异常
zk.create("/test1"
,"abcdefg".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.PERSISTENT
);
3、运行测试
package com.liangzai.ZOOKEEPER;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKJavaAPI {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
// 1、新建连接
ZooKeeper zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
// 2、创建临时节点
zk.create("/test1"
,"abcdefg".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.EPHEMERAL
);
}
}
控制台输出结果: log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Process finished with exit code 0
去ZK里查看运行结果
#启动ZK zkCli.sh -server node1:2181 #ZK Shell ls / get /test1
运行结果:
ZKJavaAPI 名词解析注意我这里并没有zk.close();
因为创建的是临时节点,断开了就会被删除
这里是运行成功的。
下面我将详细介绍。
ZooDefs.Ids :控制所创建的ZNODE的权限OPEN_ACL_UNSAFE :完全开放的ACL,任何连接的客户端都可以操作该属性znodeCREATOR_ALL_ACL :只有创建者才有ACL权限READ_ACL_UNSAFE :只能读取ACLCreateMode :创建的ZNODE的类型PERSISTENT :永久创建,连接断开不会删除EPHEMERAL :创建临时节点,连接断开即删除 创建永久节点
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 创建永久节点
public void createPersistentZNODE() throws InterruptedException, KeeperException {
zk.create(
"/test3"
, "def".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.PERSISTENT
);
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
ls / get /test3
运行结果:
创建临时节点可见@After注解关闭了ZK连接
test3被创建了
PERSISTENT :永久创建,连接断开不会删除
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 创建临时节点
public void createEPHEMERALZNODE() throws InterruptedException, KeeperException {
zk.create(
"/test2"
, "abc".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.EPHEMERAL
);
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
ls /
运行结果:
获取节点数据可见后面@After注解里将ZK连接关闭了
EPHEMERAL : 创建临时节点,连接断开即删除
所以test2并没有被创建
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 获取节点数据
public void getZNODE() throws InterruptedException, KeeperException {
byte[] data = zk.getData("/test1", null, new Stat());
// 字节数组转String,这里toString没有意义
System.out.println(new String(data));
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
get /test1
运行结果:
这里的watcher就是我们NameNode的故障转移机制
修改数据 ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 修改数据
public void setZNODE() throws InterruptedException, KeeperException {
zk.setData("/test1","liangzai".getBytes(),1);
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
get /test1
运行结果:
删除节点刚刚我们test1里面的数据是bacdef
运行后改成了liangzai
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 删除节点
public void deleteZNODE() throws InterruptedException, KeeperException {
zk.delete("/test3", 0);
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
ls /
运行结果:
事件这里通过ls / 查看后
test3被删除了
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 事件
public void triggerWatcher() throws InterruptedException, KeeperException {
zk.exists("/test", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("/test 发生了变化");
System.out.println(event.getPath());
System.out.println(event.getState());
System.out.println(event.getType());
System.out.println(event.getWrapper());
}
});
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
set /test node1
运行结果:
/test 发生了变化 /test SyncConnected NodeDataChanged 3,3,'/test
完整代码事件方便我们去监控NameNode的故障转移机制
package com.liangzai.ZOOKEEPER;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class ZKJavaAPI {
ZooKeeper zk;
@Before
// 创建连接
public void init() throws IOException {
zk = new ZooKeeper(
"master:2181,node1:2181,node2:2181"
, 100000
, null
);
}
@Test
// 创建临时节点
public void createEPHEMERALZNODE() throws InterruptedException, KeeperException {
zk.create(
"/test2"
, "abc".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.EPHEMERAL
);
}
@Test
// 创建永久节点
public void createPersistentZNODE() throws InterruptedException, KeeperException {
zk.create(
"/test3"
, "def".getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.PERSISTENT
);
}
@Test
// 获取节点数据
public void getZNODE() throws InterruptedException, KeeperException {
byte[] data = zk.getData("/test1", null, new Stat());
// 字节数组转String,这里toString没有意义
System.out.println(new String(data));
}
@Test
// 修改数据
public void setZNODE() throws InterruptedException, KeeperException {
zk.setData("/test1", "liangzai".getBytes(), 1);
}
@Test
// 删除节点
public void deleteZNODE() throws InterruptedException, KeeperException {
zk.delete("/test3", 0);
}
@Test
// 事件
public void triggerWatcher() throws InterruptedException, KeeperException {
zk.exists("/test", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("/test 发生了变化");
System.out.println(event.getPath());
System.out.println(event.getState());
System.out.println(event.getType());
System.out.println(event.getWrapper());
}
});
// 加入死循环,不会被退出 方便在控制台输出
while (true) {
}
}
@After
// 关闭连接
public void closed() throws InterruptedException {
zk.close();
}
}
到底啦!觉得靓仔的文章对你学习Hadoop有所帮助的话,一波三连吧!q(≧▽≦q)



