栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

zookeeper 闆嗙兢(zookeeper鍒嗗竷寮忛攣鍘熺悊)

zookeeper 闆嗙兢(zookeeper鍒嗗竷寮忛攣鍘熺悊)

自行实现
package com.peace.test.zookeeper.dlock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;


public class DistributedLock {

    private final ZooKeeper zkClient;

    private final String lockRootPath = "/locks";

    private final CountDownLatch connectCountDownLatch = new CountDownLatch(1);

    private final CountDownLatch awaitLatch = new CountDownLatch(1);
    private String awaitPath;
    private String currentNodeFullPath;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        int sessionTimeout = 2000;
        String host = "192.168.1.1:2128,192.168.1.1:2128,192.168.1.1:2128";
        zkClient = new ZooKeeper(host, sessionTimeout, watchedEvent -> {
            // 连接事件
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectCountDownLatch.countDown();
            }
            // 锁节点释放
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(awaitPath)) {
                awaitLatch.countDown();
            }
        });
        connectCountDownLatch.await();
        Stat stat = zkClient.exists(lockRootPath, false);
        // 创建根节点
        if (stat == null) {
            zkClient.create(lockRootPath, "lock".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

    }

    public void lock() throws Exception {
        // 创建临时 带序号 节点,/locks/seq-00000000
        currentNodeFullPath = zkClient.create(lockRootPath + "/seq-", null,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 查询所有获取锁的请求
        List children = zkClient.getChildren(lockRootPath, false);
        if (children.size() == 1) {
            // 没人竞争,直接拿到锁
            return;
        } else {
            Collections.sort(children);
            // seq-00000000
            String currentNodeShortPath = currentNodeFullPath.substring("/locks/".length());
            int index = children.indexOf(currentNodeShortPath);
            if (index == -1) {
                throw new Exception("数据异常");
            } else if (index == 1) {
                // 是第一个,可以得到锁
                return;
            } else {
                // 不是第一个,监听前一个节点
                awaitPath = "/locks/" + children.get(index - 1);
                zkClient.getData(awaitPath, true, null);
                awaitLatch.await();
                return;
            }

        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        zkClient.delete(currentNodeFullPath, -1);
    }
}
curator框架实现
package com.peace.test.zookeeper.dlock;

import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;


public class CuratorDistributedLock {

    static String host = "192.168.1.1:2128,192.168.1.1:2128,192.168.1.1:2128";

    static Curatorframework curatorframework;


    static {
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(5000, 3);
        curatorframework = CuratorframeworkFactory.builder()
                .connectString(host).connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000).retryPolicy(policy).build();
        curatorframework.start();
    }

    public static void lock(String path) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(curatorframework, path);
        lock.acquire();
        // 支持可重入
        // 
    }

    public static void unlock(String path) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(curatorframework, path);
        lock.release();
    }

}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/772309.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号