网上看了很多SpringBoot整合RabbitMQ的文章都是单交换器单队列模式,在实际项目中可能不一定满足需求,特地实践测试了一下
关于安装RabbitMQ和理论这里就不赘述,网上有很多相关文章
这里使用Topic模式作参考
主题模式配置
1.添加starter依赖2.application.properties中添加连接信息3.主入口类4.Producer配置
RabbitConfig类ScheduledTask类 5.Consumer配置
HellowConsumer类 6.测试
启动Producer启动Consumer 生产者手动ACK确认配置
1.Producer配置
application.propertiesRabbitConfig类 2.启动Producer 消费者手动ACK确认配置
1.Consumer配置
application.propertiesHellowConsumer 2.启动Consumer 死信队列配置
1.Consumer配置
RabbitConfig类ScheduledTask类 2.Consumer配置
HellowConsumer类 3.测试
启动Producer启动Consumer
主题模式配置 1.添加starter依赖2.application.properties中添加连接信息org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.virtual-host=/ spring.rabbitmq.username=root spring.rabbitmq.password=123456 spring.rabbitmq.port=56723.主入口类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
}
以上代码Producer和Consumer一致(注意修改项目端口号和项目名称)
4.Producer配置 RabbitConfig类用于配置指定路由key绑定交换器和队列,以及预创建RabbitMQ的交换器和队列(如果没有创建的情况下)
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//队列1
private static final String QUEUE1 = "queue1";
//交换器1
private static final String EXCHANGE1 = "exchange1";
//路由key1
private static final String ROUTINGKEY1 = "route1";
//队列2
private static final String QUEUE2 = "queue2";
//交换器2
private static final String EXCHANGE2 = "exchange2";
//路由key2
private static final String ROUTINGKEY2 = "route2";
@Bean
public Queue queue1(){
// public Queue(
// @NotNull String name, 队列名称
// boolean durable, 是否持久化
// boolean exclusive, 是否排他
// boolean autoDelete, 是否自动删除
// @Nullable java.util.Map arguments 参数)
return new Queue(QUEUE1,true,false,false, null);
}
@Bean
public TopicExchange exchange1(){
// public TopicExchange(
// String name, 交换器名称
// boolean durable, 是否持久化
// boolean autoDelete, 是否排他
// @Nullable java.util.Map arguments 参数)
return new TopicExchange(EXCHANGE1, true, false, null);
}
@Bean
public Binding binding1(){
//指定路由key1绑定队列1和交换器1
return BindingBuilder.bind(queue1()).to(exchange1()).with(ROUTINGKEY1);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE2,true,false,false, null);
}
@Bean
public TopicExchange exchange2(){
return new TopicExchange(EXCHANGE2, true, false, null);
}
@Bean
public Binding binding2(){
//指定路由key2绑定队列2和交换器2
return BindingBuilder.bind(queue2()).to(exchange2()).with(ROUTINGKEY2);
}
}
ScheduledTask类
这里使用定时任务模拟消息发送
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableScheduling
public class ScheduledTask {
@Autowired
private RabbitTemplate template;
private static final String EXCHANGE1 = "exchange1";
private static final String EXCHANGE2 = "exchange2";
private static final String ROUTINGKEY1 = "route1";
private static final String ROUTINGKEY2 = "route2";
private Integer id = 0;
@Scheduled(cron = "0/1 * * * * ?")
public void task1() {
User user = new User();//模拟业务类
user.setId(id);
if(id % 2 == 0){//轮流给2个队列发送消息模拟
user.setName("队列1 name"+id);
template.convertAndSend(EXCHANGE1, ROUTINGKEY1, user);
}else{
user.setName("队列2 name"+id);
template.convertAndSend(EXCHANGE2, ROUTINGKEY2, user);
}
System.out.println("定时发送消息:" + user.getName());
id++;
}
}
5.Consumer配置
HellowConsumer类
这里消费者使用注解来监听2个队列
import com.example.rabbitmq.config.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class HellowConsumer {
@RabbitListener(
queues = {"queue1"}, //队列名称
concurrency = "1" //并发数
)
public void service1(User user){
System.out.println("消息队列1推送来的消息" + user.getName());
}
@RabbitListener(
queues = {"queue2"}, //队列名称
concurrency = "1" //并发数
)
public void service2(User user){
System.out.println("消息队列2推送来的消息" + user.getName());
}
}
6.测试
注意:由于Consumer项目没有配置预创建交换器和队列,所以先启动Producer项目,否则会报错!!!
如果RabbitMQ里面存在同名的交换器或者队列,但是持久化、排他、自动删除参数不一致也会报错!!!
启动Producer 启动Consumer至此测试成功~
自动创建的交换器和队列
绑定的路由key
生产者手动ACK确认配置 1.Producer配置 application.properties#选择确认类型为交互 spring.rabbitmq.publisher-/confirm/i-type=correlated #开启发送消息失败回调 spring.rabbitmq.publisher-returns=trueRabbitConfig类
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//定义发送消息到交换器ACK确认的回调函数
rabbitTemplate.set/confirm/iCallback((CorrelationData correlationData, boolean ack, String s) -> {
if(ack){
System.out.println("消息确认成功 ");
}else{
System.out.println("消息确认失败 "+s);
}
});
//定义发送消息到交换器ACK失败的回调函数
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
System.out.println("消息发送失败 "+returnedMessage.getMessage());
});
return rabbitTemplate;
}
2.启动Producer
消费者手动ACK确认配置
1.Consumer配置
application.properties
#配置ACK确认为手动 spring.rabbitmq.listener.simple.acknowledge-mode=manualHellowConsumer
import com.example.rabbitmq.config.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class HellowConsumer {
@RabbitListener(
queues = {"queue1"}, //队列名称
concurrency = "1" //并发数
)
public void service1(User user, Message message, Channel channel){
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("消息队列1推送来的消息" + user.getName());
try {
if(message.getMessageProperties().getRedelivered()){
// public void basicReject(
// long deliveryTag, //消息标签
// boolean requeue //是否放回队列
// )
//拒收该消息
channel.basicReject(deliveryTag, false);
System.out.println("消息队列1的消息" + user.getName()+" 拒绝再次处理");
}else{
// public void basicNack(
// long deliveryTag, //消息标签
// boolean multiple, //是否批确认
// boolean requeue //是否放回队列
// )
//不确认该消息
channel.basicNack(deliveryTag, false, true);
System.out.println("消息队列1的消息" + user.getName()+" 返回队列重新处理");
}
} catch (IOException e) {
e.printStackTrace();
}
}
@RabbitListener(
queues = {"queue2"}, //队列名称
concurrency = "1" //并发数
)
public void service2(User user, Message message, Channel channel){
System.out.println("消息队列2推送来的消息" + user.getName());
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// public void basicAck(
// long deliveryTag, //消息标签
// boolean multiple //是否批量确认
// )
//确认该消息
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.启动Consumer
死信队列配置
1.Consumer配置
RabbitConfig类
配置说明:在queue.old队列设置10s过期时间,过期后自动移入queue.dlx队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
//原队列
private static final String QUEUEOLD = "queue.old";
//原交换器
private static final String EXCHANGEOLD = "exchange.old";
//原路由key
private static final String ROUTINGKEYOLD = "route.old";
//死信队列
private static final String QUEUEDLX = "queue.dlx";
//死信交换器
private static final String EXCHANGEDLX = "exchange.dlx";
//死信路由key
private static final String ROUTINGKEYDLX = "route.dlx";
@Bean
public Queue queueOld(){
Map arguments = new HashMap<>();
// 消息的生存时间 10s
arguments.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加 入死信队列)
arguments.put("x-dead-letter-exchange", "exchange.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原 队列的routingKey
arguments.put("x-dead-letter-routing-key", "route.dlx");
return new Queue(QUEUEOLD,true,false,false, arguments);
}
@Bean
public TopicExchange exchangeOld(){
return new TopicExchange(EXCHANGEOLD, true, false, null);
}
@Bean
public Binding bindingOld(){
return BindingBuilder.bind(queueOld()).to(exchangeOld()).with(ROUTINGKEYOLD);
}
@Bean
public Queue queueDlx(){
return new Queue(QUEUEDLX,true,false,false, null);
}
@Bean
public DirectExchange exchangeDlx(){
return new DirectExchange(EXCHANGEDLX, true, false, null);
}
@Bean
public Binding bindingDlx(){
return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with(ROUTINGKEYDLX);
}
}
ScheduledTask类
向原队列定时发送消息(模拟)
private static final String EXCHANGEOLD = "exchange.old";
private static final String ROUTINGKEYOLD = "route.old";
@Scheduled(cron = "0/1 * * * * ?")
public void task2() {
User user = new User();
user.setId(id);
user.setName("原队列 name"+id);
template.convertAndSend(EXCHANGEOLD, ROUTINGKEYOLD, user);
System.out.println("定时发送消息:" + user.getName());
id++;
}
2.Consumer配置
HellowConsumer类
监听死信队列(原队列不用监听)
@RabbitListener(
queues = {"queue.dlx"},
concurrency = "1"
)
public void service3(User user){
System.out.println("死信队列推送来的消息" + user.getName());
}
3.测试
启动Producer
启动Consumer
测试成功~



