栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python 全栈系列122 redis消息队列搭建

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python 全栈系列122 redis消息队列搭建

说明

不太喜欢rabbitmq之类的消息中间件,出问题不太好调试。打算使用redis替代。

内容 1 安装 1.1 版本

以docker方式安装,可以作为每台主机的一个标配。(其他的标配数据库还有mongo和neo4j, mysql勉强算吧)

考虑到以后的迁移使用,我不希望使用lastest这样的版本,因为可能包的对接调用会失败。redis的版本特性可以参考

可以考虑使用redis 6.0, 详细介绍可以参考这篇文章

Redis6.0之前为什么一直不使用多线程?

官方曾做过类似问题的回复:使用Redis时,几乎不存在CPU成为瓶颈的情况, Redis主要受限于内存和网络。例如在一个普通的Linux系统上,Redis通过使用pipelining每秒可以处理100万个请求,所以如果应用程序主要使用O(N)或O(log(N))的命令,它几乎不会占用太多CPU。

使用了单线程后,可维护性高。多线程模型虽然在某些方面表现优异,但是它却引入了程序执行顺序的不确定性,带来了并发读写的一系列问题,增加了系统复杂度、同时可能存在线程切换、甚至加锁解锁、死锁造成的性能损耗。Redis通过AE事件模型以及IO多路复用等技术,处理性能非常高,因此没有必要使用多线程。单线程机制使得 Redis 内部实现的复杂度大大降低,Hash 的惰性 Rehash、Lpush 等等 “线程不安全” 的命令都可以无锁进行。

Redis6.0为什么要引入多线程呢?

Redis将所有数据放在内存中,内存的响应时长大约为100纳秒,对于小数据包,Redis服务器可以处理80,000到100,000 QPS,这也是Redis处理的极限了,对于80%的公司来说,单线程的Redis已经足够使用了。

但随着越来越复杂的业务场景,有些公司动不动就上亿的交易量,因此需要更大的QPS。常见的解决方案是在分布式架构中对数据进行分区并采用多个服务器,但该方案有非常大的缺点,例如要管理的Redis服务器太多,维护代价大;某些适用于单个Redis服务器的命令不适用于数据分区;数据分区无法解决热点读/写问题;数据偏斜,重新分配和放大/缩小变得更加复杂等等。

从Redis自身角度来说,因为读写网络的read/write系统调用占用了Redis执行期间大部分CPU时间,瓶颈主要在于网络的 IO 消耗, 优化主要有两个方向:

• 提高网络 IO 性能,典型的实现比如使用 DPDK 来替代内核网络栈的方式
• 使用多线程充分利用多核,典型的实现比如 Memcached。

协议栈优化的这种方式跟 Redis 关系不大,支持多线程是一种最有效最便捷的操作方式。所以总结起来,redis支持多线程主要就是两个

默认多线程是不开启的,需要在配置文件中声明

所以其实用新版的正常情况下不必做什么改变,留个念想,万一以后要用。

1.2 安装

感觉这篇文章介绍的比较好。

  • 1 拉取6.0镜像
docker pull redis:6.0
  • 2 准备文件夹(持久化数据以及配置文件)

我打算放在/opt文件夹下面

# mkdir -p 创建多级目录
mkdir -p /opt/redis/data
# 创建配置
touch /opt/redis/redis.conf 
  • 3 在配置文件中配置密码
vim /opt/redis/redis.conf 

---
requirepass YOURPASS

  • 4 启动

先启动一个实验态的容器,结束后自动删除

# 实验态
docker run -it --name='m5_redis'
               --rm
               -v /etc/localtime:/etc/localtime  
               -v /etc/timezone:/etc/timezone
               -e "LANG=C.UTF-8"
               -p 6379:6379
               -v /opt/redis/data:/data
               -v /opt/redis/redis.conf:/etc/redis/redis.conf
               redis:6.0 
               redis-server /etc/redis/redis.conf 
               --appendonly yes


可以看到有一些告警,应该是有一个关键设置没有弄,在低内存的时候可能会导致失败。可以修改一下镜像

# 打开一个容器
docker run -it -v /etc/localtime:/etc/localtime  
               -v /etc/timezone:/etc/timezone
               -e "LANG=C.UTF-8"
               redis:6.0 bash

# 增加一个变量文件(用sysctl的方式没办法生效,容器没有sysctl)
echo 1 > /proc/sys/vm/overcommit_memory

