栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用 Redis 实现分布式锁案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用 Redis 实现分布式锁案例

目录
  • 1.整合redis到工程
    • 1.1添加redis配置类
  • 二、分布式锁
    • 2.1 本地锁的局限性
      • 2.1.1 编写测试代码
      • 2.1.2 使用ab工具测试
      • 2.1.3 使用本地锁
      • 2.1.4 本地锁问题演示锁
    • 2.2 分布式锁实现的解决方案
    • 2.3 使用redis实现分布式锁
      • 2.3.1 编写代码
      • 2.3.2 优化之设置锁的过期时间
      • 2.3.3 优化之UUID防误删
      • 2.3.4 优化之LUA脚本保证删除的原子性
      • 2.3.5 总结
    • 2.4 使用redisson 解决分布式锁
      • 2.4.1 实现代码
      • 2.4.2 可重入锁(Reentrant Lock)
      • 2.4.3 读写锁(ReadWriteLock)

1.整合redis到工程


  org.springframework.boot
  spring-boot-starter-data-redis




  org.apache.commons
  commons-pool2
  2.6.0




  org.redisson
  redisson
  3.11.2

server:
  port: 8206
spring:
  redis:
    host: xxxxx
    port: 6379
    database: 0
    timeout: 1800000
    password:
    lettuce:
      pool:
        max-active: 20 #最大连接数
        max-wait: -1    #最大阻塞等待时间(负数表示没限制)
        max-idle: 5    #最大空闲
        min-idle: 0     #最小空闲
1.1添加redis配置类
@Configuration
@EnableCaching
public class RedisConfig {

    
    @Bean
    public KeyGenerator wiselyKeyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }


    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 设置序列化对象,固定写法
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        //  将Redis 中 string ,hash 数据类型,自动序列化!
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();

        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

二、分布式锁 2.1 本地锁的局限性 2.1.1 编写测试代码

说明:通过reids客户端设置 num = 0

set num 0
@RestController
@RequestMapping("test")
public class TestController {

    @Autowired
    private TestService testService;

    @GetMapping("testLock")
    public Result testLock() {
        testService.testLock();
        return Result.ok();
    }
}
public interface TestService {

    
    void testLock();
}
@Service
public class TestServiceImpl implements TestService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    
    @Override
    public void testLock() {
        // 利用缓存中的StringRedisTemplate,获取到当前的num数据值
        String num = redisTemplate.opsForValue().get("num");
        if (StringUtils.isEmpty(num)) {
            return;
        }
        // 如果num不为空,则需要对当前值+1操作
        int numValue = Integer.parseInt(num);
        // 写回缓存
        redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
    }
}
2.1.2 使用ab工具测试

使用 ab 测试工具:httpd-tools(yum install -y httpd-tools)

ab -n(一次发送的请求数) -c(请求的并发数) 访问路径

测试如下:5000请求,100并发
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock

结果应该为:5000

查看redis中的值:

2.1.3 使用本地锁

使用ab工具压力测试:5000次请求,并发100。

查看redis中的结果:

2.1.4 本地锁问题演示锁

接下来启动 8206 8216 8226 三个运行实例

运行多个service实例:

  1. server.port=8216
  2. server.port=8226

通过网关压力测试:
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock

查看redis中的值:

以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性,此时需要分布式锁。

2.2 分布式锁实现的解决方案

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis等)
  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:redis最高
  2. 可靠性:zookeeper最高
2.3 使用redis实现分布式锁
  1. 多个客户端同时获取锁(setnx)
  2. 获取成功,执行业务逻辑 {从db获取数据,放入缓存} ,执行完成释放锁(del)
  3. 其他客户端等待重试

