栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper--07---案例----服务器动态上下线

Zookeeper--07---案例----服务器动态上下线

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录
  • 服务器动态上下线
    • 需求:
    • 步骤
    • 临时节点是如何维持⼼跳呢?
    • zk的watch机制
  • 具体实现
    • 1.先在集群上创建/servers 节点
    • 2. 服务器端向Zookeeper 注册代码
    • 3.服务器端向Zookeeper 注册代码
    • 4.客户端代码 )客户端代码
    • 5.测试
      • 1)在 Linux命令行上操作增加减少服务器
      • 2)在 Idea上操作增加减少服务器


服务器动态上下线 需求:
  • 客户端能实时洞察到服务器上下线的变化
步骤
  1. 服务端启动时去注册信息(创建都是临时节点)
  2. 获取到当前在线服务器列表,并且注册监听
  3. 服务器节点下线
  4. 服务器节点上下线事件通知
临时节点是如何维持⼼跳呢?

zk的watch机制

Watch机制介绍

  • 我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。
  • 当这个 Znode 发⽣改变,也就是调⽤了 create , delete , setData ⽅法的时候,将会触发 Znode上注册的对应事件,请求 Watch 的客户端会接收到异步通知。

具体交互过程如下:

  • 客户端调⽤ getData ⽅法, watch 参数是 true 。服务端接到请求,返回节点数据,并且在对应的哈希表⾥插⼊被 Watch 的 Znode 路径,以及 Watcher 列表。
  • 当被 Watch 的 Znode 已删除,服务端会查找哈希表,找到该 Znode
    对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的 Key-Value。

客户端使⽤了NIO通信模式监听服务端的调⽤。

具体实现 1.先在集群上创建/servers 节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 

2. 服务器端向Zookeeper 注册代码
  
            org.apache.zookeeper
            zookeeper
            3.5.7
        
3.服务器端向Zookeeper 注册代码
  1. 获取zk连接
  2. 注册服务器到zk集群
  3. 启动业务逻辑
package com.example.demo.atguigu.case1;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

        DistributeServer server = new DistributeServer();
        // 1 获取zk连接
        server.getConnect();

        // 2 注册服务器到zk集群
        server.regist(args[0]);


        // 3 启动业务逻辑(睡觉)
        server.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws KeeperException, InterruptedException {
        String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname +" is online") ;
    }

    private void getConnect() throws IOException {

        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

4.客户端代码 )客户端代码
package com.example.demo.atguigu.case1;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();

        // 1 获取zk连接
        client.getConnect();

        // 2 监听/servers下面子节点的增加和删除
        client.getServerList();

        // 3 业务逻辑(睡觉)
        client.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws KeeperException, InterruptedException {
        List children = zk.getChildren("/servers", true);

        ArrayList servers = new ArrayList<>();

        for (String child : children) {

            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        // 打印
        System.out.println(servers);
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

                try {
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

5.测试 1)在 Linux命令行上操作增加减少服务器

2)在 Idea上操作增加减少服务器



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/695901.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号