一个用于简单管理消息队列的对象(基于 Redis Stream)
代码内容
import redis
# 基于Redis Stream的消息队列
class LittleRQ:
    """Small helper that uses Redis Streams as a message queue.

    Every method takes the stream name explicitly, so one instance can work
    with several streams on the same Redis server.
    """

    def __init__(self, host, port, decode_responses=True, password=None):
        """Create the Redis client used by all other methods.

        Args:
            host: Redis server host name or address.
            port: Redis server port; converted with ``int()``.
            decode_responses: Passed through to ``redis.Redis``.
            password: Passed through to ``redis.Redis``.
        """
        self.conn = redis.Redis(
            host=host,
            port=int(port),
            decode_responses=decode_responses,
            password=password,
        )

    @staticmethod
    def _to_text(value):
        """Return *value* decoded to ``str`` when it is ``bytes``."""
        return value.decode() if isinstance(value, bytes) else value

    def _read_group(self, stream_name, group_name, consumer_name, start_id,
                    count, block):
        """Run one XREADGROUP and return its entries as a flat list.

        The client reply is a list of ``[stream, entries]`` pairs; only the
        entries are kept. An empty or missing reply yields ``[]``.
        """
        reply = self.conn.xreadgroup(
            group_name, consumer_name, {stream_name: start_id},
            count=count, block=block,
        )
        messages = []
        for _stream, entries in reply or []:
            messages.extend(entries)
        return messages

    def del_a_stream(self, stream_name):
        """Delete the stream key and return ``True``.

        Deleting a key that does not exist is a no-op in Redis, so no
        existence check is needed beforehand.
        """
        self.conn.delete(stream_name)
        return True

    def add_msg(self, stream_name, msg_dict, maxlen=10, approximate=False):
        """Append one message to the stream and return ``True``.

        Args:
            stream_name: Target stream.
            msg_dict: Field/value mapping stored as the message body.
            maxlen: Passed to XADD as the stream length cap.
            approximate: Passed to XADD; selects approximate trimming.
        """
        self.conn.xadd(stream_name, msg_dict, maxlen=maxlen,
                       approximate=approximate)
        return True

    def len_of_queue(self, stream_name):
        """Return the number of entries in the stream, or 0 on a Redis error."""
        try:
            return self.conn.xlen(stream_name)
        except redis.RedisError:
            return 0

    def ensure_group(self, stream_name, group_name='group1', order='fifo'):
        """Make sure consumer group *group_name* exists on the stream.

        Args:
            stream_name: Stream the group belongs to.
            group_name: Name of the consumer group.
            order: ``'fifo'`` (case-insensitive) starts the group at ID 0 so
                it sees existing entries; any other value starts at ``'$'``
                (only entries added afterwards).

        Returns:
            ``True`` once the group exists.
        """
        start_id = '0' if order.lower() == 'fifo' else '$'
        try:
            groups = self.conn.xinfo_groups(stream_name)
        except redis.ResponseError:
            # Typically the stream does not exist yet; it is created below.
            groups = []
        # Look for this specific group: other groups on the same stream must
        # not prevent it from being created.
        for group in groups:
            if self._to_text(group.get('name')) == group_name:
                return True
        # mkstream=True lets the group be created on a stream with no key yet.
        self.conn.xgroup_create(stream_name, group_name, id=start_id,
                                mkstream=True)
        return True

    def get_a_stream_state(self, stream_name):
        """Return XINFO STREAM data for the stream, or ``False`` on a Redis error."""
        try:
            return self.conn.xinfo_stream(stream_name)
        except redis.RedisError:
            return False

    def get_a_stream_groups(self, stream_name):
        """Return XINFO GROUPS data for the stream, or ``False`` on a Redis error."""
        try:
            return self.conn.xinfo_groups(stream_name)
        except redis.RedisError:
            return False

    def get_msg(self, stream_name, group_name='group1',
                consumer_name='consumer1', count=10000, block=None,
                msg_id=None):
        """Read messages for a consumer: its history first, then new ones.

        Args:
            stream_name: Stream to read from.
            group_name: Consumer group name.
            consumer_name: Consumer name inside the group.
            count: Upper bound on the number of returned entries
                (``None`` for no limit).
            block: Milliseconds to wait for new messages; only applied when
                there is no history to return.
            msg_id: Start ID for the second read; defaults to ``'>'``
                (messages never delivered to any consumer of the group).

        Returns:
            A list of ``(message_id, fields)`` entries, or ``False`` when
            there is nothing to return.
        """
        # Entries already delivered to this consumer but not yet acknowledged.
        # Redis ignores BLOCK for history reads, so no timeout is passed.
        messages = self._read_group(stream_name, group_name, consumer_name,
                                    '0-0', count, None)
        remaining = None if count is None else count - len(messages)
        if remaining is None or remaining > 0:
            # Do not wait for new messages when history is ready to return.
            wait = None if messages else block
            seen = {entry[0] for entry in messages}
            fresh = self._read_group(stream_name, group_name, consumer_name,
                                     msg_id or '>', remaining, wait)
            # An explicit msg_id re-reads history, so skip duplicates.
            messages.extend(e for e in fresh if e[0] not in seen)
        return messages or False

    def get_pending_info(self, stream_name, group_name='group1'):
        """Return the XPENDING summary for the group."""
        return self.conn.xpending(stream_name, group_name)

    def get_pending_msg(self, stream_name, group_name='group1',
                        consumer_name='consumer1', count=100):
        """Return this consumer's delivered-but-unacknowledged messages.

        Returns:
            A list of ``(message_id, fields)`` entries; ``[]`` when the group
            has no pending entries or none belong to this consumer.
        """
        summary = self.get_pending_info(stream_name, group_name=group_name)
        if not summary.get('pending'):
            return []
        return self._read_group(stream_name, group_name, consumer_name,
                                '0-0', count, None)

    def ack_msg(self, stream_name, mid, group_name='group1'):
        """Acknowledge one message ID, or a list/tuple/set of IDs.

        Returns:
            The XACK result (number of acknowledged messages); 0 when an
            empty collection is given.
        """
        ids = tuple(mid) if isinstance(mid, (list, tuple, set)) else (mid,)
        if not ids:
            return 0
        return self.conn.xack(stream_name, group_name, *ids)
使用示例
# 1. Create the client (replace the host and password with your own)
lq = LittleRQ('172.17.0.1', 6379, password='YOURPASS')
# 2. Sample payload
some_data = {'name': 'Ashley', 'age': '16'}
# 3. Stream (queue) name
stream_name = 'test1'
# 4. Store ten messages; maxlen caps the stream length at ten entries
for _ in range(10):
    lq.add_msg(stream_name, some_data, maxlen=10)
# 5. Check the queue length
queue_len = lq.len_of_queue(stream_name)
# 6. Make sure the consumer group exists before any group command is used;
#    querying pending messages of a missing group raises a NOGROUP error
lq.ensure_group(stream_name)
# 7. Fetch the messages that were delivered but never acknowledged
pending_msg = lq.get_pending_msg(stream_name)
# 8. Acknowledge the pending messages (the result may be empty or False)
for msg_id, _fields in pending_msg or []:
    lq.ack_msg(stream_name, msg_id)
# 9. Fetch the message list; False is returned when there are no messages
rec_tuple_list = lq.get_msg(stream_name)



