栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

curator框架分布式锁解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

curator框架分布式锁解析

1、InterProcessMutex

分析:

可重入的互斥锁,跨JVM工作。使用ZooKeeper来控制锁。所有JVM中的任何进程,只要使用同样的锁路径,将会成为跨进程的一部分。此外,这个排他锁是“公平的”,每个用户按照申请的顺序得到排他锁。可见InterProcessMutex和我们自己实现的例子都是一个排他锁,此外还可以重入。

代码:

    

public class Curator_Session {

    static AtomicInteger n = new AtomicInteger();

    static String lockPath = "/curator_recipe_lock_path";

    static Curatorframework client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {

        client.start();

        final InterProcessMutex lock = new InterProcessMutex(client, lockPath);


        for (int i = 0; i < 30; i++) {
            System.out.println("----------------"+n.addAndGet(1)+"------------------");

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock.acquire();
                        System.out.println("--------------");
                    } catch (Exception e) {
                    }

                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("---------------生成的订单号:" + orderNo);
                    try {
                        lock.release();
                    } catch (Exception exception) {
                    }
                }
            }).start();
        }

    }
}

   运行结果:

   

---------------生成的订单号:15:46:04|460
---------------生成的订单号:15:46:04|474
---------------生成的订单号:15:46:04|477
---------------生成的订单号:15:46:04|489

     逻辑分析:首先通过acquire()获取锁,该方法会阻塞进程,直到获取锁,然后执行你的业务方法,最后通过 release()释放锁。

    思路:1、创建有序临时节点

                2、触发“尝试取锁逻辑”,如果自己是临时锁节点序列的第一个,则取得锁,获取锁成功。

                3、如果自己不是序列中第一个,则监听前一个锁节点变更。同时阻塞线程。

                4、当前一个锁节点变更时,通过watcher恢复线程,然后再次到步骤2“尝试取锁逻辑”

   源码解析:

             InterProcessMutex实现了两个接口:

            public class InterProcessMutex implements InterProcessLock, Revocable

           InterProcessLock是分布式锁接口,分布式锁必须实现接口中的如下方法:

          1、获取锁,直到锁可用

             public void acquire() throws Exception;

         2、在指定等待的时间内获取锁。

            public boolean acquire(long time, TimeUnit unit) throws Exception;

         3、释放锁

           public void release() throws Exception;

        4、当前线程是否获取了锁

       boolean isAcquiredInThisProcess();

      获取锁:

   

public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    
    获取当前线程
    Thread currentThread = Thread.currentThread();
    取得当前线程在threadData中的lockData
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering  
        如果存在该线程的锁数据,说明是锁重入, lockData.lockCount加1,直接返回true。获取锁成功
        lockData.lockCount.incrementAndGet();
        return true;
    }
    如果不存在该线程的锁数据,则通过internals.attemptLock()获取锁,此时线程被阻塞,直至获得到锁
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        锁获取成功后,把锁的信息保存到threadData中。
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    如果没能获取到锁,则返回false。
    return false;
}

   LockInternals源码分析:Curator通过zk实现分布式锁的核心逻辑都在LockInternals中

   在InterProcessMutex获取锁的代码分析中,可以看到它是通过internals.attemptLock(time, unit, getLockNodeBytes());来获取锁的,那么我们就以这个方法为入口。

   

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                通过driver在zk上创建锁节点,获得锁节点路径。
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                通过internalLockLoop()方法阻塞进程,直到获取锁成功
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NonodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }


    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            internalLockLoop中通过while自旋,判断锁如果没有被获取,将不断的去尝试获取锁。
            while ( (client.getState() == CuratorframeworkState.STARTED) && !haveTheLock )
            {
                List        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                通过driver查看当前锁节点序号是否排在第一位,如果排在第一位,说明取锁成功,跳出循环
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    如果没有排在第一位,则监听自己的前序锁节点,然后阻塞线程。
                    当前序节点释放了锁,监听会被触发,恢复线程,此时主线程又回到while中第一步。
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NonodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/736623.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号