目录
stream介绍
stream操作命令
追加消息XADD
范围查询XRANGE,XREVRANGE
读取消息命令XREAD
消费组模式相关命令
创建消费组XGROUP CREATE
读取消费组消息XREADGROUP
全部命令
stream实现原理
radix查找和遍历
radix查找过程
radix遍历
radix插入
radix删除
redis相关实现原理
radix相关结构定义
stream结构定义
listpack紧凑列表
消费组功能实现
总结
stream介绍
stream是redis5.0引入的新数据类型,是消息队列的一种实现方式,实现了完整的消息队列功能。主要特点包括:
- stream 提供了消息的持久化和主备复制功能,保证消息不会丢失。阻塞操作,允许消费者等待生产者向流中添加新数据。消费组功能(类似Kafka实现),允许一组客户端协作使用同一消息流的不同部分。
stream操作命令
追加消息XADD
XADD命令向stream中追加消息,如果stream不存在,则创建。消息格式:
XADD key ID field value [feild value ...] key: stream名称,如果不存在则创建 ID: 消息ID,*表示redis自动创建ID field value: 值,K-V格式
执行XADD命令插入内容,返回消息ID,消息ID包含两部分${时间戳}-${序列}
> XADD teststream * name redis version 6.0 "1641651260618-0"
执行XADD命令,指定消息ID
> XADD teststream 1641651260618-1 name java version 1.8 "1641651260618-1"
范围查询XRANGE,XREVRANGE
XRANGE命令实现范围查询,XREVRANGE逆序查询,命令格式:
XRANGE|XREVRANGE key start end [COUNT count] key:straem名称 start:开始ID,- 表示最小 end:结束ID,+ 表示最大 count:返回数量
命令演示
> XRANGE teststream - +
1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
2) 1) "1641651260618-1"
2) 1) "name"
2) "java"
3) "version"
4) "1.8"
# 指定返回数量
> XRANGE teststream - + count 1
1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
# 指定时间参数
> XRANGE teststream 1641651260618 +
1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
2) 1) "1641651260618-1"
2) 1) "name"
2) "java"
3) "version"
4) "1.8"
# 指定开始结束ID参数
> XRANGE teststream 1641651260618-0 1641651260618-0
1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
# XREVRANGE命令逆序遍历
> XREVRANGE teststream + -
1) 1) "1641651260618-1"
2) 1) "name"
2) "java"
3) "version"
4) "1.8"
2) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
读取消息命令XREAD
XREAD命令读取消息,可以阻塞或非阻塞,命令格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] COUNT :数量 BLOCK :可选,阻塞毫秒数,没有设置就是非阻塞模式 key :stream名称 ID :消息 ID,$表示stream中最大的ID,即读取最新的消息
XREAD命令演示,非阻塞方式
> XREAD STREAMS teststream 0
1) 1) "teststream"
2) 1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
2) 1) "1641651260618-1"
2) 1) "name"
2) "java"
3) "version"
4) "1.8"
XREAD命令演示,阻塞方式
> XREAD BLOCK 1000000 STREAMS teststream $
1) 1) "teststream"
2) 1) 1) "1641653381688-0"
2) 1) "name"
2) "mysql"
3) "version"
4) "8.0"
(29.36s)
# 另开一个终端,push消息
> xadd teststream * name mysql version 8.0
"1641653381688-0"
消费组模式相关命令
创建消费组XGROUP CREATE
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATEConSUMER key groupname consume
key :stream名称,如果不存在就创建
groupname :消费组名
ID : 指定开始消费ID,或使用 $表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATEConSUMER key groupname consume key :stream名称,如果不存在就创建 groupname :消费组名 ID : 指定开始消费ID,或使用 $表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
命令演示
# 创建消费组,从头开始消费 > XGROUP CREATE teststream testgroup1 0 OK # 创建消费组,消费最新消息 > XGROUP CREATE teststream testgroup2 $ OK
读取消费组消息XREADGROUP
XREADGROUP指定消费组消费消息,与XREAD命令一样,XREADGROUP支持阻塞模式,命令格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] group :消费组名 consumer :消费者名 COUNT : 读取数量 BLOCK : 阻塞毫秒数 key : stream名称 ID : 消息 ID,特殊ID > 表示读取未消费过的消息
命令演示
> XREADGROUP GROUP testgroup1 Alice STREAMS teststream >
1) 1) "teststream"
2) 1) 1) "1641651260618-0"
2) 1) "name"
2) "redis"
3) "version"
4) "6.0"
2) 1) "1641651260618-1"
2) 1) "name"
2) "java"
3) "version"
4) "1.8"
3) 1) "1641653381688-0"
2) 1) "name"
2) "mysql"
3) "version"
4) "8.0"
全部命令
由于篇幅原因,这里不对全部命令一一介绍了,有需要的可以去redis官网自行查阅
| 命令 | 说明 |
| XTRIM | 对流进行修剪,限制长度,ID较小的项目可能会被删除 |
| XDEL | 删除消息 |
| XLEN | 获取消息长度 |
| XRANGE | XREVRANGE | 范围查询 |
| XREAD | 读取消息 |
| 消费组相关 | |
| XGROUP [ CREATE SETID DELCONSUMER DESTROY ] | 消费者相关操作命令 |
| XREADGROUP GROUP | 读取消费组中的消息 |
| XACK | 将消息标记为"已处理" |
| XPENDING | 显示待处理消息的相关信息 |
| XCLAIM | 改变待处理消息的所有权 |
| XINFO [ GROUPS STREAM ] | 查看流和消费者组的相关信息 |
stream实现原理
stream使用radix(基数树)作为存储结构,radix是一树形结构,适合查找和存储,radix特点:
树由叶子结点和非叶子结点组成同级结点按照顺序排列结点的key是由从根节点到结点的所有key组合而成如果结点为空且只有一个子结点,需要与子结点合并
radix查找和遍历
radix查找过程
radix遍历
radix树的遍历与二叉树类似,可以参考二叉树的遍历方式。
radix插入
radix树插入的各种场景
radix删除
删除是插入的逆操作,也需要区分场景,这里不详细介绍。
redis相关实现原理
radix相关结构定义
rax结构定义radix树结构,结点中raxNode.data保存数据,包含key和value。
//radix根结点定义
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
} rax;
//
#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
uint32_t iskey:1;
uint32_t isnull:1;
uint32_t iscompr:1;
uint32_t size:29;
unsigned char data[];
} raxNode;
stream结构定义
stream相关结构定义,内部使用rax存储数据。
typedef struct streamID {
uint64_t ms;
uint64_t seq;
} streamID;
typedef struct stream {
rax *rax;
uint64_t length;
streamID last_id;
rax *cgroups;
} stream;
typedef struct streamIterator {
stream *stream;
streamID master_id;
uint64_t master_fields_count;
unsigned char *master_fields_start;
unsigned char *master_fields_ptr;
int entry_flags;
int rev;
uint64_t start_key[2];
uint64_t end_key[2];
raxIterator ri;
unsigned char *lp;
unsigned char *lp_ele;
unsigned char *lp_flags;
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
listpack紧凑列表
除了radix,stream中还有一种数据结构:紧凑列表,用于存储结点数据。从名字就可以看出,紧凑列表是一种结构非常紧凑的数据结构,由于篇幅原因这里不详细介绍了。感兴趣的同学可以研究下。
消费组功能实现
消费组相关功能也是通过radix结构实现,其中streamCG结构保存消费组相关信息。
typedef struct streamCG {
streamID last_id;
rax *pel;
rax *consumers;
} streamCG;
typedef struct streamConsumer {
mstime_t seen_time;
sds name;
rax *pel;
} streamConsumer;
typedef struct streamNACK {
mstime_t delivery_time;
uint64_t delivery_count;
streamConsumer *consumer;
} streamNACK;
总结
截至目前为止,stream类型应该还没有在各大厂应用,Kafka的地位仍然稳固。当然多一种选择也没什么坏处,如果你有消息队列相关需求,但又觉得Kafka有点笨重,不妨试一下stream。即插即用,简单方便。



