栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ZooKeeper : Curator框架之分布式锁InterProcessMutex

ZooKeeper : Curator框架之分布式锁InterProcessMutex

InterProcessMutex

InterProcessMutex类的源码注释:

A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is “fair” - each user will get the mutex in the order requested (from ZK’s point of view).

跨JVM工作的可重入互斥锁。使用Zookeeper来持有锁。所有JVM中使用相同锁路径的所有进程都将实现进程间临界区。此外,这个互斥锁是“公平的”—每个用户都将按照请求的顺序获得互斥锁(从ZK的角度来看)。

测试

pom.xml:



    4.0.0

    com.kaven
    zookeeper
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            org.apache.curator
            curator-recipes
            5.2.0
        
        
            org.projectlombok
            lombok
            1.18.22
        
    

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}

InterProcessMutexRunnable类(实现了Runnable接口,模拟分布式节点获取分布式锁):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.Random;

public class InterProcessMutexRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = getCuratorframework();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 分布式锁的锁路径
        String path = "/kaven";

        // 创建InterProcessMutex实例,用于获取分布式锁
        InterProcessMutex mutex = new InterProcessMutex(curator, path);
        // 阻塞,直到获取分布式锁
        mutex.acquire();
        if(mutex.isOwnedByCurrentThread()) {
            System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
            mutex.getParticipantNodes().forEach(System.out::println);
            // 处理业务
            Thread.sleep(5000);
            // 业务处理完成
            System.out.println(Thread.currentThread().getName() + " 业务处理完成");
            // 释放分布式锁
            mutex.release();
        }
        else {
            throw new RuntimeException("获取分布式锁时被中断");
        }
    }

    private Curatorframework getCuratorframework() {

        // 创建Curatorframework实例
        return CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
    }
}

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessMutexRunnable());
        }
        Thread.sleep(10000000);
    }
}

模拟5个分布式节点获取分布式锁,输出如下所示:

pool-1-thread-4 持有分布式锁
/kaven/_c_e1940de6-9e12-4c94-8c1e-98d22178a94e-lock-0000000000
/kaven/_c_b53a7f04-2e85-4675-839a-c26d24585f2c-lock-0000000001
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-4 业务处理完成
pool-1-thread-5 持有分布式锁
/kaven/_c_b53a7f04-2e85-4675-839a-c26d24585f2c-lock-0000000001
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-5 业务处理完成
pool-1-thread-2 持有分布式锁
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-2 业务处理完成
pool-1-thread-1 持有分布式锁
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-1 业务处理完成
pool-1-thread-3 持有分布式锁
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-3 业务处理完成

InterProcessMutex类提供了两种方法来获取分布式锁:

    
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

acquire(long time, TimeUnit unit)方法后面会演示。

可重入锁
    @SneakyThrows
    @Override
    public void run() {
    
        ...
        
        // 阻塞,直到获取分布式锁
        mutex.acquire();
        if(mutex.isOwnedByCurrentThread()) {
            System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
            mutex.getParticipantNodes().forEach(System.out::println);
            // 处理业务
            reentry(5, mutex);
            // 业务处理完成
            System.out.println(Thread.currentThread().getName() + " 业务处理完成");
            // 释放分布式锁
            mutex.release();
        }
        else {
            throw new RuntimeException("获取分布式锁时被中断");
        }
    }

    private void reentry(int nums, InterProcessMutex mutex) throws Exception {
        if(nums == 0) return;
        // 重复获取锁
        mutex.acquire();
        Thread.sleep(1000);
        reentry(nums - 1, mutex);
        // 每获取一次锁,都需要释放
        mutex.release();
    }

通过递归调用来模拟锁重入,输出如下所示:

pool-1-thread-4 持有分布式锁
/kaven/_c_90f9c755-249e-4d80-a101-98d7c894b5bc-lock-0000000000
/kaven/_c_a86b8fc8-e5e0-40fa-9cb0-d25a82e55926-lock-0000000001
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-4 业务处理完成
pool-1-thread-2 持有分布式锁
/kaven/_c_a86b8fc8-e5e0-40fa-9cb0-d25a82e55926-lock-0000000001
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-2 业务处理完成
pool-1-thread-3 持有分布式锁
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-3 业务处理完成
pool-1-thread-5 持有分布式锁
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-5 业务处理完成
pool-1-thread-1 持有分布式锁
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-1 业务处理完成
可撤销锁

