栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

RabbitMQ
  • 一、消息中间件概述
    • 1、MQ 概述
    • 2、MQ 的优势
      • 2.1 应用解耦
      • 2.2 任务异步处理
      • 2.3 削峰填谷
    • 3、MQ 的劣势
    • 4、常见的 MQ 产品
    • 5、AMQP 和 JMS
      • 5.1 AMQP
      • 5.2 JMS
      • 5.3 AMQP 与 JMS 区别
    • 6、RabbitMQ
  • 二、安装及配置 RabbitMQ
    • 1、安装依赖环境
    • 2、安装 Erlang
    • 3、安装 RabbitMQ
    • 4、开启管理界面及配置
    • 5、启动
    • 6、配置虚拟主机及用户
      • 6.1 用户角色
      • 6.2 Virtual Hosts 配置
        • 6.2.1 创建 Virtual Hosts
        • 6.2.2 设置 Virtual Hosts 权限
  • 三、RabbitMQ 入门
    • 1、搭建示例工程
      • 1.1 创建工程
      • 1.2 添加依赖
    • 2、编写生产者
    • 3、编写消费者
    • 4、小结
  • 四、RabbitMQ 工作模式
    • 1、Work queues 工作队列模式
      • 1.1 模式说明
      • 1.2 代码
        • 1)生产者
        • 2)消费者1
        • 3)消费者2
      • 1.3 测试
      • 1.4 小结
    • 2、订阅模式概述
    • 3、Publish/Subscribe 发布与订阅模式
      • 3.1 模式说明
      • 3.2 代码
        • 1)生产者
        • 2)消费者1
        • 3)消费者2
      • 3.3 测试
      • 3.4 小结
    • 4、Routing 路由模式
      • 4.1 模式说明
      • 4.2 代码
        • 1)生产者
        • 2)消费者1
        • 3)消费者2
      • 4.3 测试
      • 4.4 小结
    • 5、Topics 通配符模式
      • 5.1 模式说明
      • 5.2 代码
        • 1)生产者
        • 2)消费者1
        • 3)消费者2
      • 5.3 测试
      • 5.4 小结
    • 6、模式总结
  • 五、Spring 整合 RabbitMQ
    • 1、搭建生产者工程
      • 1.1 创建工程
      • 1.2 添加依赖
      • 1.3 配置整合
      • 1.4 发送消息
    • 2、搭建消费者工程
      • 2.1 创建工程
      • 2.2 添加依赖
      • 2.3 配置整合
      • 2.4 消息监听器
        • 1)队列监听器
        • 2)广播监听器1
        • 3)广播监听器2
        • 4)星号通配符监听器
        • 5)井号通配符监听器
        • 6)井号通配符监听器2
  • 六、Spring Boot 整合 RabbitMQ
    • 1、简介
    • 2、搭建生产者工程
      • 2.1 创建工程
      • 2.2 添加依赖
      • 2.3 启动类
      • 2.4 配置 RabbitMQ
        • 1)配置文件
        • 2)绑定交换机和队列
      • 2.5 测试
    • 3、搭建消费者工程
      • 3.1 创建工程
      • 3.2 添加依赖
      • 3.3 启动类
      • 3.4 配置 RabbitMQ
      • 3.5 消息监听处理类
  • 七、高级特性
    • 1、消息的可靠投递
      • 1.1 确认模式
      • 1.2 退回模式
    • 2、Consumer Ack
    • 3、消费端限流
    • 4、TTL
    • 5、死信队列
    • 6、延迟队列

一、消息中间件概述 1、MQ 概述
  • MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

应用之间的远程调用:

加入 MQ 后应用之间的调用:

2、MQ 的优势 2.1 应用解耦
  • MQ 相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。

系统的耦合性越高,容错性就越低,可维护性就越低:

使用 MQ 使得应用间解耦,提升容错性和可维护性:

2.2 任务异步处理
  • 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

无 MQ:

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms 。
用户点击完下单按钮后,需要等待 920ms 才能得到下单响应,太慢!

