栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot整合RabbitMQ实战(多交换器多队列、手动ACK确认、死信队列)

SpringBoot整合RabbitMQ实战(多交换器多队列、手动ACK确认、死信队列)

网上看了很多SpringBoot整合RabbitMQ的文章都是单交换器单队列模式,在实际项目中可能不一定满足需求,特地实践测试了一下

​关于安装RabbitMQ和理论这里就不赘述,网上有很多相关文章
这里使用Topic模式作参考

文章目录

主题模式配置

1.添加starter依赖2.application.properties中添加连接信息3.主入口类4.Producer配置

RabbitConfig类ScheduledTask类 5.Consumer配置

HellowConsumer类 6.测试

启动Producer启动Consumer 生产者手动ACK确认配置

1.Producer配置

application.propertiesRabbitConfig类 2.启动Producer 消费者手动ACK确认配置

1.Consumer配置

application.propertiesHellowConsumer 2.启动Consumer 死信队列配置

1.Consumer配置

RabbitConfig类ScheduledTask类 2.Consumer配置

HellowConsumer类 3.测试

启动Producer启动Consumer

主题模式配置 1.添加starter依赖
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        
2.application.properties中添加连接信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
3.主入口类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class RabbitmqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }
    
}

以上代码Producer和Consumer一致(注意修改项目端口号和项目名称)

4.Producer配置 RabbitConfig类

用于配置指定路由key绑定交换器和队列,以及预创建RabbitMQ的交换器和队列(如果没有创建的情况下)

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    //队列1
    private static final String QUEUE1 = "queue1";
    //交换器1
    private static final String EXCHANGE1 = "exchange1";
    //路由key1
    private static final String ROUTINGKEY1 = "route1";

    //队列2
    private static final String QUEUE2 = "queue2";
    //交换器2
    private static final String EXCHANGE2 = "exchange2";
    //路由key2
    private static final String ROUTINGKEY2 = "route2";

    @Bean
    public Queue queue1(){
//        public Queue(     
//        @NotNull String name, 队列名称
//        boolean durable,  是否持久化
//        boolean exclusive,    是否排他
//        boolean autoDelete,   是否自动删除
//        @Nullable java.util.Map arguments     参数)
        return new Queue(QUEUE1,true,false,false, null);
    }

    @Bean
    public TopicExchange exchange1(){
//        public TopicExchange(     
//        String name,  交换器名称
//        boolean durable,  是否持久化
//        boolean autoDelete,   是否排他
//        @Nullable java.util.Map arguments     参数)
        return new TopicExchange(EXCHANGE1, true, false, null);
    }

    @Bean
    public Binding binding1(){
        //指定路由key1绑定队列1和交换器1
        return BindingBuilder.bind(queue1()).to(exchange1()).with(ROUTINGKEY1);
    }

    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2,true,false,false, null);
    }

    @Bean
    public TopicExchange exchange2(){
        return new TopicExchange(EXCHANGE2, true, false, null);
    }

    @Bean
    public Binding binding2(){
        //指定路由key2绑定队列2和交换器2
        return BindingBuilder.bind(queue2()).to(exchange2()).with(ROUTINGKEY2);
    }

}
ScheduledTask类

这里使用定时任务模拟消息发送

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
public class ScheduledTask {

    @Autowired
    private RabbitTemplate template;

    private static final String EXCHANGE1 = "exchange1";
    private static final String EXCHANGE2 = "exchange2";
    private static final String ROUTINGKEY1 = "route1";
    private static final String ROUTINGKEY2 = "route2";
    private Integer id = 0;

    @Scheduled(cron = "0/1 * * * * ?")
    public void task1() {
        User user = new User();//模拟业务类
        user.setId(id);
        if(id % 2 == 0){//轮流给2个队列发送消息模拟
            user.setName("队列1 name"+id);
            template.convertAndSend(EXCHANGE1, ROUTINGKEY1, user);
        }else{
            user.setName("队列2 name"+id);
            template.convertAndSend(EXCHANGE2, ROUTINGKEY2, user);
        }
        System.out.println("定时发送消息:" + user.getName());
        id++;
    }
}
5.Consumer配置 HellowConsumer类

这里消费者使用注解来监听2个队列

import com.example.rabbitmq.config.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class HellowConsumer {

    @RabbitListener(
            queues = {"queue1"},	//队列名称
            concurrency = "1"	//并发数
    )
    public void service1(User user){
        System.out.println("消息队列1推送来的消息" + user.getName());
    }

    @RabbitListener(
            queues = {"queue2"},	//队列名称
            concurrency = "1"	//并发数
    )
    public void service2(User user){
        System.out.println("消息队列2推送来的消息" + user.getName());
    }

}
6.测试

注意:由于Consumer项目没有配置预创建交换器和队列,所以先启动Producer项目,否则会报错!!!

如果RabbitMQ里面存在同名的交换器或者队列,但是持久化、排他、自动删除参数不一致也会报错!!!

启动Producer

启动Consumer

至此测试成功~

自动创建的交换器和队列

