栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper+springclould实现分布式全局锁

Zookeeper+springclould实现分布式全局锁

实现原理:

利用zookeeper顺序操作串行的特性我们可以将zookeeper作为一个全局的锁来使用.

首先创建一个zookeeper基础操作类:

来对zookeeper节点进行操作

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


@Slf4j
public class ZookeeperDistributeLock implements DistributedLock {

    private ZooKeeper zooKeeper;
    private String rootPath;// 根路径名
    private String lockNamePre;// 锁前缀
    private static ThreadLocal currentLockPath = new ThreadLocal<>();// 用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)
    private static int MAX_RETRY_COUNT = 10;// 最大重试次数

    public ZookeeperDistributeLock(ZooKeeper zookeeper, String rootPath, String lockNamePre) {
        log.info("rootPath:{},lockNamePre:{}", rootPath, lockNamePre);
        this.zooKeeper = zookeeper;
        this.rootPath = rootPath;
        this.lockNamePre = lockNamePre;
        init();
    }

    
    private void init() {
        try {
            Stat stat = zooKeeper.exists(rootPath, false);// 判断一下根目录是否存在
            if (stat == null) {
                zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            log.error("create rootPath error", e);
        }
    }
    
    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }

    
    private List getSortedChildren() throws Exception {
        List children = zooKeeper.getChildren(rootPath, false);
        if (children != null && !children.isEmpty()) {
            Collections.sort(children, new Comparator() {
                @Override
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockNamePre).compareTo(getLockNodeNumber(rhs, lockNamePre));
                }
            });
        }
        log.info("sort childRen:{}", children);
        return children;
    }

    
    private boolean waitToLock(long startMillis, Long millisToWait) throws Exception {

        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            while (!haveTheLock) {
                log.info("get Lock Begin");
                // 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
                List children = getSortedChildren();
                String sequenceNodeName = currentLockPath.get().substring(rootPath.length() + 1);

                // 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
                int ourIndex = children.indexOf(sequenceNodeName);

                
                if (ourIndex < 0) {
                    log.error("not find node:{}", sequenceNodeName);
                    throw new Exception("节点没有找到: " + sequenceNodeName);
                }

                // 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
                // 此时当前客户端需要等待其它客户端释放锁,
                boolean isGetTheLock = ourIndex == 0;

                // 如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
                String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

                if (isGetTheLock) {
                    log.info("get the lock,currentLockPath:{}", currentLockPath.get());
                    haveTheLock = true;
                } else {
                    // 如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
                    String previousSequencePath = rootPath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final Watcher previousListener = new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if (event.getType() == EventType.NodeDeleted) {
                                latch.countDown();
                            }
                        }
                    };

                    // 如果节点不存在会出现异常
                    Stat stat = zooKeeper.exists(previousSequencePath, previousListener);
                    if (stat == null) {
                        log.error("节点为空,发生异常");
                        throw new Exception();
                    }

                    // 如果有超时时间,刚到超时时间就返回
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            // timed out - delete our node
                            doDelete = true;
                            break;
                        }

                        latch.await(millisToWait, TimeUnit.MICROSECONDS);
                    } else {
                        latch.await();
                    }
                }
            }
        } catch (Exception e) {
            // 发生异常需要删除节点
            log.error("waitToLock exception", e);
            doDelete = true;
            throw e;
        } finally {
            // 如果需要删除节点
            if (doDelete) {
                unLock();
            }
        }
        log.info("get Lock end,haveTheLock=" + haveTheLock);
        return haveTheLock;
    }

    
    private String createLockNode(String path) throws Exception {
        Stat stat = zooKeeper.exists(rootPath, false);
        // 判断一下根目录是否存在
        if (stat == null) {
            //创建永久节点
            zooKeeper.create(rootPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        //创建临时节点,客户端断开则删除节点
        return zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    
    private Boolean attemptLock(long time, TimeUnit unit) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        // 网络闪断需要重试一试,最大重试次数MAX_RETRY_COUNT
        while (!isDone) {
            isDone = true;
            try {
                currentLockPath.set(createLockNode(rootPath.concat("/").concat(lockNamePre)));
                hasTheLock = waitToLock(startMillis, millisToWait);

            } catch (Exception e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    log.error("正在重试::" + retryCount);
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        return hasTheLock;
    }

    @Override
    public boolean tryLock() throws Exception {
        log.info("tryLock Lock Begin");
        // 该方法实现获取locker节点下的所有顺序节点,并且从小到大排序,
        List children = getSortedChildren();
        String sequenceNodeName = currentLockPath.get().substring(rootPath.length() + 1);

        // 计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
        int ourIndex = children.indexOf(sequenceNodeName);

        if (ourIndex < 0) {
            log.error("not find node:{}", sequenceNodeName);
            throw new Exception("节点没有找到: " + sequenceNodeName);
        }

        // 如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
        return ourIndex == 0;
    }

    @Override
    public void lock() throws Exception {
        // -1,null表示阻塞等待,不设置超时时间
        attemptLock(-1, null);

    }

    @Override
    public boolean lock(long time, TimeUnit unit) throws Exception {
        if (time <= 0) {
            throw new Exception("Lock wait for time must greater than 0");
        }

        if (unit == null) {
            throw new Exception("TimeUnit can not be null");
        }

        return attemptLock(time, unit);
    }
    @Override
    public void unLock() {
        try {
            Stat stat = zooKeeper.exists(currentLockPath.get(), true);
            if (stat != null) {
                zooKeeper.delete(currentLockPath.get(), -1);
            }
        } catch (Exception e) {
            log.error("unLock error", e);

        } finally {
            currentLockPath.remove();
        }
    }
}

使用方法也很简单,在程序中直接调用lock方法即可上锁,再次调用unlock方法即可释放全局锁

 @Around("lockPointCut()")
    public Object tryLock(ProceedingJoinPoint pjd) {
        Object result;
        String methodName = pjd.getSignature().getName();
        //执行目标方法
        try {
            //前置通知
            log.info("进入分布式锁切面方法:" + methodName);
            //加锁
            myLock.lock();
            log.info("加锁成功当前线程:"+Thread.currentThread().getName());
            //执行方法体
            result = pjd.proceed();
        } catch (Throwable throwable) {
            log.info("锁切面失效");
            myLock.unLock();
            throwable.printStackTrace();
            return new CommonResult().failed("加锁方法执行失败");
        } finally {
            log.info("释放zookeeper锁");
            myLock.unLock();
        }
        return result;
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742398.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号