有 MQ:

用户点击完下单按钮后,只需等待 25ms 就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
2.3 削峰填谷
  • 如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。
  • 消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个消息,这样慢慢写入数据库,这样就不会卡死数据库了。
  • 但是使用了 MQ 之后,限制消费消息的速度为 1000,但是这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 1000QPS ,直到消费完积压的消息,这就叫做“填谷”。
3、MQ 的劣势
  1. 系统可用性降低
    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证 MQ 的高可用?
  2. 系统复杂度提高
    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  3. 一致性问题
    A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
4、常见的 MQ 产品
  • 目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、metaMq 等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala & Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire ,STOMP,REST,XMPP,AMQP自定义自定义协议,社区封装了 http 协议支持
客户端支持语言官方支持 Erlang,Java,Ruby 等,社区产出多种 API ,几乎支持所有语言Java,C,C++,Python,PHP,Perl,.net 等Java,C++(不成熟)官方支持 Java ,社区产出多种 API,如 PHP,Python 等
单机吞吐量万级(其次)万级(最差)十万级(最好)十万级(次之)
消息延迟微妙级毫秒级毫秒级毫秒以内
功能特性并发能力强,性能极其好,延时低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ 功能比较完备,扩展性佳只支持主要的 MQ 功能,毕竟是为大数据领域准备的。
  • 即 RabbitMQ 综合能力较强劲
5、AMQP 和 JMS
  • 实现 MQ 的大致有两种主流方式:AMQP、JMS 。
5.1 AMQP
  • AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比 HTTP。
5.2 JMS
  1. JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的 API 。
  2. JMS 是 JavaEE 规范中的一种,类比 JDBC 。
  3. 很多消息中间件都实现了 JMS 规范,例如:ActiveMQ 。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有。
5.3 AMQP 与 JMS 区别
  1. JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式。
  2. JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
  3. JMS 规定了两种消息模式;而 AMQP 的消息模式更加丰富。
6、RabbitMQ
  1. RabbitMQ 官方地址:https://www.rabbitmq.com/

  2. 2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

  3. RabbitMQ 基础架构如下图:

  4. RabbitMQ 中的相关概念:

    • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker 。
    • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接。
    • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
    • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
    • Queue:消息最终被送到这里等待 consumer 取走。
    • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 提供了6种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics 主题模式,RPC 远程调用模式(远程调用,不太算 MQ ;暂不作介绍)。

官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

二、安装及配置 RabbitMQ

安装环境:CentOS7+

1、安装依赖环境 2、安装 Erlang 3、安装 RabbitMQ 4、开启管理界面及配置 5、启动 6、配置虚拟主机及用户 6.1 用户角色 6.2 Virtual Hosts 配置 6.2.1 创建 Virtual Hosts 6.2.2 设置 Virtual Hosts 权限 三、RabbitMQ 入门

简单模式:

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序。
  • C:消费者:消息的接收者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1、搭建示例工程 1.1 创建工程

1.2 添加依赖

往 lxs-rabbitmq 的 pom.xml 文件中添加如下依赖:


	com.rabbitmq
	amqp-client
	5.6.0

2、编写生产者

编写消息生产者 com.lxs.rabbitmq.simple.Producer :

package com.lxs.rabbitmq.simple;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {

    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 主机地址:默认为 localhost
        connectionFactory.setHost("localhost");
        // 连接端口,默认为 5672
        connectionFactory.setPort(5672);
        // 虚拟主机名称,默认为 /
        connectionFactory.setVirtualHost("/xzk");
        // 连接用户名,默认为 guest 
        connectionFactory.setUsername("lxs");
        // 连接密码,默认为 guest
        connectionFactory.setPassword("lxs");

        // 创建连接
        Connection connection = connectionFactory.newConnection();

        // 创建频道
        Channel channel = connection.createChannel();
        
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 要发送的信息
        String message = "你好,小兔子!";
        
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("已发送消息:" + message);
        // 关闭资源
        channel.close();
        connection.close();
    }
}