绑定的路由key

生产者手动ACK确认配置 1.Producer配置 application.properties
#选择确认类型为交互
spring.rabbitmq.publisher-/confirm/i-type=correlated
#开启发送消息失败回调
spring.rabbitmq.publisher-returns=true
RabbitConfig类
	@Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //定义发送消息到交换器ACK确认的回调函数
        rabbitTemplate.set/confirm/iCallback((CorrelationData correlationData, boolean ack, String s) -> {
            if(ack){
                System.out.println("消息确认成功 ");
            }else{
                System.out.println("消息确认失败 "+s);
            }
        });
        //定义发送消息到交换器ACK失败的回调函数
        rabbitTemplate.setReturnsCallback((returnedMessage) -> {
            System.out.println("消息发送失败 "+returnedMessage.getMessage());
        });
        return rabbitTemplate;
    }
2.启动Producer

消费者手动ACK确认配置 1.Consumer配置 application.properties
#配置ACK确认为手动
spring.rabbitmq.listener.simple.acknowledge-mode=manual
HellowConsumer
import com.example.rabbitmq.config.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class HellowConsumer {

    @RabbitListener(
            queues = {"queue1"},	//队列名称
            concurrency = "1"	//并发数
    )
    public void service1(User user, Message message, Channel channel){
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("消息队列1推送来的消息" + user.getName());
        try {
            if(message.getMessageProperties().getRedelivered()){
//                public void basicReject(
//                long deliveryTag, //消息标签
//                boolean requeue   //是否放回队列
//                )
                //拒收该消息
                channel.basicReject(deliveryTag, false);
                System.out.println("消息队列1的消息" + user.getName()+" 拒绝再次处理");
            }else{
//                public void basicNack(     
//                long deliveryTag, //消息标签
//                boolean multiple, //是否批确认
//                boolean requeue   //是否放回队列
//                )
                //不确认该消息
                channel.basicNack(deliveryTag, false, true);
                System.out.println("消息队列1的消息" + user.getName()+" 返回队列重新处理");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(
            queues = {"queue2"},	//队列名称
            concurrency = "1"	//并发数
    )
    public void service2(User user, Message message, Channel channel){
        System.out.println("消息队列2推送来的消息" + user.getName());
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
//            public void basicAck(     
//            long deliveryTag,     //消息标签
//            boolean multiple      //是否批量确认
//            )
            //确认该消息
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
2.启动Consumer

死信队列配置 1.Consumer配置 RabbitConfig类

配置说明:在queue.old队列设置10s过期时间,过期后自动移入queue.dlx队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    //原队列
    private static final String QUEUEOLD = "queue.old";
    //原交换器
    private static final String EXCHANGEOLD = "exchange.old";
    //原路由key
    private static final String ROUTINGKEYOLD = "route.old";

    //死信队列
    private static final String QUEUEDLX = "queue.dlx";
    //死信交换器
    private static final String EXCHANGEDLX = "exchange.dlx";
    //死信路由key
    private static final String ROUTINGKEYDLX = "route.dlx";

    @Bean
    public Queue queueOld(){
        Map arguments = new HashMap<>();
        // 消息的生存时间 10s
        arguments.put("x-message-ttl", 10000);
        // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加 入死信队列)
        arguments.put("x-dead-letter-exchange", "exchange.dlx");
        // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原 队列的routingKey
        arguments.put("x-dead-letter-routing-key", "route.dlx");
        return new Queue(QUEUEOLD,true,false,false, arguments);
    }

    @Bean
    public TopicExchange exchangeOld(){
        return new TopicExchange(EXCHANGEOLD, true, false, null);
    }

    @Bean
    public Binding bindingOld(){
        return BindingBuilder.bind(queueOld()).to(exchangeOld()).with(ROUTINGKEYOLD);
    }

    @Bean
    public Queue queueDlx(){
        return new Queue(QUEUEDLX,true,false,false, null);
    }

    @Bean
    public DirectExchange exchangeDlx(){
        return new DirectExchange(EXCHANGEDLX, true, false, null);
    }

    @Bean
    public Binding bindingDlx(){
        return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with(ROUTINGKEYDLX);
    }

}
ScheduledTask类

向原队列定时发送消息(模拟)

    private static final String EXCHANGEOLD = "exchange.old";
    private static final String ROUTINGKEYOLD = "route.old";

    @Scheduled(cron = "0/1 * * * * ?")
    public void task2() {
        User user = new User();
        user.setId(id);
        user.setName("原队列 name"+id);
        template.convertAndSend(EXCHANGEOLD, ROUTINGKEYOLD, user);
        System.out.println("定时发送消息:" + user.getName());
        id++;
    }
2.Consumer配置 HellowConsumer类

监听死信队列(原队列不用监听)

	@RabbitListener(
            queues = {"queue.dlx"},
            concurrency = "1"
    )
    public void service3(User user){
        System.out.println("死信队列推送来的消息" + user.getName());
    }
3.测试 启动Producer

启动Consumer

测试成功~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773926.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号