栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot2整合Redis从入门到进阶

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot2整合Redis从入门到进阶

首先需要安装Redis,如何安装可以看我的这篇文章
接下来说明如何使用,以及一些Redis的相关知识。

一、缓存

哪些数据适合放入缓存?

  • 即时性、数据一致性要求不高
  • 访问量大且更新频率不高的数据(读多,写少)

本地缓存:和微服务同一个进程。缺点:分布式时本地缓存不能共享
分布式缓存:缓存中间件

二、使用 1. pom.xml文件中导入依赖

    org.springframework.boot
    spring-boot-starter-data-redis

2. application.yml配置redis

配置redis主机地址

spring:
  redis:
    host: xxx(你的ip)
    port: 6379
3. 自动注入RedisTemplate
public class RedisTests{
	@Autowired
	StringRedisTemplate stringRedisTemplate;

	public void testStringRedisTemplate(){
		ValueOperations ops = stringRedisTemplate.opsForValue();
		// 保存
		ops.set("hello", "world_" + UUID.randomUUID().toString());
		// 查询
		String hello = ops.get("hello");
		System.out.println(hello);
	}
}
三、改造实例
public class CategoryServiceImpl extends ServiceImpl implements CategoryService{
	@Autowired
	CategoryBrandRelationService categoryBrandRelationService;
	@Autowired
	private StringRedisTemplate redisTemplate;
	
	@Override
	private Map> getCatalogJson(){
		// 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。
		// 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容
		String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
		if(StringUtils.isEmpty(catalogJSON )){
			// 2. 缓存中没有,查询数据库
			Map> catalogJsonFromDb = getCatalogJsonFromDb();
			// 3. 查到的数据再放入缓存,将对象转为json放入缓存中
			// 使用alibaba的fastjson包,可以将任意对象转换为json字符串
			String s = JSON.toJSONString(catalogJsonFromDb);
			redisTemplate.opsForValue().set("catalogJSON", s);
			return catalogJsonFromDb;
		}
		// 转为指定的对象
		Map> result = JSON.parseObject(catalogJSON, new TypeReference>>(){});
		return result;
	}
	
	// 从数据库查询并封装分类数据
	private Map> getCatalogJsonFromDb(){
		List categoryEntities = this.list();
        //查出所有一级分类
    	List level1Categories = getCategoryByParentCid(categoryEntities, 0L);
    	Map> listMap = level1Categories.stream().collect(Collectors.toMap(k->k.getCatId().toString(), v -> {
            //遍历查找出二级分类
        List level2Categories = getCategoryByParentCid(categoryEntities, v.getCatId());
        List catalog2Vos=null;
        if (level2Categories!=null){
             //封装二级分类到vo并且查出其中的三级分类
             catalog2Vos = level2Categories.stream().map(cat -> {
                //遍历查出三级分类并封装
                List level3Catagories = getCategoryByParentCid(categoryEntities, cat.getCatId());
                List catalog3Vos = null;
                if (level3Catagories != null) {
                    catalog3Vos = level3Catagories.stream()
                             .map(level3 -> new Catalog2Vo.Catalog3Vo(level3.getParentCid().toString(), level3.getCatId().toString(), level3.getName()))
                            .collect(Collectors.toList());
                }
                Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), cat.getCatId().toString(), cat.getName(), catalog3Vos);
                return catalog2Vo;
            }).collect(Collectors.toList());
        }
        return catalog2Vos;
        }));
        return listMap;
	}
}
四、缓存失效问题

1. 缓存穿透

缓存穿透:查询一个一定不存在的数据,由于缓存是不命中的,将去查询数据库,但是数据库也无此记录,没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

风险:利用不存在的数据进行攻击,数据库瞬间压力增大,最终导致崩溃。

解决:缓存空对象、布隆过滤器、mvc拦截器

2. 缓存雪崩

缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻
同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决方案:
规避雪崩:缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。
如果缓存数据库是分布式部署,将热点数据均匀分布在不同缓存数据库中。
设置热点数据永远不过期。
出现雪崩:降级 熔断
事前:尽量保证整个 redis 集群的高可用性,发现机器宕机尽快补上。选择合适的内存淘汰策略。
事中:本地ehcache缓存 + hystrix限流&降级,避免MySQL崩掉
事后:利用 redis 持久化机制保存的数据尽快恢复缓存

3. 缓存击穿

缓存雪崩和缓存击穿不同的是:

