栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

从源码层面深度剖析Redisson实现分布式锁的原理(全程干货,注意收藏)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

从源码层面深度剖析Redisson实现分布式锁的原理(全程干货,注意收藏)

Redis实现分布式锁的原理

前面讲了Redis在实际业务场景中的应用,那么下面再来了解一下Redisson功能性场景的应用,也就是大家经常使用的分布式锁的实现场景。

  • 引入redisson依赖

    
        org.redisson
        redisson
        3.16.0
    
  • 编写简单的测试代码

    public class RedissonTest {
    
        private static RedissonClient redissonClient;
    
        static {
            Config config=new Config();
            config.useSingleServer().setAddress("redis://192.168.221.128:6379");
            redissonClient=Redisson.create(config);
        }
    
        public static void main(String[] args) throws InterruptedException {
            RLock rLock=redissonClient.getLock("updateOrder");
            //最多等待100秒、上锁10s以后自动解锁
            if(rLock.tryLock(100,10,TimeUnit.SECONDS)){
                System.out.println("获取锁成功");
            }
            Thread.sleep(2000);
            rLock.unlock();
    
            redissonClient.shutdown();
        }
    }
    

Redisson分布式锁的实现原理

你们会发现,通过redisson,非常简单就可以实现我们所需要的功能,当然这只是redisson的冰山一角,redisson最强大的地方就是提供了分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的并发程序的工具包获得了协调分布式多级多线程并发系统的能力,降低了程序员在分布式环境下解决分布式问题的难度,下面分析一下RedissonLock的实现原理

RedissonLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    //通过tryAcquire方法尝试获取锁
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) { //表示成功获取到锁,直接返回
        return true;
    }
    //省略部分代码....
}

tryAcquire
private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture ttlRemainingFuture;
    //leaseTime就是租约时间,就是redis key的过期时间。
    if (leaseTime != -1) { //如果设置过期时间
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.eval_LONG);
    } else {//如果没设置了过期时间,则从配置中获取key超时时间,默认是30s过期
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.eval_LONG);
    }
    //当tryLockInnerAsync执行结束后,触发下面回调
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //说明出现异常,直接返回
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //表示第一次设置锁键
            if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1,启动Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync

通过lua脚本来实现加锁的操作

  1. 判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。

  2. 判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。

  3. 被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。

 RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

关于Lua脚本,我们稍后再解释。

unlock释放锁流程

释放锁的流程,脚本看起来会稍微复杂一点

  1. 如果lock键不存在,通过publish指令发送一个消息表示锁已经可用。

  2. 如果锁不是被当前线程锁定,则返回nil

  3. 由于支持可重入,在解锁时将重入次数需要减1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用

protected RFuture unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.eval_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

RedissonLock有竞争的情况

有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中

  1. this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败

  2. this.await返回true,进入循环尝试获取锁。

继续看RedissonLock.tryLock后半部分代码如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//省略部分代码
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        current = System.currentTimeMillis();
       // 订阅监听redis消息,并且创建RedissonLockEntry
        RFuture subscribeFuture = subscribe(threadId);
      // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) { //取消订阅
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId); //表示抢占锁失败
            return false; //返回false
        }
        try {
            //判断是否超时,如果等待超时,返回获的锁失败
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            //通过while循环再次尝试竞争锁
            while (true) { 
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,返回锁超时时间
                // lock acquired
                if (ttl == null) { //如果超时时间为null,说明获得锁成功
                    return true;
                }
                //判断是否超时,如果超时,表示获取锁失败
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // 通过信号量(共享锁)阻塞,等待解锁消息.  (减少申请锁调用的频率)
                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
                // 否则就在wait time 时间范围内等待可以通过信号量
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                // 更新等待时间(最大等待时间-已经消耗的阻塞时间)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) { //获取锁失败
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId); //取消订阅
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

锁过期了怎么办?

一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。

为了避免这种情况,Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么Redisson会自动给Redis中目标key延长超时时间。

默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);  //leaseTime=-1
}

实际上,当我们通过tryLock方法没有传递超时时间时,默认会设置一个30s的超时时间,避免出现死锁的问题。

private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture ttlRemainingFuture;
    if (leaseTime != -1) { 
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.eval_LONG);
    } else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,表示当前锁的过期时间。

        //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.eval_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //说明出现异常,直接返回
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //表示第一次设置锁键
            if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,并返回
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1,启动Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

由于默认设置了一个30s的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。

  • 首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,
  • 如果已经存在了,就直接返回;主要是考虑到RedissonLock是可重入锁。
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {// 第一次加锁的时候会调用,内部会启动WatchDog
        entry.addThreadId(threadId);
        renewExpiration();

    }
}

