栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件RabbitMQ

消息中间件RabbitMQ

RabbitMQ引言 解决的问题

    能解决模块之间的耦合度高,导致一个模块宕机后,后续全部功能都不能用了的问题,达到解耦的作用。能解决同步通讯的时间成本高的问题,达到异步通讯的作用,提升客户的体验。能解决高并发导致系统压力过大的问题,达到流量削峰的作用,减轻服务器压力。
RabbitMQ介绍

市面上比较火爆的几款MQ:ActiveMQ,RocketMQ,Kafka,RabbitMQ。对比如下:

    语言的支持:ActiveMQ,RocketMQ只支持JAVA语言,Kafka,RabbitMQ支持多门语言;效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别的,RabbitMQ是微秒级别的;针对消息丢失,消息重复的问题它们四种都有其各自一套解决方法:RabbitMQ针对消息的持久化和重复性问题都有比较成熟的解决方案;学习成本:RabbitMQ非常简单,简单到令人发指。

RabbitMQ效率高的原因:RabbitMQ是基于erlang开发,erlong有一个特点叫面向并发编程,所以并发能力很强,性能极好。

RabbitMQ最初是由Rabbit公司研发和维护的,最终是在Pivotal公司维护的,springboot就是Pivotal公司研发的。RabbitMQ严格遵守高效消息队列协议——AMQP协议,帮助我们在进程之间传递异步消息。
详细介绍请看

RabbitMQ架构

    Publisher:生产者,发布消息到RabbitMQ中的ExchangeConsumer:消费者,监听RabbitMQ中的Queue中的消息Exchange:交换机,和生产者建立连接并接收生产者的消息Queue:队列,Exchange会将消息分发到指定的Queue,Queue和消费者进行交互Routes:路由,交换机以什么样的策略将消息发布到Queue
RabbitMQ的使用 RabbitMQ的通讯方式



Java连接RabbitMQ
    创建Maven项目导入依赖
	
        
        
            com.rabbitmq
            amqp-client
            5.9.0
        

        
            junit
            junit
            4.12
        
        
            junit
            junit
            4.12
            compile
        

    
    创建工具类,连接RabbitMQ
public class RabbitMQClient {

    public static Connection getConnection() {
        // 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");

        // 创建Connection
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 返回Connection
        return connection;
    }

}
    创建测试类,连接RabbitMQ
public class Demo1 {

    @Test
    public void getConnection() throws IOException {
        // 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        // 其他操作
        System.in.read();

        // 关闭Connection
        connection.close();
    }
}

连接成功,RabbitMQ控制台会有一个Connection,如下图:

通讯方式一:Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

    创建生产者,创建一个channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {

    @Test
    public void publish() throws Exception{
        // 1.获取连接Connection
        Connection connection = RabbitMQClient.getConnection();

        // 2.创建Channel
        Channel channel = connection.createChannel();

        // 3.发布消息到Exchange,同时指定路由的规则
        
        String msg = "我要成为高级开发工程师!!!";
        channel.basicPublish("","HelloWorld",null,msg.getBytes(StandardCharsets.UTF_8));
        //Ps: exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息

        System.out.println("生产者发布消息成功!");
        // 释放资源
        channel.close();
        connection.close();
    }
}
    创建消费者,创建一个channel,创建一个队列,并且去消费当前队列。
public class Consumer {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列-HellWorld
        
        channel.queueDeclare("HelloWorld",true,false,false,null);

        // 4. 开启监听Queue,并指定消费者consumer
        
        channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:" + new String(body,"UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }
}
RabbitMQ通讯方式二:Work