缓存击穿 指 并发查同一条数据。缓存击穿是指缓存中没有但数据库中有的数据
(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,
又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力
缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。
解决方案:

设置热点数据永远不过期。
加互斥锁:业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效
的时候(判断拿出来的值为空),不是立即去load db去数据库加载,而是先使用
缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache
的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设
缓存;否则,就重试整个get缓存的方法。

缓存击穿:加锁
不好的方法是synchronized(this),肯定不能这么写 ,不具体写了

锁时序问题:之前的逻辑是查缓存没有,然后取竞争锁查数据库,这样就造成多
次查数据库。

解决方法:竞争到锁后,再次确认缓存中没有,再去查数据库。

五、修改代码,解决缓存失效问题 1. 非分布式解决方案
public class CategoryServiceImpl extends ServiceImpl implements CategoryService{
	@Autowired
	CategoryBrandRelationService categoryBrandRelationService;
	@Autowired
	private StringRedisTemplate redisTemplate;
	
	@Override
	private Map> getCatalogJson(){
		// 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。
		
		// 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容
		String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
		if(StringUtils.isEmpty(catalogJSON )){
			// 2. 缓存中没有,查询数据库
			Map> catalogJsonFromDb = getCatalogJsonFromDb();
			return catalogJsonFromDb;
		}
		// 转为指定的对象
		Map> result = JSON.parseObject(catalogJSON, new TypeReference>>(){});
		return result;
	}
	
	// 从数据库查询并封装分类数据
	private Map> getCatalogJsonFromDb(){
		// 在非分布式的场景下,只要是同一把锁,就能锁住需要这个锁的所有线程
		// synchronized(this): SpringBoot所有的组件在容器中都是单例的。
		synchronized(this){
			// 双重检测,得到锁后,应该再去缓存中确认一次,如果没有才需要继续查询
			String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
			if(!StringUtils.isEmpty(catalogJSON)){
				// 如果不为null,直接返回
				Map> result = JSON.parseObject(catalogJSON, new TypeReference>>(){});
				return result;
			}
			List categoryEntities = this.list();
	        //查出所有一级分类
	        List level1Categories = getCategoryByParentCid(categoryEntities, 0L);
	        Map> listMap = level1Categories.stream().collect(Collectors.toMap(k->k.getCatId().toString(), v -> {
	            //遍历查找出二级分类
	            List level2Categories = getCategoryByParentCid(categoryEntities, v.getCatId());
	            List catalog2Vos=null;
	            if (level2Categories!=null){
	                //封装二级分类到vo并且查出其中的三级分类
	                catalog2Vos = level2Categories.stream().map(cat -> {
	                    //遍历查出三级分类并封装
	                    List level3Catagories = getCategoryByParentCid(categoryEntities, cat.getCatId());
	                    List catalog3Vos = null;
	                    if (level3Catagories != null) {
	                        catalog3Vos = level3Catagories.stream()
	                                .map(level3 -> new Catalog2Vo.Catalog3Vo(level3.getParentCid().toString(), level3.getCatId().toString(), level3.getName()))
	                                .collect(Collectors.toList());
	                    }
	                    Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), cat.getCatId().toString(), cat.getName(), catalog3Vos);
	                    return catalog2Vo;
	                }).collect(Collectors.toList());
	            }
	            return catalog2Vos;
	        }));
	        // 3. 查到的数据再放入缓存,将对象转为json放入缓存中
			// 使用alibaba的fastjson包,可以将任意对象转换为json字符串
			String s = JSON.toJSONString(listMap);
			redisTemplate.opsForValue().set("catalogJSON", s, 1, TimeUnit.DAYS);
	        return listMap;
		}
	}
}
2. 分布式解决方案

分布式锁
分布式项目时,但本地锁只能锁住当前服务,需要分布式锁

redis分布式锁的原理:setnx,同一时刻只能设置成功一个
前提,锁的key是一定的,value可以变

没获取到锁阻塞或者sleep一会

设置好了锁,玩意服务出现宕机,没有执行删除锁逻辑,这就造成了死锁

解决:设置过期时间
业务还没执行完锁就过期了,别人拿到锁,自己执行完去删了别人的锁

解决:锁续期(redisson有看门狗)。删锁的时候明确是自己的锁。如uuid
判断uuid对了,但是将要删除的时候锁过期了,别人设置了新值,那删除了别人
的锁

解决:删除锁必须保证原子性(保证判断和删锁是原子的)。使用redis+Lua脚本
完成,脚本是原子的

public class CategoryServiceImpl extends ServiceImpl implements CategoryService{
	@Autowired
	CategoryBrandRelationService categoryBrandRelationService;
	@Autowired
	private StringRedisTemplate redisTemplate;
	
	@Override
	private Map> getCatalogJson(){
		// 给缓存中放json字符串,拿出的json字符串,用逆转为能用的对象类型(序列化与反序列化)。
		
		// 1. 加入缓存逻辑,缓存中存的数据是json字符串。 json的优点是:跨语言,跨平台兼容
		String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
		if(StringUtils.isEmpty(catalogJSON )){
			// 2. 缓存中没有,查询数据库
			Map> catalogJsonFromDb = getCatalogJsonFromDbWithRedisLock();
			return catalogJsonFromDb;
		}
		// 转为指定的对象
		Map> result = JSON.parseObject(catalogJSON, new TypeReference>>(){});
		return result;
	}
	