2.3.1 编写代码
@Override
    public void testLock() {
        // 使用setnx命令
        // setnx lock ok
        Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "OK");
        if (flag) {
            // flag = true:表示获取到锁
            // 执行业务逻辑
            String num = redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(num)) {
                return;
            }
            int numValue = Integer.parseInt(num);
            redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
            // 释放锁
            redisTemplate.delete("lock");
        } else {
            // 没有获取到锁
            try {
                Thread.sleep(100);
                // 每隔1秒钟回调一次,再次尝试获取锁(自旋)
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

重启,服务集群,通过网关压力测试。

查看redis中num的值:

问题:setnx 刚好获取到锁,业务逻辑出现异常,导致锁无法释放。

解决:设置过期时间,自动释放锁。

2.3.2 优化之设置锁的过期时间

设置过期时间有两种方式:

  1. 首先想到通过 expire 设置过期时间(缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
  2. 在 set 时指定过期时间(推荐)

设置过期时间:

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s,执行流程如下:

  1. index1业务逻辑没执行完,3秒后锁被自动释放;
  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放;
  3. index3获取到锁,执行业务逻辑;
  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放,最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。

2.3.3 优化之UUID防误删


问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的 lock 值确实和 uuid 相等;
  2. index1执行删除前,lock 刚好过期时间已到,被 redis 自动释放,在redis中没有了锁;
  3. index2获取了lock,index2线程获取到了cpu的资源,开始执行方法;
  4. index1执行删除,此时会把 index2 的 lock 删除。

index1 因为已经在方法中了,所以不需要重新上锁,index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的index2的锁。

2.3.4 优化之LUA脚本保证删除的原子性
@Override
    public void testLock() {
        // 使用setnx命令
        // setnx lock ok
        String uuid = UUID.randomUUID().toString();
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(
                "lock", uuid, 3, TimeUnit.SECONDS);
        if (flag) {
            // flag = true:表示获取到锁
            // 执行业务逻辑
            String num = redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(num)) {
                return;
            }
            int numValue = Integer.parseInt(num);
            redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
            // 定义一个lua脚本
            String secript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            //  准备执行lua 脚本
            DefaultRedisScript redisScript = new DefaultRedisScript<>();
            // 设置lua脚本
            redisScript.setScriptText(secript);
            // 设置DefaultRedisScript 这个对象的泛型
            redisScript.setResultType(Long.class);
            //  redis调用lua脚本
            redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
        } else {
            // 没有获取到锁
            try {
                Thread.sleep(100);
                // 每隔1秒钟回调一次,再次尝试获取锁(自旋)
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
2.3.5 总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁;
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;
  3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;
  4. 加锁和解锁必须具有原子性
2.4 使用redisson 解决分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

2.4.1 实现代码


   org.redisson
   redisson
   3.11.2

配置类

@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
    
    private String host;
    
    private String addresses;
    
    private String password;
    
    private String port;
    
    private int timeout = 3000;
    private int connectionPoolSize = 64;
    private int connectionMinimumIdleSize = 10;
    private int pingConnectionInterval = 60000;
    private static String ADDRESS_PREFIX = "redis://";
    
    
    @Bean
    RedissonClient redissonSingle() {
        Config config = new Config();
        // 判断地址是否为空
        if (StringUtils.isEmpty(host)) {
            throw new RuntimeException("host is empty");
        }
        SingleServerConfig serverConfig = config.useSingleServer()
            // //redis://127.0.0.1:6379
            .setAddress(ADDRESS_PREFIX + this.host + ":" + port)
            .setTimeout(this.timeout)
            .setPingConnectionInterval(pingConnectionInterval)
            .setConnectionPoolSize(this.connectionPoolSize)
            .setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
        // 是否需要密码
        if (!StringUtils.isEmpty(this.password)) {
            serverConfig.setPassword(this.password);
        }
        // RedissonClient redisson = Redisson.create(config);
        return Redisson.create(config);
    }
}

@Service
public class TestServiceImpl implements TestService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void testLock() {
        RLock lock = redissonClient.getLock("lock");
        // 开始加锁
        lock.lock();
        try {
            String value = redisTemplate.opsForValue().get("num");
            if (StringUtils.isNotEmpty(value)) {
                return;
            }
            int num = Integer.parseInt(value);
            redisTemplate.opsForValue().set("num", String.valueOf(++num));
        } catch (NumberFormatException e) {
            e.printStackTrace();
        } finally {
            // 解锁:
            lock.unlock();
        }
    }
}

2.4.2 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间,超过这个时间后锁便自动解开了。

最常见的使用:

RLock lock = redisson.getLock("anyLock");
// 最常使用
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
2.4.3 读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

代码实现

@GetMapping("read")
public Result read(){
    String msg = testService.readLock();
    return Result.ok(msg);
}

@GetMapping("write")
public Result write(){
    String msg = testService.writeLock();
    return Result.ok(msg);
}
public interface TestService {

    String readLock();

    String writeLock();
}
@Service
public class TestServiceImpl implements TestService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public String readLock() {
        // 初始化读写锁
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
        // 获取读锁
        RLock rLock = readWriteLock.readLock();
        // 加10s锁
        rLock.lock(10, TimeUnit.SECONDS);
        String msg = this.redisTemplate.opsForValue().get("msg");
        //rLock.unlock(); // 解锁
        return msg;
    }

    @Override
    public String writeLock() {
        // 初始化读写锁
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
        // 获取写锁
        RLock rLock = readWriteLock.writeLock();
        // 加10s锁
        rLock.lock(10, TimeUnit.SECONDS);
        this.redisTemplate.opsForValue().set("msg", UUID.randomUUID().toString());
        //rLock.unlock(); // 解锁
        return "成功写入了内容";
    }
}

打开两个浏览器窗口测试:
http://localhost:8206/test/read
http://localhost:8206/test/write

  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始;
  • 同时访问读:不用等待;
  • 先写后读:读要等待(约10s)写完成;
  • 先读后写:写要等待(约10s)读完成;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1041262.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号