栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【python3】 【rabbitmq工具类】【创建、删除、绑定】【queue、exchange】

【python3】 【rabbitmq工具类】【创建、删除、绑定】【queue、exchange】

Background

网上查找到关于python操作rabbitmq的,大多是怎么发布、怎么订阅,关于queue和exchange的创建及绑定介绍的不多,这里总结记录下哈。这样在程序里创建会方便点,省得使用shell命令或者去web界面手动创建了。

源码RabbitmqUtil
class RabbitmqUtil:
    """Rabbitmq工具类"""
    connection = None
    channel = None

    def __init__(self, host, vhost, username, password, port=5672):
        try:
            credential = pika.PlainCredentials(username, password)
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host, port, vhost, credential, heartbeat=0))
            self.channel = self.connection.channel()
        except():
            print("rabbitmq init error, please check the config")

    def close(self):
        """关闭连接"""
        if self.connection:
            self.connection.close()
        else:
            print("connection already disconnected")

    def bind_queue_exchange(self, queue, exchange, routing_key):
        """绑定queue和exchange"""
        self.channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)

    def new_queue(self, queue):
        """声明queue,如不存在,则创建"""
        self.channel.queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': 259200000})

    def del_queue(self, queue):
        """Delete the queue"""
        self.channel.queue_delete(queue)

    def new_exchange(self, exchange):
        """声明exchange,如不存在,则创建"""
        self.channel.exchange_declare(exchange=exchange, durable=True, exchange_type='topic')

    def del_exchange(self, exchange):
        """Delete the exchange"""
        self.channel.exchange_delete(exchange=exchange)

    def callback(self, body):
        """接收处理消息的回调函数"""
        super()
        print(str(body).replace('b', '').replace(''', ''))

    def public_msg(self, exchange, routing_key, json):
        """发布消息"""
        self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json)

    def consume_msg(self, queue):
        """订阅消息"""
        self.channel.basic_consume(queue, self.callback, True)
        self.channel.start_consuming()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630326.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号