项目结构1、一台安装RabbitMQ的服务器或虚拟机,可通过浏览器访问。安装教程:Linux:轻松安装RabbitMQ(Centos版)
2、两个初始的springboot项目,一个是生产者(producer),一个是消费者(consumer)
【或者一个项目中构建两个springboot模块】
1、pom文件
org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web junit junit org.springframework spring-test
2、yml文件
spring:
rabbitmq:
host: (ip地址)
port: 5672
# virtual-host: xiaolinzi
username: admin
password: admin
#超时时间
connection-timeout: 10000s
#开启消息送达提示
publisher-returns: true
#开启不可达消息不会被broker给删除
template:
mandatory: true
#开启消息确认模式
publisher-confirm-type: correlated
server:
port: 8081
3、配置类
package com.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
4、生产者测试类
package com.rabbitmq.producer.controller;
import com.rabbitmq.producer.config.RabbitMQConfig;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping
public class MainController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("rabbit")
public void producerTest() {
//使用rabbitTemplate发送消息
String message = "send email message to user";
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
}
}
消费者
1、pom文件
org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web junit junit org.springframework spring-test
2、yml文件
spring:
rabbitmq:
host: (ip地址)
port: 5672
# virtual-host: xiaolinzi
username: admin
password: admin
#超时时间
connection-timeout: 10000s
#开启消息送达提示
publisher-returns: true
#开启不可达消息不会被broker给删除
template:
mandatory: true
#开启消息确认模式
publisher-confirm-type: correlated
server:
port: 8082
3、配置类
package com.rabbitmq.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
4、监听类
package com.rabbitmq.consumer.handler;
import com.rabbitmq.client.Channel;
import com.rabbitmq.consumer.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
@RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
public void receive_email(Object msg, Message message, Channel channel){
System.out.println("QUEUE_INFORM_EMAIL msg"+msg);
}
@RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_SMS})
public void receive_sms(Object msg, Message message, Channel channel){
System.out.println("QUEUE_INFORM_SMS msg"+msg);
}
}
测试结果
启动生产者和消费者:
postman测试:
RabbitMQ详解之你要的RabbitMQ这里都有



