一、Redis的事务二、Java连接Redis:Jedis
2.1 连接2.2 事务 三、SpringBoot整合Redis
3.1 导入依赖3.2 配置连接3.3 创建RedisConfig3.4 RedisUtil:封装RedisConfig 四、redis.conf文件五、Redis的持久化
5.1 RDB(默认)
流程触发方式恢复rdb文件持久化设置优缺点 5.2 AOF
修复AOF文件AOF重写持久化设置优缺点 六、Redis发布订阅七、Redis的主从复制
命令行配置主机写从机读Redis主从同步策略 八、哨兵模式
模式实现 九、缓存穿透、击穿以及雪崩
9.1 缓存穿透9.2 缓存击穿9.3 缓存雪崩
前言
本文章只是简单的记录,仅是入门的文章。
学习视频:【狂神说Java】Redis最新超详细版教程通俗易懂
截图均来来自于视频。文中还穿插着其他大佬的文章。
最后,感谢大佬们的分享。★,°:.☆( ̄▽ ̄)/$:.°★ 。
一、Redis的事务
Redis事务:一系列命令的集合,一个事务中所有的命令都会被序列化,在事务执行的过程中顺序执行
Redis的事务没有隔离级别的概念
Redis的事务
开启事务(multi)命令入队执行事务(exec)
Redis单条命令是原子性的;多条命令不具备原子性
编译型异常(代码出错,命令出错),事务中的命令都不会执行
运行时异常(1/0,存在语法错误),事务中的错误命令会不执行,其他命令依旧执行。
什么是乐观锁,什么是悲观锁
Redis测试监控:watch(乐观锁)
注意watch不是监视对象的值,是监视对象有没有被改动,哪怕改动的数据和原来的数据一样,那也是不成功的!!!
如果执行了exec和discard,就不用执行unwatch了
模拟线程正常监控的情况:
模拟事务在监控的时候,对数据进行了改动。
当事务在监控的时候,对数据进行了改动的情况,只需要获取最新的值,再进行改动即可
导入依赖
2.1 连接redis.clients jedis 4.1.1 com.alibaba fastjson 1.2.79
public class TestConn {
public static void main(String[] args) {
// 1. new Jedis
Jedis jedis = new Jedis("127.0.0.1", 6379);
// Jedis所有的命令就是Redis的命令行的命令
// 测试是否连接成功
System.out.println(jedis.ping());
}
}
输出:
正常事务运行
public class TestTransaction {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
JSonObject jsonObject = new JSONObject();
jsonObject.put("hello","world");
jsonObject.put("name","admin");
String string = jsonObject.toJSONString();
// 开启事务
Transaction multi = jedis.multi();
try {
// 设置命令
multi.set("user01",string);
multi.set("user02",string);
// 执行命令
multi.exec();
} catch (Exception e) {
// 放弃事务
multi.discard();
e.printStackTrace();
}finally {
System.out.println(jedis.get("user01"));
System.out.println(jedis.get("user02"));
// 关闭连接
jedis.close();
}
}
}
输出:
运行异常的事务处理
public class TestTransaction {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
JSonObject jsonObject = new JSONObject();
jsonObject.put("hello","world");
jsonObject.put("name","admin");
String string = jsonObject.toJSONString();
// 开启事务
Transaction multi = jedis.multi();
try {
// 设置命令
multi.set("user01",string);
multi.set("user02",string);
int i = 1/0;
// 执行命令
multi.exec();
} catch (Exception e) {
// 放弃事务
// 运行时异常,但是catch里面放弃了事物
multi.discard();
e.printStackTrace();
}finally {
System.out.println(jedis.get("user01"));
System.out.println(jedis.get("user02"));
// 关闭连接
jedis.close();
}
}
}
三、SpringBoot整合Redis
前言
点击查看Redis依赖
org.springframework.boot spring-boot-starter-data-redis
在SpringBoot2.0.x之后,原来的Jedis替换为Lettuce
Jedis:使用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool 连接池!更像 BIO 模式
Lettuce:采用的是netty,实例可以在多个线程中共享,减少线程数量。更像 NIO 模式
SpringBoot为我们自动装配了Redis。如下图可以搜索到默认装配的配置:
点击 RedisAutoConfiguration
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class}) // 自动装配了RedisProperties.class文件。
@import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean( name = {"redisTemplate"} )
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate 点击RedisProperties,会看到Redis的默认数据库、主机、端口号
@ConfigurationProperties(prefix = "spring.redis")
public class RedisProperties {
private int database = 0;
private String url;
private String host = "localhost";
private String username;
private String password;
private int port = 6379;
private boolean ssl;
3.1 导入依赖 3.2 配置连接org.springframework.boot spring-boot-starter-data-redis
spring:
redis:
host: 127.0.0.1
port: 6379
测试
@SpringBootTest
class Redis02SpringbootApplicationTest {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads(){
redisTemplate.opsForValue().set("myKey01","helen");
System.out.println(redisTemplate.opsForValue().get("myKey01"));
redisTemplate.opsForValue().set("myKey02","海伦娜");
System.out.println(redisTemplate.opsForValue().get("myKey02"));
}
}
控制台打印:
在Redis存储的 key 与 Value (乱码:需要进行序列化)
解决乱码
点击 RedisTemplate ,会看到有好几种序列化。
会看到上述的几种,都默认使用的是JDK的序列化
此时,我们都使用的是 SpringBoot 自己整合的 RedisTemplate :会出现存储的时候乱码。
因此,我们需要自己去实现
3.3 创建RedisConfig@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new RedisTemplate();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
objectMapper.activateDefaultTyping(
LaissezFaireSubTypevalidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
此时,再运行测试类,查看在Redis里面的存储:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public class RedisUtil {
@Autowired
private RedisTemplate redisTemplate;
// =============================================================== common
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
}
}
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection) CollectionUtils.arrayToList(key));
}
}
}
// =============================================================== String
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
redisTemplate.opsForValue().set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("The increment factor must be greater than 0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("The decrement factor must be greater than 0");
}
return redisTemplate.opsForValue().decrement(key, delta);
}
// ======================================================================== Map
public Object hget(String key, String item) {
if (key.isEmpty() || item.isEmpty()) {
throw new RuntimeException("key or item Can't be empty ");
}
return redisTemplate.opsForHash().get(key, item);
}
public Map hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public boolean hmset(String key, Map map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
public boolean hHashKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// =============================================================== set
public Set sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ================================================================== List
public List lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
测试 RedisUtil 是否封装成功
@Test
void testRedisUtil(){
redisUtil.set("name","helen");
System.out.println(redisUtil.get("name")); // helen
}
四、redis.conf文件 文章:Redis:默认配置文件redis.conf详解 —— 从未被超越
视频:Redis配置文件详解 —— 狂神
5.1 RDB(默认)文章推荐
Redis 持久化
RDB持久化是把当前进程数据生成快照保存到磁盘上的过程
生成的 rdb 文件的名称以及位置由 redis.conf 文件的 dbfilename 以及 dir 指定。默认是dump.rdb
流程redis客户端执行bgsave命令或者自动触发bgsave命令;
主进程判断当前是否已经存在正在执行的子进程,如果存在,那么主进程直接返回;
如果不存在正在执行的子进程,那么就fork一个新的子进程进行持久化数据,fork过程是阻塞的,fork操作完成后主进程即可执行其他操作;
子进程先将数据写入到临时的rdb文件中,待快照数据写入完成后再原子替换旧的rdb文件;
同时发送信号给主进程,通知主进程rdb持久化完成,主进程更新相关的统计信息(info Persitence下的rdb_*相关选项)。
手动触发
save 命令使用主进程去持久化,会阻塞Redis服务器进程,直到RDB文件创建完毕为止。在此期间服务器不能处理任何命令的请求 ;
bgsave 命令是fork一个子进程,使用子进程去进行持久化,主进程只有在fork子进程时会短暂阻塞,fork操作完成后就不再阻塞,主进程可以正常进行其他操作。
save 、 flushall
自动触发
redis.conf中配置save m n,即在m秒内有n次修改时,自动触发bgsave生成rdb文件;
恢复rdb文件只需要将rdb文件放置在redis的启动目录之下,redis启动就可以检查dump.rdb,恢复数据
查看文件存在的位置
Redis的配置文件,可以看到下面的配置信息:
save 900 1 #在900秒(15分钟)之后,如果至少有1个key发生变化,则dump内存快照。 save 300 10 #在300秒(5分钟)之后,如果至少有10个key发生变化,则dump内存快照。 save 60 10000 #在60秒(1分钟)之后,如果至少有10000个key发生变化,则dump内存快照。优缺点
优点
适合大范围的数据恢复。你可能打算每个小时归档一次最近24小时的数据,同时还要每天归档一次最近30天的数据。通过这样的备份策略,一旦系统出现灾难性故障,我们可以非常容易的进行恢复。性能最大化。对于Redis的服务进程而言,在开始持久化时,它唯一需要做的只是fork出子进程,之后再由子进程完成这些持久化的工作,这样就可以极大的避免服务进程执行IO操作了。
缺点
可能会有数据丢失。因为系统一旦在定时持久化之前出现宕机现象,此前没有来得及写入磁盘的数据都将丢失。数据集较大时,恢复不友好。如果数据文件特别大,可能会导致对客户端提供的服务暂停数毫秒,或者甚至数秒。 5.2 AOF
AOF(append only file)持久化(原理是将Reids的操作日志以追加的方式写入文件)。默认是 appendonly.aof 文件。
需要在redis.conf文件中修改。
修复AOF文件appendonly.aof 文件若是遭到破坏。使用redis-check-aof --fix,进行修复。
修复之后,遭到破坏的数据会消失。
目的:解决AOF文件体积膨胀的问题,Redis 提供了AOF文件重写( rewrite)功能。
AOF的重写实现
读取库中现在的键值对,用一条命令去记录,解决了文件体积过大。
AOF的后台重写
为了防止AOF子进程重写的过程中,客户端又有新的写命令,造成数据不一致的情况。Redis服务器设置了一个AOF重写缓冲区。
该缓冲区在服务器创建子进程后开始使用。当Redis服务器执行完一个写命令之后, 它会同时将这个写命令发送给AOF缓冲区和AOF重写缓冲区。
AOF重写完成后,子进程会给父进程发送一个信号,父进程便会执行如下操作:
将AOF重写缓冲区中所有内容写入到新AOF文件中,此时文件所保存的内容便和服务器数据一致了。
对新AOF文件进行改名,原子地覆盖现有AOF文件,完成新旧两个文件的替换
——文字摘取于文章推荐中的Redis 持久化
在Redis的配置文件中存在三种同步方式,它们分别是:
appendfsync always #每次有数据修改发生时都会写入AOF文件。 appendfsync everysec #每秒钟同步一次,该策略为AOF的缺省策略。 appendfsync no #从不同步。高效但是数据不会被持久化。优缺点
优点
更好的保护数据不丢失,一般AOF会每隔1秒,通过一个后台线程执行一次fsync操作,最多丢失1秒钟的数据。
缺点
对于同一份数据来说,AOF日志文件通常比RDB数据快照文件更大,修复的速度也比rdb慢;运行效率也要比rdb慢,所以我们redis默认的配置就是rdb持久化 六、Redis发布订阅
菜鸟教程:Redis 发布订阅
发布和订阅机制
当一个客户端通过 PUBLISH 命令向订阅者发送信息的时候,我们称这个客户端为发布者(publisher)
而当一个客户端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的时候,我们称这个客户端为 订阅者(subscriber)
为了解耦 发布者(publisher) 和 订阅者(subscriber) 之间的关系,Redis 使用了 channel (频道) 作为两者的中介—— 发布者将信息直接发布给 channel ,而 channel 负责将信息发送给适当的订阅者,发布者和订阅者之间没有相互关系,也不知道对方的存在
发布订阅的原理
Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对Redis的理解。
Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能。
通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中。
通过PUBLISH命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
使用场景:
实时消息系统实时聊天(频道当做聊天室,将信息回显给所有人即可)订阅,关注系统都是可以的 七、Redis的主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);
只能有一个主节点,可以有多个从节点。
数据的复制是单向的,只能由主节点到从节点。
Master以写为主,Slave 以读为主。
单台Redis的最大使用内存不应该超过20G。当超过时,就可以考虑主从复制了。
命令行配置Redis集群环境搭建 ——狂神说
只需要配置从库即可,因为在默认的情况下,每台Redis服务器都是主节点。
主:6379
从:6380 、6381
复制三个redis.conf配置文件,分别为redis7
9.conf,redis80.conf,redis81.conf
[root@iZ8vb409m8717t5boglt61Z bin]# ls redis-benchmark redis-check-rdb redis-sentinel yueconfig redis-check-aof redis-cli redis-server [root@iZ8vb409m8717t5boglt61Z bin]# cd redisConfig [root@iZ8vb409m8717t5boglt61Z redisConfig]# ls redis.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# cp redis.conf redis79.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# cp redis.conf redis80.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# cp redis.conf redis81.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# ls redis79.conf redis80.conf redis81.conf redis.conf
使用vim修改三个配置文件
[root@iZ8vb409m8717t5boglt61Z redisConfig]# vim redis79.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# vim redis80.conf [root@iZ8vb409m8717t5boglt61Z redisConfig]# vim redis81.conf
修改的内容为:
- 端口daemonize 为 yespid 名字log 名字dump.rdb 名字
以不同的配置文件,启动redis server,
[root@iZ8vb409m8717t5boglt61Z bin]# redis-server redisConfig/redis79.conf
查看redis进程信息,查看服务是否启动成功
[root@iZ8vb409m8717t5boglt61Z bin]# ps -ef|grep redis root 25342 1 0 09:35 ? 00:00:03 redis-server 127.0.0.1:6379 root 25526 1 0 10:14 ? 00:00:00 redis-server 127.0.0.1:6380 root 25532 1 0 10:15 ? 00:00:00 redis-server 127.0.0.1:6381 root 25556 25538 0 10:16 pts/3 00:00:00 grep --color=auto redis
开启6380,使用Slaveof 命令连接主节点6379
[root@iZ8vb409m8717t5boglt61Z bin]# redis-cli -p 6380 127.0.0.1:6380> slaveof 127.0.0.1 6379 # 连接6379 OK 127.0.0.1:6380> info replication role:slave #当前角色是从机 master_host:127.0.0.1 master_port:6379
同理,把6381连接到主机6379。之后,查看主机6379的信息。
127.0.0.1:6379> info replication # Replication role:master connected_slaves:2 #多了两个从机 slave0:ip=127.0.0.1,port=6380,state=online,offset=210,lag=0 slave1:ip=127.0.0.1,port=6381,state=online,offset=210,lag=1
真实的从主配置应该在配置文件中配置,使用命令行是临时的
主机可以写,从机只可以读。
主机写从机读主机写
127.0.0.1:6379> keys * (empty list or set) 127.0.0.1:6379> set k1 v1 OK
从机读
127.0.0.1:6380> keys * 1) "k1" 127.0.0.1:6380> get k1 "v1" 127.0.0.1:6380> set k2 v2 (error) READonLY You can't write against a read only replica.Redis主从同步策略
主从刚刚连接的时候,进行全量同步;全同步结束后,进行增量同步。
当然,如果有需要,slave 在任何时候都可以发起全量同步。redis 策略是,无论如何,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
文章:Redis主从复制原理总结 —— 老虎死了还有狼
八、哨兵模式前言
6380 使用 info replication命令。查看6380仍然是从节点
127.0.0.1:6380> info replication role:slave #当前角色是从机 master_host:127.0.0.1 master_port:6379
假设,主节点6379宕机之后,如何使某个节点(如:6380)变成主节点?
127.0.0.1:6380> SLAVEOF no one # 从节点转变回主节点,原来同步所得的数据集不会被丢弃 127.0.0.1:6380> info replication role:master # 当前为主节点 connected_slaves:1 #有1个从节点 slave0:ip=127.0.0.1,port=6381,state=online,offset=210,lag=0 # 从节点信息
如果,主节点恢复了,又要重新设置。
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379
所以,就引入了哨兵模式。
概述
这里的哨兵有两个作用:
通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
当哨兵检测到master宕机,会自动将slave切换成master,然后通过发布订阅模式 通知其他的从服务器,修改配置文件,让它们切换主机。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover(故障转移)过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。
当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由某一个哨兵发起,进行failover操作。
切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
模式实现新建并编辑sentinel.conf文件
Redis学习之哨兵模式配置文件详解
在自定义的配置文件目录(redisConfig),新建一个sentinel.conf文件。
[root@iZ8vb409m8717t5boglt61Z redisConfig]# vim sentinel.conf
增加的内容如下:
sentinel monitor
sentinel monitor myredis 127.0.0.1 6379 1
告诉sentinel去监听地址为ip:port的一个master,这里的master-name可以自定义,quorum是一个数字,指明当有多少个sentinel认为一个master失效时,master才算真正失效。
一般建议将其设置为 Sentinel 节点的一半加1。
master-name只能包含英文字母,数字,和“.-_”这三个字符需要注意的是master-ip 最好要写真实的ip地址而不要用回环地址(127.0.0.1)。
启动哨兵模式
[root@iZ8vb409m8717t5boglt61Z bin]# redis-sentinel redisConfig/sentinel.conf
如果主机Master断开了,这时候就会从从机中随机选择一个服务器!
如果主机此时回来了,只能归并到新的主机下,老老实实当小弟,做从机,这就是哨兵模式的规则。
九、缓存穿透、击穿以及雪崩 9.1 缓存穿透用户想要查询一个数据,发现redis内存中没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,也是本次查询失败。
当用户很多的时候 ,缓存都没有命中(秒杀),于是都去请求了持久层数据库。这回给持久层数据库造成很大的压力,这时候相当于出现了缓存穿透。
解决方案:布隆过滤器
文章:Redis的布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。
缓存空对象:
当存储层不命中后,即使返回的空对象也将其缓存起来,通时会设置一个过期时间,之后在访问这个数据将会从缓存中获取,保护了后端数据源。
产生的问题
缓存需要更多的空间存储更多的键:因为这当中可能会有很多的空值的键;设置了过期时间,存在缓存层和存储层的数据会有段时间的不一致,这对于需要保持一致性的业务会有影响。 9.2 缓存击穿
是指一个key非常热点,在不停的扛着大并发,大并发几种对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库。(微博热点:明星出轨)
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大
9.3 缓存雪崩缓存雪崩,是指在某一个时间段,缓存集中过期失效,redis宕机。
产生雪崩的原因之一,比如马上要到双十一,很快会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段几种创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已,而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间九八数据库压垮。
解决方案
- redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增加几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群(异地多活)
- 限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。(关闭某些服务:双十一退款功能、修改收获地址功能不可用)
- 数据预热
数据预热的含义就是在正式部署之前,我先把可能的数据预先访问一遍,这样部分可能大量访问的数据就会加载倒缓存中。在即将发生大并发访问钱手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。