在执行上述的消息发送之后;可以登录 rabbitMQ 的管理控制台,可以发现队列和其消息:

3、编写消费者

抽取创建 connection 的工具类 com.lxs.rabbitmq.util.ConnectionUtil :

package com.lxs.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class ConnectionUtil {
	public static Connection getConnection() throws Exception {
		// 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		// 主机地址,默认为 localhost
		connectionFactory.setHost("localhost");
		// 连接端口,默认为 5672
		connectionFactory.setPort(5672);
		// 虚拟主机名称,默认为 /
		connectionFactory.setVirtualHost("/xzk");
		// 连接用户名,默认为 guest
		connectionFactory.setUsername("lxs");
		// 连接密码,默认为 guest
		connectionFactory.setPassword("lxs");
		// 创建连接
		return connectionFactory.newConnection();
	}
}

编写消息的消费者 com.lxs.rabbitmq.simple.Consumer :

package com.lxs.rabbitmq.simple;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        
        
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
        
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties 
                                       properties,byte[] body) throws IOException {
                // 路由key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
        // 不关闭资源,应该一直监听消息
        // channel.close();
        // connection.close();
    }
}
4、小结

上述的入门案例中中其实使用的是如下的简单模式:

在上图的模型中,有以下概念:

  1. P:生产者,也就是要发送消息的程序。
  2. C:消费者:消息的接受者,会一直等待消息到来。
  3. queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
四、RabbitMQ 工作模式 1、Work queues 工作队列模式 1.1 模式说明

  • Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
  • 应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
1.2 代码
  • Work Queues 与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
1)生产者
package com.lxs.rabbitmq.work;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        // 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        for (int i = 1; i <= 30; i++) {
            // 发送信息
            String message = "你好,小兔子!work模式-- " + i;
            
            
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
2)消费者1
package com.lxs.rabbitmq.work;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        // 一次只能接收并处理一个消息
        channel.basicQos(1);
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
        
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 路由 key
                    System.out.println("路由 key 为:" + envelope.getRoutingKey());
                    // 交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    // 消息 id
                    System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                    // 收到的消息
                    System.out.println("消费者1 - 接收到的消息为:" + new String(body,"utf-8"));
                    Thread.sleep(1000);
                    // 确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }
        };

        
        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
    }
}
3)消费者2
package com.lxs.rabbitmq.work;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();

        
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        // 一次只能接收并处理一个消息
        channel.basicQos(1);
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
        
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 路由 key
                    System.out.println("路由 key 为:" + envelope.getRoutingKey());
                    // 交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    // 消息 id
                    System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                    // 收到的消息
                    System.out.println("消费者2 - 接收到的消息为:" + new String(body,"utf-8"));
                    Thread.sleep(1000);
                    // 确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }
        };

        
        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
    }
}
1.3 测试
  • 启动两个消费者,然后再启动生产者发送消息;到 IDEA 的两个消费者对应的控制台查看是否竞争性的接收到消息。

1.4 小结
  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
2、订阅模式概述

订阅模式示例图:

前面2个案例中,只有3个角色:

  1. P:生产者,也就是要发送消息的程序。
  2. C:消费者:消息的接受者,会一直等待消息到来。
  3. queue:消息队列,图中红色部分。

而在订阅模型中,多了一个 exchange 角色,而且过程略有变化:

  1. P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)。
  2. C:消费者,消息的接受者,会一直等待消息到来。
  3. Queue:消息队列,接收消息、缓存消息。
  4. Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。Exchange 有常见以下3种类型:
    1. Fanout:广播,将消息交给所有绑定到交换机的队列。
    2. Direct:定向,把消息交给符合指定 routing key 的队列。
    3. Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

3、Publish/Subscribe 发布与订阅模式 3.1 模式说明

  • 发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给 broker ,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
