栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMq------路由模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMq------路由模式

         官方文档:文档:目录 — RabbitMQ

      生产者按routing key发送消息,不同的消费者端按不同的routing key接收消息。

       路由模式消费者端和发布订阅模式消费者端的区别:

1、exchange的type为direct

2、发送消息的时候加入了routing key

配置类

package com.xmx;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
    public static Connection getConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/j98");//虚拟主机名
        factory.setUsername("jianzi");//账号
        factory.setPassword("jianzi");//密码
        //创建连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    } 
}

 Producer:消息发送者

package com.xmx;



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Random;


public class Producter4 {
    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机Exchange类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        for (int i = 1; i <= 10; i++) {
            int random = (new Random()).nextInt(3) + 1;//1-3
            if (random == 1) {
                //发布消息3种routingKey的消息
                String message = "hello info";
                channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
                System.out.println("路由模式发布info消息:" + message);
            } else if (random == 2) {
                String message = "hello warning";
                channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
                System.out.println("路由模式发布warning消息:" + message);
            } else {
                String message = "hello error";
                channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
                System.out.println("路由模式发布error消息:" + message);
            }
        }
        channel.close();
        connection.close();
    }
}



Consumer:消费者1

package com.xmx;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Customer4 {
    private final static String QUEUE_NAME = "publishSubscrible_queue1";
    private final static String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //申明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //队列绑定交换机,指定路由routingKey
        //结束路由routingKey为info和warning的消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
        //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
        //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
        channel.basicQos(1);
        //声明消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("路由模式 消费者1 消费消息:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}


Consumer:消费者2

消费者2与消费者1不同的地方是绑定的队列不同

package com.xmx;

import com.rabbitmq.client.*;

import java.io.IOException;

//路由模式
public class Customer4 {
    private final static String QUEUE_NAME2 = "publishSubscrible_queue2";
    private final static String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //申明队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);

        //队列绑定交换机,指定路由routingKey
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");


        //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
        //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
        //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
        channel.basicQos(1);
        //声明消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("路由模式 消费者2 消费消息:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME2, false, consumer);
    }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/709111.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号