InterProcessMutex类实现了Revocable接口,使得锁可撤销,其实就是想要获取锁的用户给持有锁的用户发送一个请求(通过Curator的CuratorWatcher),持有锁的用户接收到这个请求后就可以进行处理,比如直接释放锁、平滑释放锁(处理完当前业务后再释放锁)甚至可以不理会该请求,因此这种撤销操作是合作完成的。


public interface Revocable
{
    
    public void     makeRevocable(RevocationListener listener);
    public void     makeRevocable(RevocationListener listener, Executor executor);
}

博主感觉这个功能不是很方便,撤销锁时需要知道持有锁的路径,而InterProcessMutex类并没有提供该路径的直接获取方法,博主只能通过获取参与竞争分布式锁的节点列表来完成该操作(获取该列表的第一个值,就是持有锁的路径),但这样做不是很方便,并且性能不太好。

    @SneakyThrows
    @Override
    public void run() {
    
        ...

        // 创建InterProcessMutex实例,用于获取分布式锁
        InterProcessMutex mutex = new InterProcessMutex(curator, path);
        // 本线程是否持有锁
        AtomicBoolean locked = new AtomicBoolean(false);
        mutex.makeRevocable((mtx) -> {
            // 接收到撤销锁的请求
            try {
                System.out.println(Thread.currentThread().getName() + " 接收到撤销锁的请求");
                if(locked.get()) {
                    // 直接释放锁
                    locked.set(false);
                    System.out.println(Thread.currentThread().getName() + " 直接释放分布式锁");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        // 阻塞,直到获取分布式锁或给定时间已到
        mutex.acquire(8, TimeUnit.SECONDS);
        if(mutex.isOwnedByCurrentThread()) {
            locked.set(true);
            System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
            mutex.getParticipantNodes().forEach(System.out::println);
            // 处理业务
            while (locked.get()) {
                Thread.sleep(2000);
            }
            // 释放锁
            if(!locked.get()) {
                mutex.release();
            }
        }
        else {
            // 给持有锁的用户发送撤销锁的请求
            // 其实就是设置持有锁的路径的值为__REVOKE__
            // 然后持有锁的用户的监听器(CuratorWatcher)会被触发
            Revoker.attemptRevoke(curator,
                    new ArrayList<>(mutex.getParticipantNodes()).get(0));
            // 再次阻塞,直到获取分布式锁或给定时间已到
            mutex.acquire(8, TimeUnit.SECONDS);
            if(mutex.isOwnedByCurrentThread()) {
                System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
                mutex.getParticipantNodes().forEach(System.out::println);
            }
        }
    }
public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 2; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessMutexRunnable());
        }
        Thread.sleep(10000000);
    }
}

输出如下所示:

pool-1-thread-2 持有分布式锁
/kaven/_c_b5d1ada9-6ef8-4e71-a3c5-222c0d249d6f-lock-0000000000
/kaven/_c_802055d5-07f1-4e47-8f46-4ec64f46405d-lock-0000000001
pool-1-thread-2-EventThread 接收到撤销锁的请求
pool-1-thread-2-EventThread 直接释放分布式锁
pool-1-thread-1 持有分布式锁
/kaven/_c_00ad570e-f4ae-461c-977f-b35fa6157406-lock-0000000002

为什么会有三个节点?

/kaven/_c_b5d1ada9-6ef8-4e71-a3c5-222c0d249d6f-lock-0000000000
/kaven/_c_802055d5-07f1-4e47-8f46-4ec64f46405d-lock-0000000001
/kaven/_c_00ad570e-f4ae-461c-977f-b35fa6157406-lock-0000000002

因为线程pool-1-thread-1参加了两次分布式锁的获取。

Curator框架的分布式锁InterProcessMutex就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706215.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号