3.2 代码 1)生产者
package com.lxs.rabbitmq.ps;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Producer {
    // 交换机名称
    static final String FANOUT_EXCHAGE = "fanout_exchange";
    // 队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    // 队列名称
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";
    public static void main(String[] args) throws Exception {
        // 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
        for (int i = 1; i <= 10; i++) {
            // 发送信息
            String message = "你好,小兔子!发布订阅模式 -- " + i;
            
            channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
2)消费者1
package com.lxs.rabbitmq.ps;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE,
        BuiltinExchangeType.FANOUT);

        
        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
    }
}
3)消费者2
package com.lxs.rabbitmq.ps;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE,
        BuiltinExchangeType.FANOUT);

        
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者2 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
    }
}
3.3 测试
  • 启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
  • 在执行完测试代码后,其实到 RabbitMQ 的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
3.4 小结
  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
4、Routing 路由模式 4.1 模式说明

路由模式特点:

  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由 key)。
  2. 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey 。
  3. Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。


图解:

  1. P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key 。
  2. X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列。
  3. C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。
  4. C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。
4.2 代码

  • 在编码上与 Publish/Subscribe 发布与订阅模式的区别是交换机的类型为:Direct ,还有队列绑定交换机的时候需要指定 routing key。
1)生产者
package com.lxs.rabbitmq.routing;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Producer {
    // 交换机名称
    static final String DIRECT_EXCHAGE = "direct_exchange";
    // 队列名称
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    // 队列名称
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws Exception {
        // 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

        
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
        // 发送信息
        String message = "新增了商品。路由模式;routing key 为 insert " ;
        
        
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null,message.getBytes());
        System.out.println("已发送消息:" + message);
        // 发送信息
        message = "修改了商品。路由模式;routing key 为 update " ;
        
        channel.basicPublish(DIRECT_EXCHAGE, "update", null,message.getBytes());
        System.out.println("已发送消息:" + message);
        // 关闭资源
        channel.close();
        connection.close();
    }
}
2)消费者1
package com.lxs.rabbitmq.routing;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE,BuiltinExchangeType.DIRECT);

        
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE,"insert");
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
    }
}
3)消费者2
package com.lxs.rabbitmq.routing;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE,BuiltinExchangeType.DIRECT);

        
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE,"insert");
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者2 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
    }
}
4.3 测试
  • 启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应 routing key 对应队列的消息;到达按照需要接收的效果。
  • 在执行完测试代码后,其实到 RabbitMQ 的管理后台找到 Exchanges 选项卡,点击 direct_exchange 的交换机,可以查看到如下的绑定:

4.4 小结
  • Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
5、Topics 通配符模式 5.1 模式说明
  1. Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
  2. Routingkey 一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert 。
  3. 通配符规则:
    #:匹配一个或多个词。
    *:匹配不多不少恰好1个词。
  4. 举例:
    item.# :能够匹配 item.insert.abc 或者 item.insert 。
    item.* :只能匹配 item.insert 。

图解:

  1. 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到。
  2. 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配。
5.2 代码

1)生产者
  1. 使用 topic 类型的 Exchange,发送消息的 routing key 有3种: item.insert 、item.update 、item.delete :
package com.lxs.rabbitmq.topic;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Producer {
    // 交换机名称
    static final String TOPIC_EXCHAGE = "topic_exchange";
    // 队列名称
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    // 队列名称
    static final String TOPIC_QUEUE_2 = "topic_queue_2";
    public static void main(String[] args) throws Exception {
        // 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
        // 发送信息
        String message = "新增了商品。Topic 模式;routing key 为 item.insert " ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null,message.getBytes());
        System.out.println("已发送消息:" + message);
        // 发送信息
        message = "修改了商品。Topic 模式;routing key 为 item.update" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null,message.getBytes());
        System.out.println("已发送消息:" + message);
        // 发送信息
        message = "删除了商品。Topic 模式;routing key 为 item.delete" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null,message.getBytes());
        System.out.println("已发送消息:" + message);
        // 关闭资源
        channel.close();
        connection.close();
    }
}
2)消费者1
  1. 接收两种类型的消息:更新商品和删除商品。
