用到的知识点 :
1.序列化和反序列化
2.zookpeer监听机制
3.对象流完成 datanode的 信息的注册.
1.功能:
- 当 datanode上线时,namenode中可以感知到.
- 当datanode下线时,namenode中可以感知到.
2.分析: - 当datanode上线时,datanode在zk中创建的节点类型为 瞬时的顺序节点.
2. namenode临听节点的变化情况,一旦有一个datanode下线,则这个节点会删除,namenode会自动回调》 - 请用对象流完成 datanode的 信息的注册.
- datanode的节点信息: System中获取 properties.
利用zk完成,请先在zk中创建一个 servers 节点.
三 .模拟代码实现 3.1 DataNodeServer类DataNodeServer该类用于提供外界访问,我把他在maven中pom.xml中加入 指定的主类build 配置 ,这个配置 在jar包中 manidest 文件中加入Main-class指定,就可通过java -Defile.encoding = utf-8 -jar xxx.jar包执行了
public class DataNodeServer {
private String parentNode="/servers";//这是整个集群的根节点
private static ZkHelper zkHelper;
private static ZooKeeper zk;
private static Logger logger = Logger.getLogger(DataNodeServer.class);
private static Scanner sc = new Scanner(System.in);
private static String zkUrl;
private void getConnect() throws IOException, InterruptedException {
if(zkUrl!=null){
zkHelper.connectString = zkUrl;
}
zkHelper = new ZkHelper();
zk = zkHelper.connect();
}
private byte[] toByteArray(Object obj){
byte[] bs = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream(); //将数据以byte[]写出(输出)到内存
ObjectOutputStream oos = null;
try{
oos = new ObjectOutputStream(bos);//将可以序列化的对象序列化写出到bos中 obj必须实现java.io.Serializable
oos.writeObject(obj);
oos.flush();
//obj中的数据变成 byte[] 存到内存
bs = bos.toByteArray();
}catch (IOException e){
e.printStackTrace();
}finally {
try{
if(oos!=null){
oos.close();
}
if(bos!=null){
bos.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
return bs;
}
private void registerServer(){
Properties p = getLocalHostInfo();
byte[] data = toByteArray(p); //将序列化的byte数组 赋给 data数组
String ip = (String) p.get("ip");
try {
String resultPath = zk.create(parentNode+"/server_"+ip+"_",data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
logger.info("在服务器创建节点成功,路径为:"+resultPath);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Properties getLocalHostInfo() {
Runtime r = Runtime.getRuntime(); //jvm信息 : jvm内存
Properties props = System.getProperties(); //操作系统信息
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();//IP地址
} catch (UnknownHostException e) {
e.printStackTrace();
}
String ip = addr.getHostAddress(); //取地址
props.setProperty("ip",ip);
props.setProperty("hostname", addr.getHostName());
props.setProperty("totalMem",r.freeMemory()+"");
props.setProperty("availableProcessors",r.availableProcessors()+"");
System.out.println("ip: " + ip);
System.out.println("hostname: "+addr.getHostName());
System.out.println("jvm可以使用的总内存: "+ r.totalMemory());
System.out.println("JVM可以使用的剩余内存: "+r.freeMemory());
System.out.println("JVM可以使用的处理器个数: "+r.availableProcessors());
System.out.println("Java的运行环境版本:"+props.getProperty("java.version"));
System.out.println("Java的运行环境供应商:"+props.getProperty("java.vendor"));
System.out.println("Java供应商的URL:"+props.getProperty("java.vendor.url"));
System.out.println("Java的安装路径:"+props.getProperty("java.home"));
System.out.println("Java的虚拟机规范版本:"+props.getProperty("java.vm.specification.version"));
System.out.println("Java的虚拟机规范供应商:"+props.getProperty("java.vm.specification.vendor"));
System.out.println("Java的虚拟机规范名称:"+props.getProperty("java.vm.specification.name"));
System.out.println("Java的虚拟机实现版本:"+props.getProperty("java.vm.version"));
System.out.println("Java的虚拟机实现供应商:"+props.getProperty("java.vm.vendor"));
System.out.println("Java的虚拟机实现名称:"+props.getProperty("java.vm.name"));
System.out.println("Java运行时环境规范版本:"+props.getProperty("java.specification.version"));
System.out.println("Java运行时环境规范供应商:"+props.getProperty("java.specification.vender"));
System.out.println("Java运行时环境规范名称:"+props.getProperty("java.specification.name"));
System.out.println("Java的类格式版本号:"+props.getProperty("java.class.version"));
System.out.println("Java的类路径:"+props.getProperty("java.class.path"));
System.out.println("加载库时搜索的路径列表:"+props.getProperty("java.library.path"));
System.out.println("默认的临时文件路径:"+props.getProperty("java.io.tmpdir"));
System.out.println("一个或多个扩展目录的路径:"+props.getProperty("java.ext.dirs"));
System.out.println("操作系统的名称:"+props.getProperty("os.name"));
System.out.println("操作系统的构架:"+props.getProperty("os.arch"));
System.out.println("操作系统的版本:"+props.getProperty("os.version"));
System.out.println("文件分隔符:"+props.getProperty("file.separator")); //在 unix 系统中是"/"
System.out.println("路径分隔符:"+props.getProperty("path.separator")); //在 unix 系统中是":"
System.out.println("行分隔符:"+props.getProperty("line.separator")); //在 unix 系统中是"/n"
System.out.println("用户的账户名称:"+props.getProperty("user.name"));
System.out.println("用户的主目录:"+props.getProperty("user.home"));
System.out.println("用户的当前工作目录:"+props.getProperty("user.dir"));
return props;
}
private void business(){
System.out.println("请输入割圆次数: ");
int n = sc.nextInt();
double y = 1.0;
for (int i = 0; i < n; i++) {
double π = 3 * Math.pow( 2 , i) * y;
System.out.println("第"+i+"次切割,为正"+(6 + 6*i) + "边形,圆周率π = " + π);
y = Math.sqrt( 2 - Math.sqrt( 4 - y*y));
}
}
public static void main(String[] args) throws InterruptedException, IOException {
if(args != null && args.length >0){
zkUrl = args[0];
}
DataNodeServer server = new DataNodeServer();
server.getConnect();
server.registerServer(); //第一次datanode后注册到zk
boolean flag = false;
int choice = 2;
while (!flag){
System.out.println("请输入你要的操作:n1.执行计算任务 n2.退出n");
choice = sc.nextInt();
switch (choice){
case 1:
server.business();
break;
case 2:
zkHelper.close();
flag = true;
sc.close();
break;
default:
System.out.println("没有这个操作....");
}
}
System.out.println("客户端退出");
}
}
3.2 NameNodeServer该类用于监视datanodeserve的心跳,可以通过zookpeer的watch来监听上线节点数目,只需要实现Watcher接口再传入
public class NameNodeServer {
private static ZkHelper zkHelper;
private static ZooKeeper zk;
private static Logger logger = Logger.getLogger(NameNodeServer.class);
private String parentNode = "/servers";
//初始化节点parentdate节点
private void initMainNode()throws Exception{
zkHelper = new ZkHelper();
zk = zkHelper.connect();
if(zk==null || zk.getState()!= ZooKeeper.States.CONNECTED){
logger.error("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");
throw new Exception("没有建立起与zookpeer服务器"+zkHelper.getConnectString()+"的连接");
}
try{
zk.exists(parentNode,true);
}catch (Exception e){
zk.create(parentNode,"this is yc datanode cluster".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
private void getServerList() throws KeeperException,InterruptedException{
CountDownLatch countDownLatch = new CountDownLatch(Integer.MAX_VALUE);
MyMatch nm = new MyMatch(zk,parentNode,countDownLatch);
List servers = zk.getChildren(parentNode,nm);/
public void closeZookpeer(){
logger.info("关闭zookpeer的连接....");
if(zk!=null && zk.getState() == ZooKeeper.States.CONNECTED){
try {
zk.close();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
//start-all.sh stop.all.sh
public static void main(String[] args) throws Exception {
//1.实例化服务器
NameNodeServer server = new NameNodeServer();
//2.建立连接 NameNodeServer -> zk;
//3.判断/servers是否存在 如果不存在 则创建 -》init
server.initMainNode();;
//绑定监听 /servers下的/子节点变化
// * ls path [watch] ->getChildren
/
class MyMatch implements Watcher {
private ZooKeeper zk;
private String path;
private CountDownLatch countDownLatch;
private Logger logger = Logger.getLogger(MyMatch.class);
private void showChildNodeInfo(List children) {
//1.循环子节点列表
logger.info("当前在线的datanode有:" + children.size());
logger.info("他们是:");
logger.info(children); //[xx,xx,xxx]
logger.info("各节点的详情:");
for (String sonPath : children) {
logger.info("*****" + sonPath + "*********");
try {
byte[] data = zk.getData(path + "/" + sonPath, false, null);//byte[]->反序列化
Properties p = (Properties) deserilizable(data);
logger.info(p);
logger.info("**************");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//2.根据子节点 路径 显示信息
}
}
序列化转为byte数组
private Object deserilizable(byte[] data) {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
bis = new ByteArrayInputStream(data);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
} finally {
if (ois != null) {
try {
ois.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (bis != null) {
try {
bis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return obj;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == NodeChildrenChanged) {
//实现子节点列表变化的代码
logger.info("监听到了子节点的变化...");
try {
//重新绑定
List children = zk.getChildren(path,MyMatch.this);
showChildNodeInfo(children);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
} else if (event.getType() == NodeDataChanged) {
try {
Stat stat = new Stat();
//画龙点睛之笔
byte[] data = zk.getData(path, MyMatch.this, stat);//异常不能抛出
String dataString = new String(data,"utf-8");
logger.info("监听程序中获取节点:"+path+"更新后的数据为:"+dataString);
logger.info("节点最新的信息stat为:");
String info = YcZnodeUtil.printZnodeInfo(stat);
logger.info(info);
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.countDown(); //递减
//重新设置countdown 的初始值 让他循环
if (countDownLatch.getCount() == 1) {
countDownLatch = new CountDownLatch(Integer.MAX_VALUE);
}
System.out.println("当前 connectedsignal为: " + countDownLatch.getCount());
}
public MyMatch(ZooKeeper zk,String path,CountDownLatch countDownLatch){
super();
this.zk = zk;
this.path = path;
this.countDownLatch = countDownLatch;
}
public MyMatch(){
super();
}
public void setZk(ZooKeeper zk){
this.zk = zk;
}
public void setPath(String path){
this.path = path;
}
}
# pom文件配置
org.apache.maven.plugins
maven-compiler-plugin
8
8
org.apache.maven.plugins
maven-assembly-plugin
false
jar-with-dependencies
com.yc.Zookpeek.program2.NameNodeServer
make-assembly
package
assembly
junit
junit
4.13.2
test
org.junit.jupiter
junit-jupiter
RELEASE
compile
org.apache.zookeeper
zookeeper
3.4.6
org.projectlombok
lombok
1.18.20
provided



