栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper入门(二) --- 客户端常用命令、服务器上下线监听案例、分布式锁实现

Zookeeper入门(二) --- 客户端常用命令、服务器上下线监听案例、分布式锁实现

文章目录

四、客户端常用命令五、Zookeeper节点

5.1 节点数据信息5.2 节点类型5.3 监听器原理 六、服务器动态上下线监听案例七、分布式锁案例

7.1 锁原理7.2 原生API代码实现7.3 Curator框架实现

四、客户端常用命令
命令行基本语法功能描述
help显示所有操作命令
ls [-w] [-s] path使用 ls 命令来查看znode的子节点,
-w 监听子节点变化
-s 附加次级信息
create [-s] [-e] node value创建节点
-s 带序列的(后面追加系统自增的数字)
-e 临时节点,重启或者超时时消失
get [-w] [-s] path获得节点的值
-w 监听节点内容变化
-s 附加次级信息
set设置/修改节点的具体值
stat查看节点状态
delete删除节点
deleteall递归删除节点和其子节点
五、Zookeeper节点 5.1 节点数据信息
[zk: localhost:2181(CONNECTED) 0] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x9
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

czxid: 创建节点的事务 zxid 每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。ctime: znode 被创建的毫秒数mzxid: znode 最后更新的事务 zxidmtime: znode 最后修改的毫秒数pZxid: znode 最后更新的子节点 zxidcversion: znode 子节点变化号,znode 子节点修改次数dataversion: znode 数据变化号aclVersion: znode 访问控制列表的变化号ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0dataLength: znode 的数据长度numChildren: znode 子节点数量 5.2 节点类型

5.3 监听器原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端

监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序

六、服务器动态上下线监听案例

需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

新建Maven项目并导入依赖


    
        junit
        junit
        4.13.2
    
    
        org.apache.logging.log4j
        log4j-core
        2.14.1
    
    
        org.apache.zookeeper
        zookeeper
        3.5.7
    

服务端:

public class DistributeServer {
    private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
    private static int sessionTimeout = 60000;
    private ZooKeeper zkClient = null;
    private final String ParentPath = "/servers";

    //获取到zookeeper的客户端连接
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {

        });
    }

    //注册服务器到zk上
    public void registerServer(String hostname) throws KeeperException, InterruptedException {
        String created = zkClient.create(ParentPath+"/server", hostname.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//模式为 -e -s,暂时的带序号的
        System.out.println(hostname+"is online"+created);
    }

    //业务功能
    public void business(String hostname) throws InterruptedException {
        System.out.println(hostname+"正在服务。。");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeServer server = new DistributeServer();
        server.init();
        server.registerServer(args[0]);
        server.business(args[0]);
    }
}

客户端:

public class DistributeClient {
    private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
    private static int sessionTimeout = 60000;
    private ZooKeeper zkClient = null;
    private final String ParentPath = "/servers";