# 提交
docker commit  0ac73e69ed9a registry.cn-hangzhou.aliyuncs.com/YOURIMAGE:v1

# 推送
docker push  registry.cn-hangzhou.aliyuncs.com/YOURIMAGE:v1

再次启动时,没有了告警

1:C 22 Nov 2021 13:15:17.604 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1:C 22 Nov 2021 13:15:17.604 # Redis version=6.0.16, bits=64, commit=00000000, modified=0, pid=1, just started
1:C 22 Nov 2021 13:15:17.604 # Configuration loaded
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.0.16 (00000000/0) 64 bit
  .-`` .-```.  ```/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 1
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

1:M 22 Nov 2021 13:15:17.606 # Server initialized
1:M 22 Nov 2021 13:15:17.606 * Ready to accept connections

进入运行态

docker run -d --name='m5_redis'
               --restart=always 
               -v /etc/localtime:/etc/localtime  
               -v /etc/timezone:/etc/timezone
               -e "LANG=C.UTF-8"
               -p 6379:6379
               -v /opt/redis/data:/data
               -v /opt/redis/redis.conf:/etc/redis/redis.conf
               registry.cn-hangzhou.aliyuncs.com/YOURIMAGE:v1 
               redis-server /etc/redis/redis.conf 
               --appendonly yes
2 连通性测试
import redis

local_ip = '172.17.0.1'

r = redis.Redis(host=local_ip, port=6379, decode_responses=True,password='YOURPASS')  
r.set('name', 'runoob')  # 设置 name 对应的值
print(r['name'])
print(r.get('name'))  # 取出键 name 对应的值
print(type(r.get('name')))  # 查看类型
---
runoob
runoob

--- 
#看起来都是字符型的
r.set('test_num', 123)  # 设置 test_num 对应的值
print(r['test_num'])
print(r.get('test_num'))  # 取出键 test_num 对应的值
print(type(r.get('test_num')))  # 查看类型

123
123

3 Redis消息队列

Redis提供了两种方式来作消息队列。一个是使用生产者消费模式模式【不用】,另外一个方法就是发布订阅者模式。前者会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。后者也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

最后采用Stream模式(Redis5.0以后支持)

主要参考的文章有两篇:

  • 1 Python Redis Stream 消息队列 消费组
  • 2 Redis-Python交互:流Stream

第一篇是从一个实现的角度给了一个例子,第二篇更全的介绍了功能。

3.1 业务需求

我计划做一个标准服务,这个服务会以批量的形式进行命名实体的识别。这个服务有三部分:

  • 1 一个外部接口服务,快速的将请求缓存到消息队列。
  • 2 一个以PM2(Projectmeta_V2)组织的流程集,处理原始数据,将识别结果缓存在output。
  • 3 一个流程集,将输出缓存送到目标数据库,获得确认后将所有的过程数据删除,释放磁盘空间。

输出:

  • 1 其中第一部分,需要做一个接口文档提供给使用者。
  • 2 第二部分则需要封装为一个项目文件夹,通过docker部署到本地(GPU)和云主机(CPU)。
  • 3 第三部分需要制定目标表的格式和空间使用/释放规则。

这里仅讨论第一部分的功能:可以瞬间接受大量的业务端请求

计划:

  • 1 使用Flask起一个web服务
  • 2 使用Redis的Stream作为消息队列
  • 3 收到数据请求后,Flask向Redis的队列里存数据,当队列满时告诉使用者

和Redis队列相关的部分,我希望:

  • 1 维持一个足够长的缓冲队列
  • 2 采用FIFO的方式消费
  • 3 当服务器因为其他原因down机可以自动恢复
  • 4 每次有固定的“消费上限”
  • 5 可以手动的销毁队列
  • 6 可以随时查看队列的状态
3.2 概念

以下是可能需要的一些概念

序号名称解释
1st_name流名称,也就是队列名称
2gp_name组名称, 现在对我来说意义不大
3cm_name消费者名称,目前我也只会有一个
4r.xinfo_groups(st_name)查看某个流的组信息(有几个消费者,pending多少,上一个id是多少)
5r.xinfo_stream(st_name)查看某个流的信息(队列长度,有几个组,第一个和最后一个记录)
6r.xadd给某个流增加数据,r.xadd(stream_name, {‘name’: ‘Billy’, ‘age’:16})
7r.xgroup_create为某个流创建组,r.xgroup_create(stream_name, group_name, id=0),# 0 从开始消费, $ 从尾部消费
8consumer_id自动编号是毫秒级时间戳加上序号,形如123-123。所以0-0是最小(且不存在)的,如果consumer_id =‘>’,表示取确认信息的下一条
9r.xlen(name)Stream内消息的长度

