栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

mq 的Publish/Subscribe 模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

mq 的Publish/Subscribe 模式

一个交换机绑定多个队列,使用交换机使用fanout 类型,那么会发给与之绑定的所有队列。路由key 一定要为空字符串

代码如下:
Publish

import pika
from pika.exchange_type import ExchangeType

class Producer(object):
    def __init__(self, queue_name,exchange_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }
        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 声明队列
        self.channel = self.con.channel()
        self.channel.queue_declare(queue=queue_name)
        self.channel.exchange_declare(exchange=exchange_name, exchange_type=ExchangeType.fanout)

    def send_message(self,queue_name,exchange_name,routing_key, body):
        """fanout 类型的交换机 routing_key 为空字符串,给所有绑定这个交换价的队列发送消息"""
        # 绑定交换机
        self.channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key="")
        # 发送消息
        self.channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)
        # 关闭通道
        self.channel.close()
        # 关闭连接
        self.con.close()


if __name__ == '__main__':
    p = Producer("test", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p.send_message("test", "logs","","have a good time")
    p1 = Producer("test01", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p1.send_message("test01","logs","","good luck !")

Subscribe

import pika


class Consumer(object):

    def __init__(self, queue_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }

        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 创建通道
        self.channel = self.con.channel()
        self.queue_name = queue_name

    def consume_message(self):
        def callback(ch, method, properties, body):
            print("ch===%r" % ch)
            print("method===%r" % method)
            print("properties===%r" % properties)
            print("[x] Received %r" % body)

        # 消费对象
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)
        # 开始消费
        self.channel.start_consuming()
        self.channel.close()


if __name__ == '__main__':
    try:
        c = Consumer("test", "tom", "tom@tom", "localhost", 5672, "/afei")
        c.consume_message()
    except KeyboardInterrupt:
        exit(0)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/708703.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号