package com.lxs.rabbitmq.topic;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE,BuiltinExchangeType.TOPIC);

        
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,"item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,"item.delete");
        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
        
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}
3)消费者2
  1. 接收所有类型的消息:新增商品,更新商品和删除商品。
package com.lxs.rabbitmq.topic;

import com.lxs.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE,BuiltinExchangeType.TOPIC);

        
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
        // 队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE,"item.*");

        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
        
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者2 - 接收到的消息为:" + new String(body,"utf-8"));
            }
        };

        
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}
5.3 测试
  1. 启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应 routing key 对应队列的消息;到达按照需要接收的效果;并且这些 routing key 可以使用通配符。
  2. 在执行完测试代码后,其实到 RabbitMQ 的管理后台找到 Exchanges 选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:

5.4 小结
  • Topic 主题模式可以实现 Publish/Subscribe 发布与订阅模式和 Routing 路由模式 的功能;只是 Topic 在配置 routing key 的时候可以使用通配符,显得更加灵活。
6、模式总结

RabbitMQ工作模式:

  1. 简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用
    默认的交换机)。
  2. 工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
  3. 发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
  4. 路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
  5. 通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
五、Spring 整合 RabbitMQ 1、搭建生产者工程 1.1 创建工程

1.2 添加依赖

修改 pom.xml 文件内容为如下:



    
    4.0.0
    com.lxs
    spring-rabbitmq-producer
    1.0-SNAPSHOT
    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

1.3 配置整合
  1. 创建 spring-rabbitmq-producersrcmainresourcespropertiesrabbitmq.properties 连接参数等配置文件;
rabbitmq.host=192.168.220.12
rabbitmq.port=5672
rabbitmq.username=lxs
rabbitmq.password=lxs
rabbitmq.virtual-host=/xzk
  1. 创建 spring-rabbitmq-producersrcmainresourcesspringspring-rabbitmq.xml 整合配置文件;


    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
        
            
            
    	
    
    
    
    
    
    
    
    
    
        
            
            
            
        
    
    
    

1.4 发送消息
  1. 创建测试文件 spring-rabbitmqproducersrctestjavacomlxsrabbitmqProducerTest.java 。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    
    @Test
    public void queueTest(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("spring_queue", "只发队列 spring_queue 的消息。");
    }
    
    
    @Test
    public void fanoutTest(){
        
        rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到 spring_fanout_exchange 交换机的广播消息");
    }
    
    
    @Test
    public void topicTest(){
        
        rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj", "发送到 spring_topic_exchange 交换机 lxs.bj 的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.1", "发送到 spring_topic_exchange 交换机 lxs.bj.1 的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.2", "发送到 spring_topic_exchange 交换机 lxs.bj.2 的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.cn", "发送到 spring_topic_exchange 交换机 xzk.cn 的消息");
    }
}
2、搭建消费者工程 2.1 创建工程

2.2 添加依赖

修改 pom.xml 文件内容为如下:



    
    4.0.0
    com.lxs
    spring-rabbitmq-consumer
    1.0-SNAPSHOT
    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

2.3 配置整合
  1. 创建 spring-rabbitmq-consumersrcmainresourcespropertiesrabbitmq.properties 连接参数等配置文件;
rabbitmq.host=192.168.192.168.220.12
rabbitmq.port=5672
rabbitmq.username=lxs
rabbitmq.password=lxs
rabbitmq.virtual-host=/xzk
  1. 创建 spring-rabbitmq-consumersrcmainresourcesspringspring-rabbitmq.xml 整合配置文件;


    
    
    
    
    
    
    
    
    
    
    
    
    
        
        
        
        
        
        
    