    //获取连接
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {
            System.out.println(watchedEvent.getPath()+"路径下:"+watchedEvent.getType());
            try {
                System.out.println("开始监听。。。");
                listen();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    //监听服务器端有哪些可用
    public void listen() throws KeeperException, InterruptedException {
        List children = zkClient.getChildren(ParentPath, true);
        System.out.println("可用的服务端:"+children);
    }

    //业务
    public void business() throws InterruptedException {
        System.out.println("开始业务");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();
        client.init();
        client.listen();
        client.business();
    }
}
public class DistributeClient {
    private final String connectString = "192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183";
    private static int sessionTimeout = 60000;
    private ZooKeeper zkClient = null;
    private final String ParentPath = "/servers";

    //获取连接
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString,sessionTimeout,watchedEvent -> {
            System.out.println(watchedEvent.getPath()+"路径下:"+watchedEvent.getType());
            try {
                System.out.println("开始监听。。。");
                listen();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    //监听服务器端有哪些可用
    public void listen() throws KeeperException, InterruptedException {
        List children = zkClient.getChildren(ParentPath, true);
        System.out.println("可用的服务端:"+children);
    }

    //业务
    public void business() throws InterruptedException {
        System.out.println("开始业务");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();
        client.init();
        client.listen();
        client.business();
    }
}

运行截图:

先启动一个客户端

然后依次启动三个服务端,服务端在jvm启动参数那设置arg[0]分别为192.168.2.128:2181,192.168.2.128:2182,192.168.2.128:2183

然后依次下线

七、分布式锁案例 7.1 锁原理

利用watch监听机制

多个客户端竞争锁,即各自创建自己的节点,用-s -e方式,创建顺序临时节点,谁排在第一个就成功获取到锁,其他的就等待直到排到自己

7.2 原生API代码实现
public class DistributeLock {
    private final String connectString = "127.0.0.1:2181";
    private static int sessionTimeout = 60000;
    private ZooKeeper zkClient = null;
    private final String rootNode = "/locks";
    private final String subNode = "/seq-";
    private String waitPath;    //当前节点等待的节点,就是前一个节点
    private String currentNode;//当前client创建的子节点
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    public DistributeLock() throws IOException, KeeperException, InterruptedException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            //建立连接的时候让一个线程唤醒,往下执行
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){
                connectLatch.countDown();
            }
            //发生了waitPath的删除事件
            if(watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted &&
                watchedEvent.getPath().equals(waitPath))  {
                waitLatch.countDown();
            }
        });
        connectLatch.await();
        Stat stat = zkClient.exists(rootNode, false);//判断根节点状态
        if(stat==null){
            System.out.println("根节点不存在,开始创建");
            zkClient.create(rootNode,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

	//加锁
    public void lock(){
        try {
            currentNode = zkClient.create(rootNode+subNode,null, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);
            //获取节点判断是否为第一位
            List children = zkClient.getChildren(rootNode, false);
            System.out.println("检测到:"+children);
            if (children.size()==1){
                return;//如果只有一个节点,那肯定是当前节点
            } else {
                Collections.sort(children);
                int index = children.indexOf(children);
                if(index==0){//排序第一位就获得锁
                    return;
                } else {
                    //获取前一位的节点
                    waitPath = rootNode + "/" + children.get(index-1);
                    //在 waitPath 上注册监听器, 当 waitPath 被删除时,
                    //zookeeper 会回调监听器的 process 方法
                    zkClient.getData(waitPath,true,new Stat());
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    //解锁
    public void unlock(){
        try {
            zkClient.delete(currentNode,-1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }

    }
}

测试方法:

public class TestLock {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        for (int i = 0; i < 5; i++) {
            final DistributeLock lock = new DistributeLock();
            new Thread(()->{
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName()+"获取了锁");
                    Thread.sleep(5 * 1000);
                    lock.unlock();
                    System.out.println(Thread.currentThread().getName()+"释放了锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },i+"").start();
        }
    }
}

运行截图:

如果没加锁,会所有线程一起拿到锁

加锁后:


排队获取锁

7.3 Curator框架实现

官网:Apache Curator

原生的 Java API 开发存在的问题

会话连接是异步的,需要自己去处理。比如使用 CountDownLatchWatch 需要重复注册,不然就不能生效开发的复杂性还是比较高的不支持多节点删除和创建。需要自己去递归

代码实现:


 org.apache.curator
 curator-framework
 4.3.0


 org.apache.curator
 curator-recipes
 4.3.0


 org.apache.curator
 curator-client
 4.3.0

public class CuratorLockTest {
    private final static String rootNode = "/locks";
    private final static String connectString = "39.106.87.37:2181";
    private final static int sessionTimeout = 60000;
    private final static int connectTimeout = 60000;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    public void test(){
        for (int i = 0; i < 5; i++) {
            final InterProcessLock lock = new InterProcessMutex(getCuratorframework(), rootNode);
            new Thread(()->{
                try {
                    lock.acquire();
                    System.out.println(Thread.currentThread().getName()+"获得锁");
                    lock.acquire();//可重入
                    System.out.println(Thread.currentThread().getName()+"再次获得锁");
                    Thread.sleep(5*1000);
                    lock.release();
                    System.out.println(Thread.currentThread().getName()+"释放锁");
                    lock.release();
                    System.out.println(Thread.currentThread().getName()+"再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }

            },"线程"+i).start();
        }
    }

    public Curatorframework getCuratorframework(){
        Curatorframework client = CuratorframeworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(3000, 3))//重试策略,初试3秒,重试3次
                .build();
        client.start();
        System.out.println("zookeeper 初始化完成");
        return client;
    }
}

运行截图:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746294.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号