栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python 使用rabbitMQ一个简单示例

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python 使用rabbitMQ一个简单示例

一、安装包
pip install pika
二、生产者代码
import json

import pika

# 验证 用户名和密码
credentials = pika.PlainCredentials('admin', 'admin')
# 创建连接 virtual_host: rabbitMQ 使用的虚拟主机(一个broker可以有多个,对不同用户进行权限分离)
conn = pika.BlockingConnection(pika.ConnectionParameters(host='47.93.210.248', port=5672, virtual_host='/', credentials=credentials))
# 建立一个channel
chan = conn.channel()
# 创建一个队列
chan.queue_declare(queue='active')


def encode_msg(msg):
    """格式化消息"""
    return json.dumps(msg)


while True:
    # 便于测试
    msg = input('msg: ')

    if msg == 'quit':
        break
    # 发送消息 exchange: 把消息发布到指定交换机, 通过这个交换机转发给消费者; 可以不指定
    # exchange 可以在后台创建
    chan.basic_publish(exchange='active_topic', routing_key='active', body=encode_msg(msg))

conn.close()
二、消费者代码
import json

import pika


"""
多个消费者的情况下,采用的是轮训机制依次转发给每一个消费者
"""
# 验证
credentials = pika.PlainCredentials('admin', 'admin')
# 创建连接 virtual_host rabbitMQ 使用的虚拟主机(可以有多个,对不同用户进行权限分离)
conn = pika.BlockingConnection(pika.ConnectionParameters(host='47.93.210.248', port=5672, virtual_host='/', credentials=credentials))
# 建立一个channel
chan = conn.channel()
# 创建一个队列(生成者没有指定 exchange), 如果确定已经创建了, 可以不在创建
# chan.queue_declare(queue='active')
# 生产者指定exchange, 要绑定队列和exchange, 不绑定,exchange不知道把消息转发给那个队列
chan.queue_bind(queue='active', exchange='active_topic')


def callback(chan, method, properties, body):
    """
    消息处理函数 4个参数是固定的
    :param chan: channel对象
    :param method: 交付信息
        包含了:
            consumer_tag: 消费者的标识
            delivery_tag: 消息的索引从1开始
            exchange: 指定的exchange
            redelivered: 是不是重复接收的消息
            routing_key: 队列名称
    :param properties: 消息属性
    :param body: 消息
    :return:
    """
    print('收到', json.loads(body))
    # 手动确认
    chan.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack 默认是False 不给生产者发确认消息(重启consume时会按顺序读取), 如果设置自动确认, 宕机消息就丢了. 可以手动确认
chan.basic_consume(queue='active', on_message_callback=callback)

# 开始监听
chan.start_consuming()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/850735.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号