一些注意点:

  • 1 当重启时,先消费0-0再往后走(两次没有查询到结果的空是不一样的)
items = r.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=0, count=1) 
[['test1', []]]

consumer_id ='>'
# 当没有更新的值
items = r.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=None, count=1) 
[]

通过x_add来限制队列的最大长度

    if r.exists(stream_name):
        r.delete(stream_name)
    r1 = r.xadd(stream_name, {'name': 'jack'},maxlen=3, approximate=False)
    r2 = r.xadd(stream_name, {'name': 'Tom'},maxlen=3, approximate=False)
    r3 = r.xadd(stream_name, {'name': 'Will'},maxlen=3, approximate=False)
    r4 = r.xadd(stream_name, {'name': 'Billy', 'age':16},maxlen=3, approximate=False)
---
r.xinfo_stream(stream_name)
{'length': 3,
 'radix-tree-keys': 1,
 'radix-tree-nodes': 2,
 'last-generated-id': '1637571821993-0',
 'groups': 0,
 'first-entry': ('1637571821661-1', {'name': 'Tom'}),
 'last-entry': ('1637571821993-0', {'name': 'Billy', 'age': '16'})}
  • 2 block模式
    因为要面对多个队列,所以不要设置block为数字,否则会阻塞通道。

  • 3 查询有几个streams
    竟然没有找到一个一次性查询所有队列名称的,所以要增加一个查询streams的接口。在服务中维持一个当前列表集合,有新的消息过来的时候判断一下,如果不在集合里就扩充这个集合。当worker来查询streams名称集合的时候返回,并存一下盘。同时,还要提供一个接口手工删除某个队列(或者干脆清空)。

下面进行功能函数的模拟。

序号名称解释
1add_msg向队列里增加消息(扁平化字典), 参数 maxlen, approximate=False
2len_of_queue查询一个队列的消息长度
3get_msg从队列里取消息, group,consumer, stream, mid , count=n, block=None,
4make_a_group声明对一个队列的处理方式(FIFO或LIFO)
5get_a_stream_state获取一个stream的状态
6get_a_group_state获取一个group的状态
7ack_a_msg确认一条消息被消费
8del_a_stream如果队列存在就删掉
%%timeit
lq.add_msg(stream_name, some_data)
---
154 µs ± 4.87 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

就算0.2ms寸一条记录吧,每秒可以存5000条消息。

尝试存100万条,一共花了148秒(增加了200M左右内存耗用,重置队列后很快释放)。

start = time.time()
for i in range(int(1e6)):
    lq.add_msg(stream_name, some_data, maxlen =int(1e6))
print(time.time() - start)

消息队列的操作对象我放在了这里介绍

以下模拟以下生产和消费的一个场景

生产:

current_q_set ={'a','b'}
lq_produce = LittleRQ('172.17.0.1',6379, password='YOURPASS' )
new_q = 'test2'
data_dict =  {'name': 'Ashley', 'age': '16'}

# 冲进15条消息,但是队列只允许最大10条
for i in range(15):
    lq_produce.add_msg(new_q, some_data, maxlen =int(10))

# 查看当前队列长度
lq_produce.len_of_queue(new_q)
10

# 如果队列是新的,为其分配组
if new_q not in current_q_set:
    lq_produce.ensure_group(new_q)

lq_produce.get_a_stream_groups(new_q)

[{'name': 'group1', 'consumers': 0, 'pending': 0, 'last-delivered-id': '0-0'}]

不太清楚stream认定消息pending的机制,我发现只要另起一个实例(消费者)去声明new_q(test2)时,队列里的所有消息立即变为pending。(用原来的实例去看也是如此)

消费:

lq_consumer =  LittleRQ('172.17.0.1',6379, password='YOURPASS' )

new_q = 'test2'
# 获取pendings msg
pending_msg =  lq_consumer.get_pending_msg(new_q)

for msg in pending_msg:
	# 处理逻辑: 打印
    print(msg[1])
    # 发起确认
    lq_consumer.ack_msg(new_q, msg[0])

# 没有pending了
lq_consumer.get_a_stream_groups(new_q)

[{'name': 'group1',
  'consumers': 1,
  'pending': 0,
  'last-delivered-id': '1637596315558-2'}]
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/588598.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号