只需要在消费者端,添加Qos能力以及更改为手动ack,即可让消费者根据自己的能力去消费指定队列中的消息,而不是默认情况下由RabbitMQ平均分配了。
示例:一个生产者,一个默认的交换机,一个队列,两个消费者

    创建生产者,创建一个channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {

    @Test
    public void publish() throws Exception{
        // 1.获取连接Connection
        Connection connection = RabbitMQClient.getConnection();

        // 2.创建Channel
        Channel channel = connection.createChannel();

        // 3.发布消息到Exchange,同时指定路由的规则
        for (int i = 1; i <= 10; i++) {
            String msg = i + ". 我要成为高级开发工程师!!!";
            channel.basicPublish("","Work",null,msg.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("生产者发布消息成功!");
        // 释放资源
        channel.close();
        connection.close();
    }
}
    创建消费者1,创建一个channel,创建一个队列,设置消费能力-Qos,去消费当前队列,并关闭自动ACK,打开手动ACK。
public class Consumer1 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列-Work
        channel.queueDeclare("Work", true, false, false, null);

        // 指定当前消费者一次消费多少个消息
        channel.basicQos(1);

        // 指定消费者每次消费多少个消息后,需要将自动ack关闭,消费者需要手动ack
        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("Work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接受到的消息是:" + new String(body, "UTF-8"));

                // 手动ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
    创建消费者2,创建一个channel,创建一个队列,设置消费能力-Qos,去消费当前队列,并关闭自动ACK,打开手动ACK。
public class Consumer2 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列-Work
        channel.queueDeclare("Work", true, false, false, null);

        // * 指定当前消费者一次消费多少个消息
        channel.basicQos(2);

        // 指定消费者每次消费多少个消息后,* 需要将自动ack关闭,消费者需要手动ack
        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("Work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接受到的消息是:" + new String(body, "UTF-8"));

                // * 手动ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
通讯方式三:Publish/Subscribe

基于Work在生产者中声明Fanout类型的exchange,并且可以将exchange和多个queue绑定在一起,绑定的方式是直接绑定。
示例:一个生产者,一个交换机,两个队列,两个消费者。

    创建生产者,创建一个Fanout类型的channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {

    @Test
    public void publish() throws Exception {
        // 1.获取连接Connection
        Connection connection = RabbitMQClient.getConnection();

        // 2.创建Channel
        Channel channel = connection.createChannel();

        // 3.创建exchange - * 绑定某些队列
        
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
        channel.queueBind("pubsub-queue2", "pubsub-exchange", "");

        // 4.发布消息到Exchange,同时指定路由的规则
        for (int i = 1; i <= 10; i++) {
            String msg = i + ". 我要成为高级开发工程师!!!";
            channel.basicPublish("pubsub-exchange", "", null, msg.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("生产者发布消息成功!");
        // 5.释放资源
        channel.close();
        connection.close();
    }

}
    创建消费者1,创建一个channel,创建一个队列,指定监听的队列,去消费当前队列
public class Consumer1 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列:pubsub-queue1
        channel.queueDeclare("pubsub-queue1", true, false, false, null);

        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("pubsub-queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1号接受到的消息是:" + new String(body, "UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
    创建消费者2,创建一个channel,创建一个队列,指定监听的队列,去消费当前队列
public class Consumer2 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列:pubsub-queue2
        channel.queueDeclare("pubsub-queue2", true, false, false, null);

        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("pubsub-queue2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号接受到的消息是:" + new String(body, "UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
通讯方式四:Routing

生产者的channel绑定多个队列,并指定RoutingKey,发送消息时指定RoutingKey,就会将消息发送到绑定相同的Routing的队列上,然后消费者监听哪个队列就会收到哪个队列上的消息了。
示例:一个生产者,一个默认交换机,两个队列,三个路由,两个消费者

    创建生产者,创建一个channel,绑定上两个队列,一个队列指定一个路由,一个队列指定两个路由
public class Publisher {

    @Test
    public void publish() throws Exception {
        // 1.获取连接Connection
        Connection connection = RabbitMQClient.getConnection();

        // 2.创建Channel
        Channel channel = connection.createChannel();

        // 3.创建exchange - 绑定某一个队列
        
        channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
        channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
        channel.queueBind("routing-queue-error", "routing-exchange", "DEBUG");
        channel.queueBind("routing-queue-info", "routing-exchange", "INFO");

        // 4.发布消息到Exchange,同时指定路由的规则
        String msg = "我要成为高级开发工程师!!!";
        channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
        msg = msg + "实现失败!";
        channel.basicPublish("routing-exchange", "ERROR", null, msg.getBytes(StandardCharsets.UTF_8));
        msg = msg + "这是一个BUG!";
        channel.basicPublish("routing-exchange", "DEBUG", null, msg.getBytes(StandardCharsets.UTF_8));

        System.out.println("生产者发布消息成功!");
        // 5.释放资源
        channel.close();
        connection.close();
    }

}
    创建消费者1,创建一个channel,绑定其中一个的队列,监听队列,消费消息
public class Consumer1 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列:pubsub-queue1
        channel.queueDeclare("routing-queue-error", true, false, false, null);

        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("routing-queue-error", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者ERROR接受到的消息是:" + new String(body, "UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
    创建消费者2,创建一个channel,绑定另一个的队列,监听队列,消费消息
public class Consumer2 {

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列:pubsub-queue2
        channel.queueDeclare("routing-queue-info", true, false, false, null);

        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume("routing-queue-info", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者INFO接受到的消息是:" + new String(body, "UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}
通讯方式五:Topic

生产者的channel绑定多个队列,并指定RoutingKey,这里指定RoutingKey可以使用通配符‘#’和占位符‘’,发送消息时指定适配的RoutingKey,就会将消息发送到绑定适配的Routing的队列上,然后消费者监听哪个队列就会收到哪个队列上的消息了。
示例:一个生产者,一个默认交换机,八个队列,使用通配符 ‘#’ 和占位符 ‘
’ 的路由,两个消费者

    创建生产者,创建一个channel,绑定上八个队列,一个队列指定一种路由
public class Publisher {
    // 交换机名称
    private static final String exchangeName = "topic-exchange";

    @Test
    public void publish() throws Exception {
        // 1.获取连接Connection
        Connection connection = RabbitMQClient.getConnection();

        // 2.创建Channel
        Channel channel = connection.createChannel();

        // 3.创建exchange - 绑定某一个队列
        
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
        // topic与Routing不同的是:指定RoutingKey可以使用通配符#,占位符*
        channel.queueBind("topic-queue1", exchangeName, "order.*");
        channel.queueBind("topic-queue2", exchangeName, "order.#");
        channel.queueBind("topic-queue3", exchangeName, "*.order");
        channel.queueBind("topic-queue4", exchangeName, "#.order");
        channel.queueBind("topic-queue5", exchangeName, "#.order.*");
        channel.queueBind("topic-queue6", exchangeName, "*.order.*");
        channel.queueBind("topic-queue7", exchangeName, "#.order.#");
        channel.queueBind("topic-queue8", exchangeName, "*.order.#");

        // 4.发布消息到Exchange,同时指定路由的规则
        String msg = "我要成为高级开发工程师!!!";
        channel.basicPublish(exchangeName, "order.item", null, msg.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "macbook.order", null, msg.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "macbook.order.item", null, msg.getBytes(StandardCharsets.UTF_8));

        System.out.println("生产者发布消息成功!");
        // 5.释放资源
        channel.close();
        connection.close();
    }

}
    创建八个消费者分别监听这个八个队列
public class Consumer1 {
    // 队列名称
    private static final String queueName = "topic-queue1";

    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        // 2. 创建channel
        Channel channel = connection.createChannel();

        // 3. 声明队列:pubsub-queue1
        channel.queueDeclare(queueName, true, false, false, null);

        // 4. 开启监听Queue,并指定消费者consumer
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者ERROR接受到的消息是:" + new String(body, "UTF-8"));
            }
        });

        System.out.println("消费者开始监听消息...");
        // 实时监听,保证程序不能停止
        System.in.read();

        // 5. 释放资源
        channel.close();
        connection.close();
    }

}

此处就创建一个,其余七个一样,需要改一下queueName和类名。

测试结果:
topic-queue1 收到 1 条消息;
topic-queue2 收到 1 条消息;
topic-queue3 收到 3 条消息;
topic-queue4 收到 2 条消息;
topic-queue5 收到 3 条消息;
topic-queue6 收到 2 条消息;
topic-queue7 收到 3 条消息;
topic-queue8 收到 2 条消息;

测试结果出人意料:为什么topic-queue3会收到三条消息?

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760940.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号