2.4 消息监听器 1)队列监听器
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerSpringQueueListener.java 。
public class SpringQueueListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
        	e.printStackTrace();
        }
    }
}
2)广播监听器1
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerFanoutListener1.java 。
public class FanoutListener1 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3)广播监听器2
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerFanoutListener2.java 。
public class FanoutListener2 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("广播监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4)星号通配符监听器
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerTopicListenerStar.java 。
public class TopicListenerStar implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("通配符*监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
5)井号通配符监听器
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerTopicListenerWell.java 。
public class TopicListenerWell implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("通配符#监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
6)井号通配符监听器2
  1. 创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerTopicListenerWell2.java 。
public class TopicListenerWell2 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("通配符#监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                              message.getMessageProperties().getReceivedExchange(),
                              message.getMessageProperties().getReceivedRoutingKey(),
                              message.getMessageProperties().getConsumerQueue(),msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
六、Spring Boot 整合 RabbitMQ 1、简介
  • 在 Spring 项目中,可以使用 Spring-Rabbit 去操作 RabbitMQ https://github.com/spring-projects/spring-amqp 。
  • 尤其是在 spring boot 项目中只需要引入对应的 amqp 启动器依赖即可,方便的使用 RabbitTemplate 发送消息,使用注解接收消息。

一般在开发过程中:
生产者工程:

  1. application.yml 文件配置 RabbitMQ 相关信息。
  2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定。
  3. 注入 RabbitTemplate 对象,通过 RabbitTemplate 对象发送消息到交换机。

消费者工程:

  1. application.yml 文件配置 RabbitMQ 相关信息。
  2. 创建消息处理类,用于接收队列中的消息并进行处理。
2、搭建生产者工程 2.1 创建工程
  1. 创建生产者工程 springboot-rabbitmq-producer 。
2.2 添加依赖
  1. 修改 pom.xml 文件内容为如下:


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.4.RELEASE
    
    com.lxs
    springboot-rabbitmq-producer
    1.0-SNAPSHOT
    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
        
    

2.3 启动类
package com.lxs.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }
}
2.4 配置 RabbitMQ 1)配置文件
  • 创建 application.yml,内容如下:
spring:
	rabbitmq:
        host: 192.168.220.12
        port: 5672
        virtual-host: /xzk
        username: lxs
        password: lxs
2)绑定交换机和队列
  • 创建 RabbitMQ 队列与交换机绑定的配置类 com.lxs.rabbitmq.config.RabbitMQConfig 。
package com.lxs.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    // 交换机名称
    public static final String ITEM_TOPIC_EXCHANGE =
        "springboot_item_topic_exchange";
    // 队列名称
    public static final String ITEM_QUEUE = "springboot_item_queue";
    // 声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return
            ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }
    // 声明队列
    @Bean("itemQueue")
    public Queue itemQueue(){
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }
    // 绑定队列和交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}
2.5 测试
  • 在生产者工程 springboot-rabbitmq-producer 中创建测试类,发送消息:
package com.lxs.rabbitmq;

import com.lxs.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                                      "item.insert", "商品新增,routing key 为item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                                      "item.update", "商品修改,routing key 为item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
                                      "item.delete", "商品删除,routing key 为item.delete");
    }
}

先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程 springboot-rabbitmq-consumer 中控制台查看是否接收到对应消息。

另外;也可以在 RabbitMQ 的管理控制台中查看到交换机与队列的绑定:

3、搭建消费者工程 3.1 创建工程
  1. 创建消费者工程 springboot-rabbitmq-consumer 。
3.2 添加依赖
  1. 修改 pom.xml 文件内容为如下:


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.4.RELEASE
    
    com.lxs
    springboot-rabbitmq-consumer
    1.0-SNAPSHOT
    
        org.springframework.boot
        spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
        
    

3.3 启动类
package com.lxs.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }
}
3.4 配置 RabbitMQ
  1. 创建 application.yml ,内容如下:
spring:
	rabbitmq:
		host: 192.168.220.12
		port: 5672
		virtual-host: /xzk
		username: lxs
		password: lxs
3.5 消息监听处理类
  1. 编写消息监听器 com.lxs.rabbitmq.listener.MyListener 。
package com.lxs.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {
    
    @RabbitListener(queues = "springboot_item_queue")
    public void myListener1(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }
}
七、高级特性 1、消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:producer —> rabbitmq broker —> exchange —> queue —> consumer 。

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递。

