- 1.整合redis到工程
- 1.1添加redis配置类
- 二、分布式锁
- 2.1 本地锁的局限性
- 2.1.1 编写测试代码
- 2.1.2 使用ab工具测试
- 2.1.3 使用本地锁
- 2.1.4 本地锁问题演示锁
- 2.2 分布式锁实现的解决方案
- 2.3 使用redis实现分布式锁
- 2.3.1 编写代码
- 2.3.2 优化之设置锁的过期时间
- 2.3.3 优化之UUID防误删
- 2.3.4 优化之LUA脚本保证删除的原子性
- 2.3.5 总结
- 2.4 使用redisson 解决分布式锁
- 2.4.1 实现代码
- 2.4.2 可重入锁(Reentrant Lock)
- 2.4.3 读写锁(ReadWriteLock)
org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2 2.6.0 org.redisson redisson 3.11.2
server:
port: 8206
spring:
redis:
host: xxxxx
port: 6379
database: 0
timeout: 1800000
password:
lettuce:
pool:
max-active: 20 #最大连接数
max-wait: -1 #最大阻塞等待时间(负数表示没限制)
max-idle: 5 #最大空闲
min-idle: 0 #最小空闲
1.1添加redis配置类
@Configuration
@EnableCaching
public class RedisConfig {
@Bean
public KeyGenerator wiselyKeyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
@Bean
public RedisTemplate
二、分布式锁
2.1 本地锁的局限性
2.1.1 编写测试代码
说明:通过reids客户端设置 num = 0
set num 0
@RestController
@RequestMapping("test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("testLock")
public Result testLock() {
testService.testLock();
return Result.ok();
}
}
public interface TestService {
void testLock();
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void testLock() {
// 利用缓存中的StringRedisTemplate,获取到当前的num数据值
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
// 如果num不为空,则需要对当前值+1操作
int numValue = Integer.parseInt(num);
// 写回缓存
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
}
}
2.1.2 使用ab工具测试
使用 ab 测试工具:httpd-tools(yum install -y httpd-tools)
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
测试如下:5000请求,100并发
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock
结果应该为:5000
查看redis中的值:
使用ab工具压力测试:5000次请求,并发100。
查看redis中的结果:
接下来启动 8206 8216 8226 三个运行实例
运行多个service实例:
- server.port=8216
- server.port=8226
通过网关压力测试:
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock
查看redis中的值:
以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性,此时需要分布式锁。
2.2 分布式锁实现的解决方案随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
分布式锁主流的实现方案:
- 基于数据库实现分布式锁
- 基于缓存(Redis等)
- 基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
- 性能:redis最高
- 可靠性:zookeeper最高
- 多个客户端同时获取锁(setnx)
- 获取成功,执行业务逻辑 {从db获取数据,放入缓存} ,执行完成释放锁(del)
- 其他客户端等待重试
@Override
public void testLock() {
// 使用setnx命令
// setnx lock ok
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "OK");
if (flag) {
// flag = true:表示获取到锁
// 执行业务逻辑
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
int numValue = Integer.parseInt(num);
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
// 释放锁
redisTemplate.delete("lock");
} else {
// 没有获取到锁
try {
Thread.sleep(100);
// 每隔1秒钟回调一次,再次尝试获取锁(自旋)
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
重启,服务集群,通过网关压力测试。
查看redis中num的值:
问题:setnx 刚好获取到锁,业务逻辑出现异常,导致锁无法释放。
解决:设置过期时间,自动释放锁。
2.3.2 优化之设置锁的过期时间设置过期时间有两种方式:
- 首先想到通过 expire 设置过期时间(缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
- 在 set 时指定过期时间(推荐)
设置过期时间:
问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s,执行流程如下:
- index1业务逻辑没执行完,3秒后锁被自动释放;
- index2获取到锁,执行业务逻辑,3秒后锁被自动释放;
- index3获取到锁,执行业务逻辑;
- index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁, 导致index3的业务只执行1s就被别人释放,最终等于没锁的情况。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。
2.3.3 优化之UUID防误删
问题:删除操作缺乏原子性。
场景:
- index1执行删除时,查询到的 lock 值确实和 uuid 相等;
- index1执行删除前,lock 刚好过期时间已到,被 redis 自动释放,在redis中没有了锁;
- index2获取了lock,index2线程获取到了cpu的资源,开始执行方法;
- index1执行删除,此时会把 index2 的 lock 删除。
index1 因为已经在方法中了,所以不需要重新上锁,index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的index2的锁。
2.3.4 优化之LUA脚本保证删除的原子性@Override
public void testLock() {
// 使用setnx命令
// setnx lock ok
String uuid = UUID.randomUUID().toString();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(
"lock", uuid, 3, TimeUnit.SECONDS);
if (flag) {
// flag = true:表示获取到锁
// 执行业务逻辑
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
int numValue = Integer.parseInt(num);
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
// 定义一个lua脚本
String secript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 准备执行lua 脚本
DefaultRedisScript redisScript = new DefaultRedisScript<>();
// 设置lua脚本
redisScript.setScriptText(secript);
// 设置DefaultRedisScript 这个对象的泛型
redisScript.setResultType(Long.class);
// redis调用lua脚本
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
} else {
// 没有获取到锁
try {
Thread.sleep(100);
// 每隔1秒钟回调一次,再次尝试获取锁(自旋)
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.3.5 总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁;
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;
- 加锁和解锁必须具有原子性
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
2.4.1 实现代码org.redisson redisson 3.11.2
配置类
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize = 10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
// 判断地址是否为空
if (StringUtils.isEmpty(host)) {
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
// //redis://127.0.0.1:6379
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
// 是否需要密码
if (!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
// RedissonClient redisson = Redisson.create(config);
return Redisson.create(config);
}
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public void testLock() {
RLock lock = redissonClient.getLock("lock");
// 开始加锁
lock.lock();
try {
String value = redisTemplate.opsForValue().get("num");
if (StringUtils.isNotEmpty(value)) {
return;
}
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
// 解锁:
lock.unlock();
}
}
}
2.4.2 可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间,超过这个时间后锁便自动解开了。
最常见的使用:
RLock lock = redisson.getLock("anyLock");
// 最常使用
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
2.4.3 读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
代码实现
@GetMapping("read")
public Result read(){
String msg = testService.readLock();
return Result.ok(msg);
}
@GetMapping("write")
public Result write(){
String msg = testService.writeLock();
return Result.ok(msg);
}
public interface TestService {
String readLock();
String writeLock();
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public String readLock() {
// 初始化读写锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
// 获取读锁
RLock rLock = readWriteLock.readLock();
// 加10s锁
rLock.lock(10, TimeUnit.SECONDS);
String msg = this.redisTemplate.opsForValue().get("msg");
//rLock.unlock(); // 解锁
return msg;
}
@Override
public String writeLock() {
// 初始化读写锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
// 获取写锁
RLock rLock = readWriteLock.writeLock();
// 加10s锁
rLock.lock(10, TimeUnit.SECONDS);
this.redisTemplate.opsForValue().set("msg", UUID.randomUUID().toString());
//rLock.unlock(); // 解锁
return "成功写入了内容";
}
}
打开两个浏览器窗口测试:
http://localhost:8206/test/read
http://localhost:8206/test/write
- 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始;
- 同时访问读:不用等待;
- 先写后读:读要等待(约10s)写完成;
- 先读后写:写要等待(约10s)读完成;



