栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于redis-stream类型实现商品的异步发布

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于redis-stream类型实现商品的异步发布

基于redis-stream类型实现商品的异步发布 简介

​ Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。用过Redis做消息队列的都了解,基于Reids的消息队列实现有很多种,例如:

  • PUB/SUB,订阅/发布模式
  • 基于List的 LPUSH+BRPOP 的实现
  • 基于Sorted-Set的实现

​ Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

参考文档

https://zhuanlan.zhihu.com/p/60501638

https://www.cnblogs.com/jing1208/p/14201069.html
实现商品的异步发布 1、创建 Jobs派遣任务
php artisan make:job Goods/AddGoodsJob
2、初始化生产者 , 创建消费者组,并且在 handle()中执行消费
data = $goods_info;
			
            //消息的盐标识
            $this->data['salt'] = $this->data['store_id'] .'_'. substr(uniqid(),-7);

            //监听消费队列的标识
            $this->connection = "redis";

            $addGoodsStreamService = new AddGoodsStreamService();

            //生产者数据写入队列
            $addGoodsStreamService->push($this->queue_name,$this->data);
        }catch (Exception $e){
            Log::info('生产者异常捕获:'.$e->getMessage());
        }
    }

    
    public function handle()
    {
        $addGoodsStreamService = new AddGoodsStreamService();

        //消费者消费信息
        $addGoodsStreamService->pop($this->queue_name,function ($message,$xread_key) use (&$addGoodsStreamService){
            //从消费队列中获取到详细的信息
            $goods_info = $message[$this->queue_name][$xread_key[0]];

            try{
                //执行商品操作
                $addGoodsStreamService->doEditGoods($goods_info);
            }catch (Exception $e){
                throw  new  BadRequestHttpException($e->getMessage());
            }
        });
    }
}

3、创建service实现生产者,消费者组,消费者消费消息的功能(AddGoodsStreamService.php)

​ 消费者在消费数据,ack后redis并不会释放资源,需要手动释放资源(xdel)

​ 消息消费了但未ack的消息可以通过(xpending)查看未ack的消息

​ 消息消费失败的数据的数据会存储在死讯队列中,可以通过死讯队列(xrange)重新获取消息重新消费(由定时任务完成即可)

redis = app('redis.connection');
    }

    
    public function push($queue,$messageBody){
        //生产者写入消息队列
        $goods_queue = $this->redis->xadd($queue,'*',$messageBody);

        if(!$goods_queue){
            Log::info('添加商品消息写入失败');

            return false;
        }

        //如果没有创建消费者组,则创建消费者组
        if(!$this->groups($queue)){
            Log::info('创建消费者异常');

            return false;
        }
    }

    
    public function groups($queue){
        //查看队列信息
        $queue_info = $this->redis->xinfo('stream',$queue);

        //消费者组不存在时,创建消费者组
        if($queue_info['groups'] <= 0){
            //添加消费者组
            $create_group = $this->redis->xGroup('create',$queue,$this->group_name,0);

            if(!$create_group)  return false;
        }

        return true;
    }

    
    public function pop($queue,$callback){
        try{
            //消费者消费信息
            $goods_read = $this->redis->xreadGroup($this->group_name,$this->cons_name,[$queue => '>'],1);

            if(!$goods_read){
                Log::info('暂无信息消费');

                return false;
            }

            //获取消费id标识
            $xread_key = array_keys($goods_read[$queue]);

            //执行添加商品的操作
            $resout = $callback($goods_read,$xread_key);

            if($resout){
                Log::info('消息消费失败~');

                return false;
            }

            //ack确认消费,消费成功后删除redis的资源,如果在死信队列中存在消息,则是消费失败没有ack的消息
            //如果重新消费无法消费成功,则认为数据是异常的,xdel消息的同时ack掉数据
            $this->redis->xack($queue,$this->group_name,$xread_key);

            //ack后手动释放资源,不然一直存在redis中
            $this->redis->xdel($queue,$xread_key);

            Log::info('消息消费完成~');


        }catch (Exception $e){
            Log::info('消费者消费数据异常:'.$e->getMessage());
        }
    }

    
    public function getXrangeByUnAck(){
        $msg_info = $this->redis->xRange($this->queue_name,'-','+');

        return $msg_info;
    }

    
    public function doAgainAckByTack(){
        try{
            //从死信队列获取未ack的消息(存在数据已经消费,但未ack的消息,会造成重复消费)
            $goods_list = $this->getXrangeByUnAck();

            if(!count($goods_list)) return false;

            $goodsDao = new GoodsDao();

            foreach ($goods_list as $xread_key => $item){

                //执行商品操作
                $resout = $this->doEditGoods($item,$goodsDao,1);

                if($resout){
                    $this->redis->xack($this->queue_name,$this->group_name,[$xread_key]);

                    //ack后手动释放资源,不然一直存在redis中
                    $this->redis->xdel($this->queue_name,[$xread_key]);
                }
            }

            Log::info('消息重新消费完成~');
        }catch (Exception $e){
            Log::info('消费者重新消费数据异常:'.$e->getMessage());
        }
    }

    
    public function doEditGoods($goods_info,$goodsDao = null,$type = 0){
        try{
            $addGoodsService = new AddGoodsService();

            //若有商品id则为编辑商品
            if(isset($goods_info['goods_commonid'])){
                //编辑商品
                $addGoodsService->editGoodsByStore($goods_info['goods_commonid'],$goods_info);
            }else{
                //如果是死信队列重新消费,则需要检查消息是消费成功,只是ack失败,则重新ack即可
                if($type == 1){
                    $has = $goodsDao->hasGoodsCommBySalt($goods_info['salt']);

                    if($has)    return true;
                }

                //添加商品
                $addGoodsService->addGoodsByStore($goods_info);
            }
            return true;
        }catch (Exception $e){
            throw  new  BadRequestHttpException($e->getMessage());
        }
    }
}

4、创建触发的功能
dispatch($stream);
    }
}

5、创建swoole定时任务,重新消费消息
goodsTimedTask = new GoodsTimedTask();
    }

    protected $i = 0;

    
    public function run(){
        //消息重新消费
        $this->goodsTimedTask->doAgainAckByTack();
    }

    
    public function interval(){
        // 定时器间隔,单位为 ms
        return 60000;
    }

    
    public function isImmediate(){
        return false;
    }
}

6、创建触发任务的脚本

​ 目前设定重新消费一次,消费后无论是否消费成功都判定为无效数据,释放资源(实际应用中应该转移当前消息到其他消费者,当超过指定的消费次数或者超过资源存储的时间时,在释放资源)

addGoodsStreamService = new AddGoodsStreamService();
    }


    
    public function doAgainAckByTack(){
        try{
            $this->addGoodsStreamService->doAgainAckByTack();

            Log::info('ACK :: 定时消费成功');
        }catch (Exception $e){
            Log::info('ACK :: 定时消费失败:'.$e->getMessage());
        }
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/821956.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号