栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Zookeeper】分布式锁案例(原生、Curator)

【Zookeeper】分布式锁案例(原生、Curator)

 学习视频 

【尚硅谷】2021新版Zookeeper 3.5.7版本教程

集数:24—28


 学习笔记 

【Java】学习笔记汇总


一、介绍

什么叫做分布式锁呢?

比如说,"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

二、原生ZooKeeper分布式锁案例

手动实现

public class DistributedLock {

    private final String connectString = "192.168.150.101:2181";
    private final int sessionTimeout = 10000;
    private final ZooKeeper zooKeeper;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String waitPath;
    private String currentNode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        // 获取连接
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk 可以释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // waitLatch 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待zk正常连接后,往下走程序,让zookeeper更健壮
        connectLatch.await();

        // 判断根节点/locks是否存在
        Stat exists = zooKeeper.exists("/locks", false);

        if (exists == null) {
            // 创建根节点
            zooKeeper.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }


    // 对zk加锁
    public void zkLock() {
        // 创建对应的临时带序号节点
        try {
            currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 判断创建的节点是否时最小的序号节点,如果是,获取到锁;如果不是,监听他序号前一个节点
            List children = zooKeeper.getChildren("/locks", false);

            // 如果children只有一个节点,那就直接获取锁;如果有多个节点,需要判断谁最小
            if (children.size() == 1) {
                return;
            } else {
                Collections.sort(children);
                // 获取节点名称

                // 获取节点名称 seq-00000000
                String thisNode = currentNode.substring("/locks/".length());
                // 通过seq-00000000获取该节点在children集合的位置
                int index = children.indexOf(thisNode);

                // 判断
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // 就一个节点,可以获取锁
                    return;
                } else {
                    // 需要监听前一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    zooKeeper.getData(waitPath, true, null);

                    // 等待监听
                    waitLatch.await();

                    return;
                }
            }


        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }

    }

    // 对zk解锁
    public void unZkLock() {
        // 删除节点
        try {
            zooKeeper.delete(currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

测试代码

public class DistributeClient {

    private String connectString = "192.168.150.101:2181";
    private int sessionTimeout = 10000;
    private ZooKeeper zooKeeper;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();

        // 1 获取连接
        client.getConnect();
        // 2 监听/servers下面子节点的增加和删除
        client.getServerList();
        // 3 业务逻辑(睡觉)
        client.businesss();
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List children = zooKeeper.getChildren("/servers", true);
        ArrayList servers = new ArrayList<>();
        for (String child : children) {
            byte[] data = zooKeeper.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }

    private void businesss() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
    

    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 调用getServerList,可以保证持续监听
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
三、Curator框架实现分布式锁案例 3.1 概述

 原生的 Java API开发存在的问题

1 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

2 Watch需要重复注册,不然就不能生效

3 开发的复杂性还是比较高的

4 不支持多节点删除和创建。需要自己去递归

 Curator是一个专门解决分布式锁的框架,解决了原生 Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3.2 Curator案例实操

 添加依赖


    org.apache.curator
    curator-framework
    4.3.0


    org.apache.curator
    curator-recipes
    4.3.0


    org.apache.curator
    curator-client
    4.3.0

 代码实现

public class CuratorLockTest {
    public static void main(String[] args) {
        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorframework(), "/locks");
        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorframework(), "/locks");


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("thread 1 get lock");

                    lock1.acquire();
                    System.out.println("thread 1 get lock again");

                    Thread.sleep(5000);

                    lock1.release();
                    System.out.println("thread 1 release lock");

                    lock1.release();
                    System.out.println("thread 1 release lock again");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("thread 2 get lock");

                    lock2.acquire();
                    System.out.println("thread 2 get lock again");

                    Thread.sleep(5000);

                    lock2.release();
                    System.out.println("thread 2 release lock");

                    lock2.release();
                    System.out.println("thread 2 release lock again");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static Curatorframework getCuratorframework() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 3);
        Curatorframework client = CuratorframeworkFactory.builder()
                .connectString("192.168.150.101:2181")
                .connectionTimeoutMs(10000)
                .retryPolicy(retry).build();

        client.start();

        System.out.println("zookeeper 启动成功");
        return client;
    }
}

控台输出:

thread 2 get lock
thread 2 get lock again
thread 2 release lock
thread 2 release lock again
thread 1 get lock
thread 1 get lock again
thread 1 release lock
thread 1 release lock again
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487186.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号