首先需要安装Redis,如何安装可以看我的这篇文章
接下来说明如何使用,以及一些Redis的相关知识。
哪些数据适合放入缓存?
- 即时性、数据一致性要求不高
- 访问量大且更新频率不高的数据(读多,写少)
本地缓存:和微服务同一个进程。缺点:分布式时本地缓存不能共享
分布式缓存:缓存中间件
2. application.yml配置redisorg.springframework.boot spring-boot-starter-data-redis
配置redis主机地址
spring:
redis:
host: xxx(你的ip)
port: 6379
3. 自动注入RedisTemplate
public class RedisTests{
@Autowired
StringRedisTemplate stringRedisTemplate;
public void testStringRedisTemplate(){
ValueOperations ops = stringRedisTemplate.opsForValue();
// 保存
ops.set("hello", "world_" + UUID.randomUUID().toString());
// 查询
String hello = ops.get("hello");
System.out.println(hello);
}
}
三、改造实例
public class CategoryServiceImpl extends ServiceImpl四、缓存失效问题 1. 缓存穿透implements CategoryService{ @Autowired CategoryBrandRelationService categoryBrandRelationService; @Autowired private StringRedisTemplate redisTemplate; @Override private Map > getCatalogJson(){ // 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。 // 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容 String catalogJSON = redisTemplate.opsForValue().get("catalogJSON"); if(StringUtils.isEmpty(catalogJSON )){ // 2. 缓存中没有,查询数据库 Map > catalogJsonFromDb = getCatalogJsonFromDb(); // 3. 查到的数据再放入缓存,将对象转为json放入缓存中 // 使用alibaba的fastjson包,可以将任意对象转换为json字符串 String s = JSON.toJSONString(catalogJsonFromDb); redisTemplate.opsForValue().set("catalogJSON", s); return catalogJsonFromDb; } // 转为指定的对象 Map > result = JSON.parseObject(catalogJSON, new TypeReference
缓存穿透:查询一个一定不存在的数据,由于缓存是不命中的,将去查询数据库,但是数据库也无此记录,没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
风险:利用不存在的数据进行攻击,数据库瞬间压力增大,最终导致崩溃。
解决:缓存空对象、布隆过滤器、mvc拦截器
2. 缓存雪崩缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻
同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决方案:
规避雪崩:缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。
如果缓存数据库是分布式部署,将热点数据均匀分布在不同缓存数据库中。
设置热点数据永远不过期。
出现雪崩:降级 熔断
事前:尽量保证整个 redis 集群的高可用性,发现机器宕机尽快补上。选择合适的内存淘汰策略。
事中:本地ehcache缓存 + hystrix限流&降级,避免MySQL崩掉
事后:利用 redis 持久化机制保存的数据尽快恢复缓存
缓存雪崩和缓存击穿不同的是:
缓存击穿 指 并发查同一条数据。缓存击穿是指缓存中没有但数据库中有的数据
(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,
又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力
缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。
解决方案:
设置热点数据永远不过期。
加互斥锁:业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效
的时候(判断拿出来的值为空),不是立即去load db去数据库加载,而是先使用
缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache
的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设
缓存;否则,就重试整个get缓存的方法。
缓存击穿:加锁
不好的方法是synchronized(this),肯定不能这么写 ,不具体写了
锁时序问题:之前的逻辑是查缓存没有,然后取竞争锁查数据库,这样就造成多
次查数据库。
解决方法:竞争到锁后,再次确认缓存中没有,再去查数据库。
五、修改代码,解决缓存失效问题 1. 非分布式解决方案public class CategoryServiceImpl extends ServiceImpl2. 分布式解决方案implements CategoryService{ @Autowired CategoryBrandRelationService categoryBrandRelationService; @Autowired private StringRedisTemplate redisTemplate; @Override private Map > getCatalogJson(){ // 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。 // 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容 String catalogJSON = redisTemplate.opsForValue().get("catalogJSON"); if(StringUtils.isEmpty(catalogJSON )){ // 2. 缓存中没有,查询数据库 Map > catalogJsonFromDb = getCatalogJsonFromDb(); return catalogJsonFromDb; } // 转为指定的对象 Map > result = JSON.parseObject(catalogJSON, new TypeReference
分布式锁
分布式项目时,但本地锁只能锁住当前服务,需要分布式锁
redis分布式锁的原理:setnx,同一时刻只能设置成功一个
前提,锁的key是一定的,value可以变
没获取到锁阻塞或者sleep一会
设置好了锁,玩意服务出现宕机,没有执行删除锁逻辑,这就造成了死锁
解决:设置过期时间
业务还没执行完锁就过期了,别人拿到锁,自己执行完去删了别人的锁
解决:锁续期(redisson有看门狗)。删锁的时候明确是自己的锁。如uuid
判断uuid对了,但是将要删除的时候锁过期了,别人设置了新值,那删除了别人
的锁
解决:删除锁必须保证原子性(保证判断和删锁是原子的)。使用redis+Lua脚本
完成,脚本是原子的
public class CategoryServiceImpl extends ServiceImpl六、使用更专业的分布式锁框架Redissonimplements CategoryService{ @Autowired CategoryBrandRelationService categoryBrandRelationService; @Autowired private StringRedisTemplate redisTemplate; @Override private Map > getCatalogJson(){ // 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。 // 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容 String catalogJSON = redisTemplate.opsForValue().get("catalogJSON"); if(StringUtils.isEmpty(catalogJSON )){ // 2. 缓存中没有,查询数据库 Map > catalogJsonFromDb = getCatalogJsonFromDbWithRedisLock(); return catalogJsonFromDb; } // 转为指定的对象 Map > result = JSON.parseObject(catalogJSON, new TypeReference >>(){}); return result; } // 从数据库查询并封装分类数据 private Map > getCatalogJsonFromDbWithRedisLock(){ // 1) 占分布式锁 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if(lock){ // 加锁成功 // 2) 设置过期时间,必须和加锁是同步的,原子性的 // redisTemplate.expire("lock", 30, TimeUnit.SECONDS); Map > dataFromDb; try{ dataFromDb = getDataFromDb(); }finally{ // get与delete的原子操作 String script = "if redis.call("get",KEYS[1]) == ARGV[1] thenn" + " return redis.call("del",KEYS[1])n" + "elsen" + " return 0n" + "end"; Long lockValue = stringRedisTemplate.execute( new DefaultRedisScript (script, Long.class), // 脚本和返回类型 Arrays.asList("lock"), // 参数 uuid); // 参数值,锁的值 } // 获取值对比 + 对比成功删除 = 原子操作,因此采用lua脚本 return dataFromDb; }else{ // 加锁失败 // 休眠 100ms try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock(); // 自旋的方式 } return getDataFromDb(); } private Map > getDataFromDb(){ // 双重检测,得到锁后,应该再去缓存中确认一次,如果没有才需要继续查询 String catelogJSON = redisTemplate.opsForValue().get("catalogJSON"); if(!StringUtils.isEmpty(catelogJSON)){ // 如果不为null,直接返回 Map > result = JSON.parseObject(catelogJSON, new TypeReference >>(){}); return result; } List categoryEntities = this.list(); //查出所有一级分类 List level1Categories = getCategoryByParentCid(categoryEntities, 0L); Map > listMap = level1Categories.stream().collect(Collectors.toMap(k->k.getCatId().toString(), v -> { //遍历查找出二级分类 List level2Categories = getCategoryByParentCid(categoryEntities, v.getCatId()); List catalog2Vos=null; if (level2Categories!=null){ //封装二级分类到vo并且查出其中的三级分类 catalog2Vos = level2Categories.stream().map(cat -> { //遍历查出三级分类并封装 List level3Catagories = getCategoryByParentCid(categoryEntities, cat.getCatId()); List catalog3Vos = null; if (level3Catagories != null) { catalog3Vos = level3Catagories.stream() .map(level3 -> new Catalog2Vo.Catalog3Vo(level3.getParentCid().toString(), level3.getCatId().toString(), level3.getName())) .collect(Collectors.toList()); } Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), cat.getCatId().toString(), cat.getName(), catalog3Vos); return catalog2Vo; }).collect(Collectors.toList()); } return catalog2Vos; })); // 3. 查到的数据再放入缓存,将对象转为json放入缓存中 // 使用alibaba的fastjson包,可以将任意对象转换为json字符串 String s = JSON.toJSONString(listMap); redisTemplate.opsForValue().set("catalogJSON", s, 1, TimeUnit.DAYS); return listMap; } }
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
1. 环境搭建 1)pom.xml导入依赖2)yaml文件配置org.redisson redisson-spring-boot-starter 3.15.6
application.yml
spring:
application:
name: springboot-redisson
redis:
redisson:
file: classpath:redisson.yml
redisson.yml
# 单节点配置 singleServerConfig: # 连接空闲超时,单位:毫秒 idleConnectionTimeout: 10000 # 连接超时,单位:毫秒 connectTimeout: 10000 # 命令等待超时,单位:毫秒 timeout: 3000 # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。 # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryAttempts: 3 # 命令重试发送时间间隔,单位:毫秒 retryInterval: 1500 # # 重新连接时间间隔,单位:毫秒 # reconnectionTimeout: 3000 # # 执行失败最大次数 # failedAttempts: 3 # 密码 password: 1234 # 单个连接最大订阅数量 subscriptionsPerConnection: 5 # 客户端名称 clientName: null # # 节点地址 address: "redis://127.0.0.1:6379" # 发布和订阅连接的最小空闲连接数 subscriptionConnectionMinimumIdleSize: 1 # 发布和订阅连接池大小 subscriptionConnectionPoolSize: 50 # 最小空闲连接数 connectionMinimumIdleSize: 500 # 连接池大小 connectionPoolSize: 1000 # 数据库编号 database: 0 # DNS监测时间间隔,单位:毫秒 dnsMonitoringInterval: 5000 # 线程池数量,默认值: 当前处理核数量 * 2 threads: 16 # Netty线程池数量,默认值: 当前处理核数量 * 2 nettyThreads: 32 # 编码,不使用默认编码,因为set进去之后是乱码 #codec: !3)测试{} # 传输模式 transportMode : "NIO"
import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRedissonApplicationTests {
@Autowired
private RedissonClient redissonClient;
@Test
void contextLoads() {
redissonClient.getBucket("hello").set("bug");
String test = (String) redissonClient.getBucket("hello").get();
System.out.println(test);
}
}
具体使用方式可以查看官网,有详细的说明
2. 相关知识- 锁的自动续期,如果业务时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉。
- 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
- 如果传递了锁的超时时间,就发送给redis执行脚本,进行占领,默认超时就是指定的时间。
- 如果未指定锁的超时时间,就使用30 * 1000 (LockWatchdogTimeout看门狗的默认时间)。
- 只要占锁成功,就会启动一个定时任务(重新给锁设置过期时间,新的过期时间就是看门狗的默认时间)。



