栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq——主题(topic)交换机

RabbitMq——主题(topic)交换机

主题(topic)交换机:该交换机可以更好的以基于多个标准的路由发消息给指定队列。

其路由规则为:routingkey必须为单词列表,单词之间以点号分隔开,*号代表一个单词,#号可以替代零个或多个单词。

举例:

路由*.*.key1绑定队列Q1,如:red.blue.key1、black.green.key1等,通过这些路由都可以发送消息给队列Q1

路由#.key2.*绑定队列Q2,如:a.b.c.key2.red、orange.key2.blue、key2.red等,通过这些路由都可以发送消息给队列Q2

路由key3.#绑定队列Q2,如:key3.red.blue、key3、key3.green、key3.black.blue.red等,通过这些路由都可以将消息发送给Q2

若路由为:#,则该交换机功能与扇出交换机(fanout)功能相同;若路由没有#和*,则该交换机功能与直接交换机功(direct)能相同

案例:一个生产者,通过主题交换机发消息给两个消费者,绑定关系如上图

1、连接信道工具类,同时关闭linux防火墙和开启rabbitmq服务

//连接工厂,创建信道工具类
public class RabbitUtils {
    // 得到一个连接的 channel
    public static Channel getChannel() throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.23.111");
        factory.setUsername("user");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2、生产者发送消息

        (1)连接信道,声明主题交换机;

        (2)从控制台输入消息,路由,发送消息。

public class Producer {

    public static final String TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception {
        //获得信道
        Channel channel = RabbitUtils.getChannel();
        //声明主题交换机
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.print("请输入发送消息:");
            String message=scanner.nextLine();
            System.out.print("请输入发送路由:");
            String routingKey=scanner.nextLine();
            //开始发送消息
            channel.basicPublish(TOPIC_EXCHANGE,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+message+",经过路由为:"+routingKey+"");
        }

    }
}

3、消费者接收消息

        (1)获取信道,声明队列;

        (2)绑定交换机与队列,绑定路由一定要是单词组;

        (3)调用basicConsumer(队列名,是否自动应答,接受成功回调,接受失败回调)方法接收消息。

public class Consumer1 {
    public static final String TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception {
        //获得信道
        Channel channel = RabbitUtils.getChannel();
        //声明队列
        channel.queueDeclare("Q1",false,false,false,null);
        //绑定交换机与队列
        channel.queueBind("Q1",TOPIC_EXCHANGE,"*.*.key1");
        System.out.println("consumer1等待消息接收");
        //接收消息成功回调
        DeliverCallback deliverCallback=(delivery,message)->{
            System.out.println("consumer1收到路由为:"+message.getEnvelope().getRoutingKey()+"的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //接收消息
        channel.basicConsume("Q1",true,deliverCallback,consumerTag->{});
    }
}
public class Consumer2 {
    public static final String TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception {
        //获得信道
        Channel channel = RabbitUtils.getChannel();
        //声明队列
        channel.queueDeclare("Q2",false,false,false,null);
        //绑定交换机与队列
        channel.queueBind("Q2",TOPIC_EXCHANGE,"#.key2.*");
        channel.queueBind("Q2",TOPIC_EXCHANGE,"key3.#");
        System.out.println("consumer1等待消息接收");
        //接收消息成功回调
        DeliverCallback deliverCallback=(delivery,message)->{
            System.out.println("consumer2收到路由为:"+message.getEnvelope().getRoutingKey()+"的消息:"+new String(message.getBody(),"UTF-8"));
        };
        //接收消息
        channel.basicConsume("Q2",true,deliverCallback,consumerTag->{});
    }
}

4.测试

发送消息:”你好,consumer1“,路由:red.green.key1,消费者consumer1接受到消息;

发送消息:”你好,consumer2“,路由:red.green.key2.black,消费者consumer2接收到消息;

发送消息:“订单支付成功”,路由:key3.red,消费者consumer2接收到消息;

发送消息:”结算失败“,路由:red.key3.black,消息丢失;

生产者:

消费者consumer1:

消费者consumer2: 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762155.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号