Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。用过Redis做消息队列的都了解,基于Reids的消息队列实现有很多种,例如:
- PUB/SUB,订阅/发布模式
- 基于List的 LPUSH+BRPOP 的实现
- 基于Sorted-Set的实现
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
- 消息ID的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
参考文档
https://zhuanlan.zhihu.com/p/60501638 https://www.cnblogs.com/jing1208/p/14201069.html实现商品的异步发布 1、创建 Jobs派遣任务
php artisan make:job Goods/AddGoodsJob2、初始化生产者 , 创建消费者组,并且在 handle()中执行消费
data = $goods_info;
//消息的盐标识
$this->data['salt'] = $this->data['store_id'] .'_'. substr(uniqid(),-7);
//监听消费队列的标识
$this->connection = "redis";
$addGoodsStreamService = new AddGoodsStreamService();
//生产者数据写入队列
$addGoodsStreamService->push($this->queue_name,$this->data);
}catch (Exception $e){
Log::info('生产者异常捕获:'.$e->getMessage());
}
}
public function handle()
{
$addGoodsStreamService = new AddGoodsStreamService();
//消费者消费信息
$addGoodsStreamService->pop($this->queue_name,function ($message,$xread_key) use (&$addGoodsStreamService){
//从消费队列中获取到详细的信息
$goods_info = $message[$this->queue_name][$xread_key[0]];
try{
//执行商品操作
$addGoodsStreamService->doEditGoods($goods_info);
}catch (Exception $e){
throw new BadRequestHttpException($e->getMessage());
}
});
}
}
3、创建service实现生产者,消费者组,消费者消费消息的功能(AddGoodsStreamService.php)
消费者在消费数据,ack后redis并不会释放资源,需要手动释放资源(xdel)
消息消费了但未ack的消息可以通过(xpending)查看未ack的消息
消息消费失败的数据的数据会存储在死讯队列中,可以通过死讯队列(xrange)重新获取消息重新消费(由定时任务完成即可)
redis = app('redis.connection');
}
public function push($queue,$messageBody){
//生产者写入消息队列
$goods_queue = $this->redis->xadd($queue,'*',$messageBody);
if(!$goods_queue){
Log::info('添加商品消息写入失败');
return false;
}
//如果没有创建消费者组,则创建消费者组
if(!$this->groups($queue)){
Log::info('创建消费者异常');
return false;
}
}
public function groups($queue){
//查看队列信息
$queue_info = $this->redis->xinfo('stream',$queue);
//消费者组不存在时,创建消费者组
if($queue_info['groups'] <= 0){
//添加消费者组
$create_group = $this->redis->xGroup('create',$queue,$this->group_name,0);
if(!$create_group) return false;
}
return true;
}
public function pop($queue,$callback){
try{
//消费者消费信息
$goods_read = $this->redis->xreadGroup($this->group_name,$this->cons_name,[$queue => '>'],1);
if(!$goods_read){
Log::info('暂无信息消费');
return false;
}
//获取消费id标识
$xread_key = array_keys($goods_read[$queue]);
//执行添加商品的操作
$resout = $callback($goods_read,$xread_key);
if($resout){
Log::info('消息消费失败~');
return false;
}
//ack确认消费,消费成功后删除redis的资源,如果在死信队列中存在消息,则是消费失败没有ack的消息
//如果重新消费无法消费成功,则认为数据是异常的,xdel消息的同时ack掉数据
$this->redis->xack($queue,$this->group_name,$xread_key);
//ack后手动释放资源,不然一直存在redis中
$this->redis->xdel($queue,$xread_key);
Log::info('消息消费完成~');
}catch (Exception $e){
Log::info('消费者消费数据异常:'.$e->getMessage());
}
}
public function getXrangeByUnAck(){
$msg_info = $this->redis->xRange($this->queue_name,'-','+');
return $msg_info;
}
public function doAgainAckByTack(){
try{
//从死信队列获取未ack的消息(存在数据已经消费,但未ack的消息,会造成重复消费)
$goods_list = $this->getXrangeByUnAck();
if(!count($goods_list)) return false;
$goodsDao = new GoodsDao();
foreach ($goods_list as $xread_key => $item){
//执行商品操作
$resout = $this->doEditGoods($item,$goodsDao,1);
if($resout){
$this->redis->xack($this->queue_name,$this->group_name,[$xread_key]);
//ack后手动释放资源,不然一直存在redis中
$this->redis->xdel($this->queue_name,[$xread_key]);
}
}
Log::info('消息重新消费完成~');
}catch (Exception $e){
Log::info('消费者重新消费数据异常:'.$e->getMessage());
}
}
public function doEditGoods($goods_info,$goodsDao = null,$type = 0){
try{
$addGoodsService = new AddGoodsService();
//若有商品id则为编辑商品
if(isset($goods_info['goods_commonid'])){
//编辑商品
$addGoodsService->editGoodsByStore($goods_info['goods_commonid'],$goods_info);
}else{
//如果是死信队列重新消费,则需要检查消息是消费成功,只是ack失败,则重新ack即可
if($type == 1){
$has = $goodsDao->hasGoodsCommBySalt($goods_info['salt']);
if($has) return true;
}
//添加商品
$addGoodsService->addGoodsByStore($goods_info);
}
return true;
}catch (Exception $e){
throw new BadRequestHttpException($e->getMessage());
}
}
}
4、创建触发的功能
dispatch($stream);
}
}
5、创建swoole定时任务,重新消费消息
goodsTimedTask = new GoodsTimedTask();
}
protected $i = 0;
public function run(){
//消息重新消费
$this->goodsTimedTask->doAgainAckByTack();
}
public function interval(){
// 定时器间隔,单位为 ms
return 60000;
}
public function isImmediate(){
return false;
}
}
6、创建触发任务的脚本
目前设定重新消费一次,消费后无论是否消费成功都判定为无效数据,释放资源(实际应用中应该转移当前消息到其他消费者,当超过指定的消费次数或者超过资源存储的时间时,在释放资源)
addGoodsStreamService = new AddGoodsStreamService();
}
public function doAgainAckByTack(){
try{
$this->addGoodsStreamService->doAgainAckByTack();
Log::info('ACK :: 定时消费成功');
}catch (Exception $e){
Log::info('ACK :: 定时消费失败:'.$e->getMessage());
}
}
}



