栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python一些可能用的到的函数系列81 基于Redis Stream的简单消息队列对象

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python一些可能用的到的函数系列81 基于Redis Stream的简单消息队列对象

说明

一个实现消息队列简单管理的对象

内容

代码

import redis
# 基于Redis Stream的消息队列
class LittleRQ:
    def __init__(self, host, port, decode_responses=True,password=None):
        self.conn  = redis.Redis(host=host, port=int(port), decode_responses=decode_responses,password=password) 
    
    def del_a_stream(self,stream_name):
        if self.conn.exists(stream_name):
            self.conn.delete(stream_name)
        return True
    # 向队列存入一条消息
    def add_msg(self,stream_name, msg_dict, maxlen=10, approximate=False):
        self.conn.xadd(stream_name, msg_dict,maxlen=maxlen, approximate=approximate)
        return True
    # 队列的长度
    def len_of_queue(self, stream_name):
        try:
            recs = self.conn.xlen(stream_name)
        except:
            recs = 0
        return recs
    # 确保组的存在
    def ensure_group(self, stream_name, group_name ='group1', order='fifo'):
        try:
            res = self.conn.xinfo_groups(stream_name)
            if len(res) == 0:
                if order.lower() == 'fifo':
                    self.conn.xgroup_create(stream_name, group_name, id=0)
                else:
                    self.conn.xgroup_create(stream_name, group_name, id='$')                
        except:
            if order.lower() == 'fifo':
                self.conn.xgroup_create(stream_name, group_name, id=0)
            else:
                self.conn.xgroup_create(stream_name, group_name, id='$')
        return True
    # 检查一个stream的状态
    def get_a_stream_state(self, stream_name):
        try:
            return self.conn.xinfo_stream(stream_name)
        except:
            return False
    # 检查一个group的状态
    def get_a_stream_groups(self, stream_name):
        try:
            return self.conn.xinfo_groups(stream_name)
        except:
            return False
    
    # 删掉一个队列
    def del_a_stream(self, stream_name):
        if self.conn.exists(stream_name):
            self.conn.delete(stream_name)
        return True
    
    # 获取消息
    def get_msg(self, stream_name, group_name ='group1', consumer_name = 'consumer1' , count=10000, block=None, msg_id = None):
        # 历史消息
        consumer_id='0-0'
        recs1 = self.conn.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=block, count=count) 
            
        # 指定的新消息
        consumer_id =msg_id or '>'
        recs2 = self.conn.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=block, count=count) 
        
        recs = recs1 + recs2
        # tuple_list
        if len(recs):
            return recs[0][1]
        else:
            return False
    # 获取被耽误的消息
    def get_pending_info(self, stream_name, group_name='group1'):
        pending_info = self.conn.xpending(stream_name, group_name)
        return pending_info
    # 获取被耽误的消息
    def get_pending_msg(self, stream_name, group_name='group1', consumer_name = 'consumer1' ,count = 100):
        pending_info = self.get_pending_info(stream_name, group_name = group_name)
        pending_num = pending_info.get('pending') 
        print('>>>', pending_num)
        if pending_num>0:
            recs =  self.get_msg(stream_name,group_name=group_name,consumer_name=consumer_name,count=count , msg_id =pending_info.get('min')  )
        else:
            recs = []
        return recs
    # 确认消息
    def ack_msg(self, stream_name,mid ,group_name='group1'):
        return self.conn.xack(stream_name, group_name, mid)

使用

# 1 建立实例
lq = LittleRQ('172.17.0.1',6379, password='YOURPASS' )
# 2 数据
some_data = {'name': 'Ashley', 'age': '16'}
# 3 队列名
stream_name = 'test1'
# 4 存消息
for i in range(int(10)):
    lq.add_msg(stream_name, some_data, maxlen =int(10))
# 5 查看队列长度
lq.len_of_queue(stream_name)
# 6 查看pending的消息
pending_msg = lq.get_pending_msg(stream_name)
# 7 将pending消息确认掉
for k in pending_msg:
    lq.ack_msg(stream_name ,k[0])
# 8 为某个队列增加组
# 不在当前列表集合中
lq.ensure_group(stream_name)
# 9 获取消息列表/没有消息时返回False
rec_tuple_list = lq.get_msg(stream_name)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/580801.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号