1.1 确认模式
  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
@Test
public void test/confirm/i() {
    // 2. 定义回调
    rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
        
        @Override
        public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm 方法被执行了....");
            if (ack) {
                // 接收成功
                System.out.println("接收成功消息" + cause);
            } else {
                // 接收失败
                System.out.println("接收失败消息" + cause);
                // 做一些处理,让消息再次发送。
            }
        }
    });
    // 3. 发送消息
    rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i", "message /confirm/i....");
}
1.2 退回模式
  1. 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
@Test
public void testReturn() {
    // 1.设置交换机处理失败消息的模式
    rabbitTemplate.setMandatory(true);
    // 2.设置 ReturnCallBack
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        
        @Override
        public void returnedMessage(Message message, int replyCode, String
                                    replyText, String exchange, String routingKey) {
            System.out.println("return 执行了....");
            System.out.println(message);
            System.out.println(replyCode);
            System.out.println(replyText);
            System.out.println(exchange);
            System.out.println(routingKey);
            // 处理
        }
    });
    // 3. 发送消息
    rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i234","message /confirm/i....");
}
2、Consumer Ack

ack 指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=“none” 。
  • 手动确认:acknowledge=“manual” 。
  • 根据异常情况确认:acknowledge=“auto” 。

其中自动确认是指,当消息一旦被 Consumer 接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。

package com.lxs.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;


@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1.接收转换消息
            System.out.println(new String(message.getBody()));
            // 2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0; // 出现错误
            // 3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // e.printStackTrace();
            // 4.拒绝签收
            
            channel.basicNack(deliveryTag,true,true);
            // channel.basicReject(deliveryTag,true);
        }
    }
}

.....
3、消费端限流

package com.lxs.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Thread.sleep(1000);
        // 1.获取消息
        System.out.println(new String(message.getBody()));
        // 2. 处理业务逻辑
        // 3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
4、TTL

Time To Live,消息过期时间设置。

  • 管控台中设置队列 TTL:
  • 代码实现:

配置文件:


    
    
        
        
    


    
        
    

代码:

@Test
public void testTtl() {
    
    // 消息后处理对象,设置一些消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws
            AmqpException {
            // 1.设置message的信息
            message.getMessageProperties().setExpiration("5000");// 消息的过期时间
                // 2.返回该消息
                return message;
        }
    };
    // 消息单独过期
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
                                  // for (int i = 0; i < 10; i++) {
                                  // if(i == 5){
                                  // //消息单独过期
                                  // rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe","message ttl....",messagePostProcessor);
    // }else{
    // // 不过期的消息
    // rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe","message ttl....");
    //
    // }
    //
    // }
}
5、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为 Dead message 后,可以被重新发送到另一个交换机,这个交换机就是 DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,basicNack/basicReject ,并且不把消息重新放入原目标队列, requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

  • 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key 。

    代码实现:
  • 配置

    
    
        
        
        
        
        
        
        
        
    


    
        
        
    




    
        
    

测试代码:

  • 生产端测试:
@Test
public void testDlx(){
    // 1. 测试过期时间,死信消息
    //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        // 2. 测试长度限制后,消息死信
        
        // 3. 测试消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
}
  • 消费端监听:
package com.lxs.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1.接收转换消息
            System.out.println(new String(message.getBody()));
            // 2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0; // 出现错误
            // 3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            // 4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

6、延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器。
  2. 延迟队列。


很可惜,在 RabbitMQ 中并未提供延迟队列功能。
但是可以使用:TTL + 死信队列组合实现延迟队列的效果。


代码实现:

  • 配置


    
    
        
        
        
    


    
        
    




    
        
        
    

  • 生产端测试
@Test
public void testDelay() throws InterruptedException {
    // 1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
    

}
  • 消费端监听:
package com.lxs.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1.接收转换消息
            System.out.println(new String(message.getBody()));
            // 2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            // 3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            // 4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/326631.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号