	// 从数据库查询并封装分类数据
	private Map> getCatalogJsonFromDbWithRedisLock(){
		// 1) 占分布式锁
		String uuid = UUID.randomUUID().toString();
		Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
		if(lock){
			// 加锁成功
			// 2) 设置过期时间,必须和加锁是同步的,原子性的
			// redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
			Map> dataFromDb;
			try{
				dataFromDb = getDataFromDb();
			}finally{
				// get与delete的原子操作
				String script = "if redis.call("get",KEYS[1]) == ARGV[1] thenn" +
	            "    return redis.call("del",KEYS[1])n" +
	            "elsen" +
	            "    return 0n" +
	            "end";
	            Long lockValue = stringRedisTemplate.execute(
		            new DefaultRedisScript(script, Long.class), // 脚本和返回类型
		            Arrays.asList("lock"), // 参数
		            uuid); // 参数值,锁的值
			}
			// 获取值对比 + 对比成功删除 = 原子操作,因此采用lua脚本
			
			return dataFromDb;
		}else{
			// 加锁失败
			// 休眠 100ms
			try {
            	Thread.sleep(100);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
			return getCatalogJsonFromDbWithRedisLock(); // 自旋的方式
		}
		return getDataFromDb();
	}

	private Map> getDataFromDb(){
		// 双重检测,得到锁后,应该再去缓存中确认一次,如果没有才需要继续查询
		String catelogJSON = redisTemplate.opsForValue().get("catalogJSON");
		if(!StringUtils.isEmpty(catelogJSON)){
			// 如果不为null,直接返回
			Map> result = JSON.parseObject(catelogJSON, new TypeReference>>(){});
			return result;
		}
		List categoryEntities = this.list();
	    //查出所有一级分类
		List level1Categories = getCategoryByParentCid(categoryEntities, 0L);
		Map> listMap = level1Categories.stream().collect(Collectors.toMap(k->k.getCatId().toString(), v -> {
	            //遍历查找出二级分类
			List level2Categories = getCategoryByParentCid(categoryEntities, v.getCatId());
			List catalog2Vos=null;
			if (level2Categories!=null){
				//封装二级分类到vo并且查出其中的三级分类
				catalog2Vos = level2Categories.stream().map(cat -> {
					//遍历查出三级分类并封装
					List level3Catagories = getCategoryByParentCid(categoryEntities, cat.getCatId());
					List catalog3Vos = null;
					if (level3Catagories != null) {
						catalog3Vos = level3Catagories.stream()
								.map(level3 -> new Catalog2Vo.Catalog3Vo(level3.getParentCid().toString(), level3.getCatId().toString(), level3.getName()))
								.collect(Collectors.toList());
		            }
		            Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), cat.getCatId().toString(), cat.getName(), catalog3Vos);
		            return catalog2Vo;
	            }).collect(Collectors.toList());
			}
			return catalog2Vos;
		}));
	    // 3. 查到的数据再放入缓存,将对象转为json放入缓存中
		// 使用alibaba的fastjson包,可以将任意对象转换为json字符串
		String s = JSON.toJSONString(listMap);
		redisTemplate.opsForValue().set("catalogJSON", s, 1, TimeUnit.DAYS);
	    return listMap;	
	}
}
六、使用更专业的分布式锁框架Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

1. 环境搭建 1)pom.xml导入依赖

    org.redisson
    redisson-spring-boot-starter
    3.15.6

2)yaml文件配置

application.yml

spring:
  application:
    name: springboot-redisson
  redis:
    redisson:
      file: classpath:redisson.yml

redisson.yml

# 单节点配置
singleServerConfig:
  # 连接空闲超时,单位:毫秒
  idleConnectionTimeout: 10000
  # 连接超时,单位:毫秒
  connectTimeout: 10000
  # 命令等待超时,单位:毫秒
  timeout: 3000
  # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
  # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  #  # 重新连接时间间隔,单位:毫秒
  #  reconnectionTimeout: 3000
  #  # 执行失败最大次数
  #  failedAttempts: 3
  # 密码
  password: 1234
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  # 客户端名称
  clientName: null
  #  # 节点地址
  address: "redis://127.0.0.1:6379"
  # 发布和订阅连接的最小空闲连接数
  subscriptionConnectionMinimumIdleSize: 1
  # 发布和订阅连接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空闲连接数
  connectionMinimumIdleSize: 500
  # 连接池大小
  connectionPoolSize: 1000
  # 数据库编号
  database: 0
  # DNS监测时间间隔,单位:毫秒
  dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
threads: 16
# Netty线程池数量,默认值: 当前处理核数量 * 2
nettyThreads: 32
# 编码,不使用默认编码,因为set进去之后是乱码
#codec: ! {}
# 传输模式
transportMode : "NIO"
3)测试
import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootRedissonApplicationTests {

    @Autowired
    private RedissonClient redissonClient;

    @Test
    void contextLoads() {
        redissonClient.getBucket("hello").set("bug");
       	String test = (String) redissonClient.getBucket("hello").get();
        System.out.println(test);
    }
}

具体使用方式可以查看官网,有详细的说明

2. 相关知识
  1. 锁的自动续期,如果业务时间超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉。
  2. 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
  3. 如果传递了锁的超时时间,就发送给redis执行脚本,进行占领,默认超时就是指定的时间。
  4. 如果未指定锁的超时时间,就使用30 * 1000 (LockWatchdogTimeout看门狗的默认时间)。
  5. 只要占锁成功,就会启动一个定时任务(重新给锁设置过期时间,新的过期时间就是看门狗的默认时间)。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854462.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号