定义一个定时任务,该任务中调用renewExpirationAsync方法进行续约。

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //用到了时间轮机制
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync续约租期
            RFuture future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行

    ee.setTimeout(task);
}

执行Lua脚本,对指定的key进行续约。

protected RFuture renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.eval_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

Lua脚本

Lua是一个高效的轻量级脚本语言(和Javascript类似),用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua在葡萄牙语中是“月亮”的意思,它的logo形式卫星,寓意是Lua是一个“卫星语言”,能够方便地嵌入到其他语言中使用;其实在很多常见的框架中,都有嵌入Lua脚本的功能,比如OpenResty、Redis等。

使用Lua脚本的好处:

  1. 减少网络开销,在Lua脚本中可以把多个命令放在同一个脚本中运行

  2. 原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说,编写脚本的过程中无需担心会出现竞态条件

  3. 复用性,客户端发送的脚本会永远存储在redis中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑

Lua的下载和安装

Lua是一个独立的脚本语言,所以它有专门的编译执行工具,下面简单带大家安装一下。

  • 下载Lua源码包: https://www.lua.org/download.html

    https://www.lua.org/ftp/lua-5.4.3.tar.gz

  • 安装步骤如下

    tar -zxvf lua-5.4.3.tar.gz
    cd lua-5.4.3
    make linux
    make install

如果报错,说找不到readline/readline.h, 可以通过yum命令安装

yum -y install readline-devel ncurses-devel

最后,直接输入lua命令即可进入lua的控制台。Lua脚本有自己的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的说明,用过Javascript的同学,应该只需要花几个小时就可以全部学完,简单演示两个案例如下。

array = {"Lua", "mic"}
for i= 0, 2 do
   print(array[i])
end
array = {"mic", "redis"}

for key,value in ipairs(array)
do
   print(key, value)
end

Redis与Lua

Redis中集成了Lua的编译和执行器,所以我们可以在Redis中定义Lua脚本去执行。同时,在Lua脚本中,可以直接调用Redis的命令,来操作Redis中的数据。

redis.call(‘set’,'hello','world')

local value=redis.call(‘get’,’hello’) 

redis.call 函数的返回值就是redis命令的执行结果,前面我们介绍过redis的5中类型的数据返回的值的类型也都不一样,redis.call函数会将这5种类型的返回值转化对应的Lua的数据类型

在很多情况下我们都需要脚本可以有返回值,毕竟这个脚本也是一个我们所编写的命令集,我们可以像调用其他redis内置命令一样调用我们自己写的脚本,所以同样redis会自动将脚本返回值的Lua数据类型转化为Redis的返回值类型。 在脚本中可以使用return 语句将值返回给redis客户端,通过return语句来执行,如果没有执行return,默认返回为nil。

Redis中执行Lua脚本相关的命令

编写完脚本后最重要的就是在程序中执行脚本。Redis提供了eval命令可以使开发者像调用其他Redis内置命令一样调用脚本。

eval命令-执行脚本

[eval] [脚本内容] [key参数的数量] [key …] [arg …]

可以通过key和arg这两个参数向脚本中传递数据,他们的值可以在脚本中分别使用KEYS和ARGV 这两个类型的全局变量访问。

比如我们通过脚本实现一个set命令,通过在redis客户端中调用,那么执行的语句是:

eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

上述脚本相当于使用Lua脚本调用了Redis的set命令,存储了一个key=lua,value=hello到Redis中。

evalSHA命令

考虑到我们通过eval执行lua脚本,脚本比较长的情况下,每次调用脚本都需要把整个脚本传给redis,比较占用带宽。为了解决这个问题,redis提供了evalSHA命令允许开发者通过脚本内容的SHA1摘要来执行脚本。该命令的用法和eval一样,只不过是将脚本内容替换成脚本内容的SHA1摘要

  1. Redis在执行eval命令时会计算脚本的SHA1摘要并记录在脚本缓存中

  2. 执行evalSHA命令时Redis会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOscript No matching script,Please use eval”

    # 将脚本加入缓存并生成sha1命令
    script load "return redis.call('get','lua')"
    # ["13bd040587b891aedc00a72458cbf8588a27df90"]
    # 传递sha1的值来执行该命令
    evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0

Redisson执行Lua脚本

通过lua脚本来实现一个访问频率限制功能。

思路,定义一个key,key中包含ip地址。 value为指定时间内的访问次数,比如说是10秒内只能访问3次。