RabbitMQ是流行的开源消息队列系统,是AMQP(Advanced Message Queuing Protocol高级消息队列协议)的标准实现,用erlang语言开发。RabbitMQ据说具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布式系统中使用。
RabbitMQ的5种模式
1.不直接Exchange交换机(默认交换机)simple简单模式:一个生产者生产一个消息到一个队列被一个消费者接收work工作队列模式:生产者发送消息到一个队列中,然后可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系 2.使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)
订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。
发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息 二、安装RabbitMQ(windows)
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(OpenTelecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。
安装erlang
下载地址:http://erlang.org/download/otp_win64_20.3.exe
以管理员身份运行此文件进行安装。 。
erlang安装完成需要配置erlang 系统环境变量: ERLANG_HOME=C:Program Fileserl9.3(以实际路径为主) 在path中添加%ERLANG_HOME%bin;
安装RabbitMQ
下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14
以管理员身份运行此文件进行安装。安装完成后可以在系统服务中查看到RabbitMQ服务。
配置插件
为了更加方便的管理RabbitMQ服务,可以安装RabbitMQ提供的一个浏览器端管理插件,可以通过浏览器页面方便
的进行服务管理。
安装方式:
1、以管理员身份打开 cmd (不是PowerShell);然后进入在RabbitMQ的安装目录下sbin目录
2、在上述窗口执行命令: rabbitmq-plugins.bat enable rabbitmq_management
打开浏览器访问网站http://localhost:15672进入登录页面,默认账号和密码都为guest
三、整合springboot这边介绍订阅模式中的路由模式,广播则改变交换机类型并且不使用路由key即可。(队列模式则不使用交换机通过队列直接传输比较简单不做介绍)
先启动RabbitMQ
引入pom文件
org.springframework.boot spring-boot-starter-amqp
基本配置 (admin是我自己添加的用户)
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /admin
username: admin
password: 123456
该配置用于传输对象自动转换json
@Configuration
public class RabbitConfig {
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
代码实现
添加交换机、队列、路由key(一个交换机可绑定多个队列,注释的地方为另一种绑定队列交换机的方式)
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//路由key名称
public static final String ITEM_KEY = "item.delete";
//声明交换机
// @Bean("itemTopicExchange")
// public Exchange topicExchange(){
// return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
// }
// //声明队列
// @Bean("itemQueue")
// public Queue itemQueue(){
// return QueueBuilder.durable(ITEM_QUEUE).build();
// }
//
// //绑定队列和交换机
// @Bean
// public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,@Qualifier("itemTopicExchange") Exchange exchange){
// return BindingBuilder.bind(queue).to(exchange).with("item.delete").noargs();
}
生产者及消费者
@Controller
public class StudentController {
// @Autowired
// private StudentMapper studentMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping(value = "/add" ,method = {RequestMethod.POST})
public void addStudent(){
Student student = new Student();
student.setId(1);
student.setNaem("zs");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.delete",student);
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.update",student+"");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.insert",student+"");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = RabbitMQConfig.ITEM_QUEUE),
exchange = @Exchange(name = RabbitMQConfig.ITEM_TOPIC_EXCHANGE,type = ExchangeTypes.TOPIC),
key = RabbitMQConfig.ITEM_KEY
))
public void myListener1(Student message){
System.out.println("消费者1接收到的消息为:"+message);
}
}
调用接口之后消费者接收到 生产者中路由key为item.delete的对象信息。
需要注意若是修改交换机类型或者绑定队列,需要更改交换机名字或者进RabbitMQ后台删除原来的交换机,不然修改无效!



