栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用Redis搭建消息队列(python版)

使用Redis搭建消息队列(python版)

最近在工作中遇到了一个场景是这样的:

每到月初我们需要向上个月考勤有异常的同学的企业微信推送异常考勤提醒,让有异常的同学及时处理:补卡或者提交对应的请假申请等等。之前的做法是直接循环数据库,查处有异常的同学的考勤数据,然后推送给到相关的同学。一次推送的数据量在1500左右。

这种方法存在的问题,因为是通过同步的定时任务的方式,会因为数据量太大导致定时任务执行超时,导致事务回滚,数据库中未创建对应消息的记录,所以导致消息无法确认。

在发现这个问题之后,我们的第一反应是把定时任务做成异步的,修改的实现方式是,添加一个异步的装饰器,在定时任务触发之后,在后台另外开启一个线程执行查找异常考勤和发送异常消息。

   

当然这种可以解决问题,但不是最优解,所以在新版本的时候,我们就考虑使用消息队列MQ的方式来解决消息的推送问题,最开始我的设想是使用RabbitMQ的发布订阅模式来解决,但是在和公司运维了解情况后发现,公司使用的腾讯云对RabbitMQ的支持并不是很好,和运维沟通后决定使用Redis来实现消息队列。

由于之前也没有基于Redis的实践,所以我其实是在网上找了一些资料,才把问题解决。

  先说结论,基于Redis的实现其实是相当简单的,网上也有现成的代码可供参考,我在post出相关代码之后,会列出我在实现的过程中遇到哪些坑。

代码实现:

文件1:先写一个获取Redis配置的类:

import redis
import logging
logger = logging.getLogger(__name__)


class RedisHelper(object):
    """
    host: redis ip
    port: redis port
    channel: 发送接送消息的频道
    """
    def __init__(self, host, port, db, channel, password=None):
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.channel = channel
        self.__conn = redis.Redis(self.host,
                                  self.port,
                                  self.db,
                                  self.password,
                                  decode_responses=True)

    def ping(self):
        try:
            self.__conn.ping()
            return True
        except Exception as e:
            logger.exception(f"连接Redis失败,失败原因为:{e}")
            return False

    # 发送消息
    def public(self,
               msg):
        if self.ping():
            self.__conn.publish(self.channel,
                                msg)
            return True
        else:
            return False

    # 订阅
    def subscribe(self):
        if self.ping():
            pub = self.__conn.pubsub()
            pub.subscribe(self.channel)
            pub.parse_response()
            return pub
        else:
            return False

文件2: 业务文件:发布方

config = self.env.ref('sf_conference_management.wxagent_config_detail_conference_default').sudo()
obj = RedisHelper(host=config.redis_host, port=config.redis_port, db=14, channel='channel:1', password=config.redis_password)
for user_id, message in message_dict.items():
    info = {
                            'title': f'{first.month}月考勤确认',
                            'description': message,
                            'url': url,
                            'user_id': user_id.id,
                            'task_id': str(int(time.time() * 1000)) + ''.join([random.choice('0123456789') for _ in range(5)]),
                            'btn': [{'key': '/confirm/i', 'name': '确认无误'}]
                        }
    _logger.info(f'写入信息为:{info}')
    obj.public(msg=json.dumps(info))

把每一条信息写入到redis的消息队列中

接收方:

@api.model
def sub_info(self):
        """消费消息"""
    config = self.env.ref('sf_conference_management.wxagent_config_detail_conference_default').sudo()
        obj = RedisHelper(host=config.redis_host, port=config.redis_port, db=14, channel='channel:1',
                          password=config.redis_password)
        sub_obj = obj.subscribe()
    while True:
        if sub_obj:
            msg = sub_obj.parse_response()
            info_dict = json.loads(msg[2])
            user_id = info_dict.pop('user_id')
            user_obj = self.env['res.users'].browse(user_id)
            _logger.info(f"发送考勤异常消息给{user_obj.name}.....")
            self.send_qywx_msg(user_obj, msgtype='interactive_taskcard', parameter={
                    'interactive_taskcard': info_dict})
                self.env.cr.commit()
                time.sleep(0.1)
        else:
            break

这个地方,接受方我做了个装饰器,是为了可以在定时任务里面调用该函数。

代码写好之后,我发现先执行发送消息的定时任务,然后再点击获取信息的定时任务发现,并没有把消息实际发送出去,这让我一度怀疑是不是代码写错了,直到第二天我无意间先点击接收方,然后再点击发送方之后,有意思的事情发生了,消息开始一条条的发出来,所以这个地方的逻辑其实是如下图这样的。

 必须要在先开启接收方之后,再发送之后才能及时的都发送出去。大家切记。

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/779355.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号