- 1 消息ID
- 2 消息内容
- 3 基础指令
- 4 独立消费
- 5 消费组消费
- 6 消息删除
Stream是Redis5.0新增的一个数据结构,是一个支持多播的可持久化消息队列。
Stream拥有一个消息列表,将所有的消息串起来,每个消息都有一个唯一ID和对应的内容,消息是持久化的,Redis重启后,内容还在。
每个Stream都有唯一的名称,就是Redis中的Key,结构在第一使用xadd指令追加消息时自动创建。
每个Stream都可以挂多个消费组,每个消费组都有一个游标last_delivered_id用来记录当前消费到的位置,没个消费组都要有一个Stream内的唯一名称,并且需要通过xgroup creat指令主动创建,创建时需要指定消息的初始消费ID,用来初始化last_delivered_id。消费组之间相互独立,一个消费组可以挂多个消费者,任意一个消费者读取了消息都会使得last_delivered_id后移,每个消费者有消费组的内的唯一名称。消费者内部有一个状态变量pending_ids,pending_ids被官方称为PEL,也就是Pending Entries List,里面存储着消费者读到取的,但是还没有ack(就是通知队列消费完成,通过xack指令通知)的消息ID,读取时载入,ack后移除,这是一个很重要的结构,用来保证消息至少被消费了一次,而不会在网络传输中丢失了没处理(具体见下面xreadgroup指令)。
Redis的主从同步是异步的,追求最终一致性,因此发生主从切换时可能丢失部分消息,这是Redis本身的特性导致,和Redis的数据结构无关。
Stream的消费模型借用了Kafka的消费分组概念,弥补了Redis没有持久化消息队列的空白,不同于Kafka的是Redis不支持分区,也就是说如果需要分区,需要自己创建多个Stream,然后在业务层面进行控制。而Kafka支持分区,而且能动态增加分区,虽然这种分区不会对之前的旧消息rehash。
Stream内部的默认的消息ID形式是:timestampInMillis-sequence,例如1632476036193-5,前者表示ID生成的时间戳,后者表示是该毫秒内生成的第6个ID(因为从0开始),消息ID可以自动生成,也可以指定,但必须是整数-整数的形式,而且必须后面加入的消息ID要大于前面加入的消息ID。
2 消息内容Stream内部的消息内容就是键值对,可以理解为内容就是一个hash结构。
3 基础指令- xadd streamKey * name zhangsan age 21 : streamKey就是Stream的key, * 标识自动生成消息ID,name zhangsan age 21就是消息内容。
- xdel streamKey msgId: 根据消息ID删除消息,实质上只是打了一个标记,并不是一次,计算消息队列长度时依然会被纳入统计
- xlen streamKey : 返回消息长度,包括已打上删除标记的消息
- xrange streamKey startMsgId endMsgId :获取指定区间的消息列表,包括startMsgId和endMsgId,但是不再包含打上删除标记的消息, 这个id可以用-标识开头,用+标识结束,比如xrange streamKey - +就是获取所有消息
- del streamKey : 删除Stream
示例:
111.0.0.1:6379>xadd testStream * name zhangsan age 21 1632476036193-0 111.0.0.1:6379>xadd testStream * name lisi age 22 1632476040193-0 111.0.0.1:6379>xadd testStream * name wangwu age 23 1632476047523-0 111.0.0.1:6379>xlen testStream (integer)3 111.0.0.1:6379>xrang testStream 1632476040193-0 + 1) 1) 1632476040193-0 2) 1) "name" 2) "lisi" 3) "age" 4) "22" 2) 1) 1632476047523-0 2) 1) "name" 2) "wangwu" 3) "age" 4) "23" 111.0.0.1:6379>xdel testStream 1632476047523-0 (integer)1 111.0.0.1:6379>xlen testStream (integer)3 111.0.0.1:6379>xrang testStream 1632476040193-0 + 1) 1) 1632476040193-0 2) 1) "name" 2) "lisi" 3) "age" 4) "22"4 独立消费
Stream也支持在不定义消费组的情况下独立消费,此时Stream看起来就和普通的list一样。独立消费使用xread消费消息,没有消息时阻塞等待。
xread block 0 count 2 streams testStream 0-0 : block 0表示阻塞等待多久,单位是毫秒,0表示无限等待,不写此参数表示立即返回,没消息就返回nil, count 2表示一次读两条,streams testStream是指定消费testStream , 0-0这是填起始消息ID,0-0表示从头开始,毕竟根据ID生成规则,自动生成的ID肯定是大于0-0的, 0-0填 $ 表示从末尾开始,就是只接受新消息了
111.0.0.1:6379>xread count 2 streams testStream 0-0 1) "testStream" 2) 1) 1) 1632476036193-0 2) 1) "name" 2) "zhangsan" 3) "age" 4) "21" 2) 1) 1632476040193-0 2) 1) "name" 2) "lisi" 3) "age" 4) "22" 111.0.0.1:6379>xread count 1 streams testStream $ (nil) //后续没有消息了,返回nil 111.0.0.1:6379>xread block 0 count 1 streams testStream $ // 没有消息无限阻塞在这里 // 新开一个窗口 start 111.0.0.1:6379>xadd testStream * name liuliu age 24 1632476048913-0 // 新开一个窗口 end // 下面再看原来的窗口,阻塞解除了,还显示一个等待时间 111.0.0.1:6379>xread block 0 count 1 streams testStream $ 1) "testStream" 2) 1) 1) 1632476036193-0 2) 1) "name" 2) "liuliu " 3) "age" 4) "24" (9.91s)
使用xread读取消息时需要记录消息读取到哪里了,就是上一条消息的ID,这样下一次读取时把这个ID作为参传入,才能实现顺序无遗漏的读取。
5 消费组消费Stream用xgroup create指令创建消费组,需要传递起始消息ID用来初始化last_delivered_id。
- xgroup create testStream g1 0-0 : testStream指定Stream, g1为消费组的名称 0-0标识起始消息ID,可以用$表示从尾部开始,忽略之前的消息,只接受新消息
- xinfo stream testStream : 查看testStream的信息
- xinfo groups testStream: 查看testStream的消费组信息
111.0.0.1:6379>xgroup create testStream g1 0-0 OK 111.0.0.1:6379>xgroup create testStream g2 $ OK 111.0.0.1:6379>xinfo stream testStream // 查看testStream的信息 1)length 2)(integer)3 // 现在testStream有3条消息 3)radix-tree-keys 4)(integer)1 5)radix-tree-nodes 6)(integer)2 7)groups 8)(integer)2 // 现在testStream有2个消费组,就是上面创建的g1和g2 9)first-entry // 第一条消息 10) 1) 1632476036193-0 2) 1) "name" 2) "zhangsan" 3) "age" 4) "21" 11)last-entry // 最后一条消息 12) 1) 1632476047523-0 2) 1) "name" 2) "wangwu" 3) "age" 4) "23" 111.0.0.1:6379>xinfo groups testStream // 查看Stream的消费组信息 1) 1)name 2)"g1" 3)consumers 4)(integer)0 // 消费组里面有0个消费者 5)pending 6)(integer)0 // 消费组没有正在处理的消息:就是上面说的读取了还没ACK的消息 2) 1)name 2)"g2" 3)consumers 4)(integer)0 // 消费组里面有0个消费者 5)pending 6)(integer)0 // 消费组没有正在处理的消息:就是上面说的读取了还没ACK的消息
建立完消费组之后就可以通过xreadgroup指令消费了,xreadgroup需要指定消费组,而且可以自动从last_delivered_id开始递增读取,xreadgroup也可以阻塞等待消息。读到消息之后,对应的消息ID就会进入消费者的PEL中,客户端处理完毕后使用xack指令通知服务器消费完成,该消息ID就会从PEL中移除。
- xreadgroup GROUP g1 c1 block 0 count 1 streams testStream > : GROUP g1表示指定消费组,c1表示消费者的名称, block 0表示阻塞获取时间,单位ms,0表示无限等待,此参数选填,不写表示不阻塞直接返回,无消息返回nil, count 1表示读取1条消息, streams testStream指定Stream,>表示从当前消费组的last_delivered_id往后读,也可以设置成消息的ID,从指定位置读取有效消息,有效消息是指没有接到ACK的消息,也就是说指定ID后,读取消息的范围是PEL中没有ACK的消息和last_delivered_id之后的消息,而不是所有消息队列中符合要求的数据。这个机制能很好的避免消息丢失:客户端读取消息时,服务端将消息发送给客户端时客户端宕机会导致消息丢失而且last_delivered_id已经后移。但是消息ID此时已进入PEL,因此客户端重启之后只需用xreadgroup 并将ID传入 0-0,那么那些没有ACK的消息就会被重新消费,从而避免了消息丢失。
- xinfo consumers testStream g1 : 查看testStream 的g1消费组的消费者信息
- xack testStream g1 1632476036193-0 : 通知服务器testStream 中的g1 消费组已消费1632476036193-0完毕,可以传入多个ID,一次ACK多个,ACK之后的消息ID就会从PEL中移除。处理完一定要记得ACK,如果不ACK,那么消息ID就会一直保存在PEL中,消费的消息越多,占据的内存也会越大。
111.0.0.1:6379>xreadgroup GROUP g1 c1 count 1 streams testStream > 1) "testStream" 2) 1) 1) 1632476036193-0 2) 1) "name" 2) "zhangsan" 3) "age" 4) "21" 111.0.0.1:6379>xreadgroup GROUP g1 c1 count 2 streams testStream > 1) "testStream" 2) 1) 1) 1632476040193-0 2) 1) "name" 2) "lisi" 3) "age" 4) "22" 2) 1) 1632476047523-0 2) 1) "name" 2) "wangwu" 3) "age" 4) "23" 111.0.0.1:6379>xreadgroup GROUP g1 c1 count 1 streams testStream > (nil) //后续没有消息了,返回nil 111.0.0.1:6379>xreadgroup GROUP g1 c1 block 0 count 1 streams testStream > // 没有消息无限阻塞在这里 // 新开一个窗口 start 111.0.0.1:6379>xadd testStream * name liuliu age 24 1632476048913-0 // 新开一个窗口 end // 下面再看原来的窗口,阻塞解除了,还显示一个等待时间 111.0.0.1:6379>xread block 0 count 1 streams testStream $ 1) "testStream" 2) 1) 1) 1632476036193-0 2) 1) "name" 2) "liuliu " 3) "age" 4) "24" (9.91s) 111.0.0.1:6379>xinfo groups testStream // 查看Stream的消费组信息 1) 1)name 2)"g1" 3)consumers 4)(integer)1 // 就有一个消费者了 5)pending 6)(integer)4 // 刚才上面读的4条都没有ACK 2) 1)name 2)"g2" 3)consumers 4)(integer)0 // 没操作还是0个消费者 5)pending 6)(integer)0 // 没读取消息,所以还是0 111.0.0.1:6379>xinfo consumers testStream g1 // 查看消费组的消费者信息 1) 1)name 2)"c1" 3)pending 4)(integer)4 // 4条没有ACK 5)idle 6)(integer)31586 // 空闲了31586 没有接收到消息了 111.0.0.1:6379>xack testStream g1 1632476036193-0 // ack一下 (integer)1 111.0.0.1:6379>xinfo consumers testStream g1 // 查看消费组的消费者信息 1) 1)name 2)"c1" 3)pending 4)(integer)3 // 就变成3条了 5)idle 6)(integer)33586 // 空闲了33586 没有接收到消息了 111.0.0.1:6379>xack testStream g1 1632476040193-0 1632476047523-0 1632476036193-0// 可以一次ack多条 (integer)36 消息删除
上面提到xdel指令仅仅是给消息打上了删除标记,消息其实并没有删除,还占据着内存,这意味着随着消息会越积累越多,占据的内存也越来越多,Stream并没有按照ID来删除消息的指令,而是在xadd时提供了一个定长截断的参数:
- xadd testStream maxlen 3 * name qiqi age 25 : maxlen 3 表示在这条消息添加完成后,仅保留最新的3条,之前的消息进行删除。 值得注意的是,此命令并不会关注这个消息是否消费,如果一个消费组消费的快,一个消费组消费的慢,那么可能出现快的消费组把消息删除了导致慢消费组丢失的情况,需要设置合理的保留值避免这种情况的发生
PS:
【JAVA核心知识】系列导航 [持续更新中…]
关联导航:Redis应用篇
关联导航:Redis基础篇
关联导航:Redis原理篇
关联导航:Redis集群篇
欢迎关注…
参考资料:
《Redis深度历险》



