栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot项目中整合RabbitMQ高级消息队列(技术篇)

springboot项目中整合RabbitMQ高级消息队列(技术篇)

注:此篇只是简单整合使用,不涉及RabbitMQ高级消息队列概念

1.在pom中导入rabbitMQ整合启动场景依赖

        
            org.springframework.boot
            spring-boot-starter-amqp
        

2.此时容器中自动配置了RabbitAutoConfiguration类,其给容器放四个重要的的对象

@Bean
		public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
				ObjectProvider connectionNameStrategy) 


@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean(RabbitOperations.class)
		public RabbitTemplate rabbitTemplate(RabbitProperties properties,
				ObjectProvider messageConverter,
				ObjectProvider retryTemplateCustomizers,
				ConnectionFactory connectionFactory)

@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory)

@Bean
		@ConditionalOnSingleCandidate(RabbitTemplate.class)
		public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) 

3.主启动类上添加开启Rabbit注解

@EnableRabbit

4.编写配置文件

#配置rabbitMq的配置
#指定虚拟机的地址
spring.rabbitmq.host=145.85.127.158
#指定链接端口
spring.rabbitmq.port=5672
#哪个虚拟地址
spring.rabbitmq.virtual-host=/

5.使用AmqpAdmin进行创建Exchange,Queue,Binding等

@Autowired
    private AmqpAdmin amqpAdmin;

    

    //创建交换机
    @Test
    public void createExchange() {
        //创建交换机-交换机名字,是否持久化,是否自动删除
        DirectExchange directExchange = new DirectExchange("java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange创建成功:[{}]", "java-exchange");
    }

    //创建队列
    @Test
    public void createQueue() {
        //创建一个队列-队列名字,是否持久,是否排他,是否自动删除
        Queue queue = new Queue("java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue创建成功:[{}]", "java-queue");
    }

    //创建一个绑定关系
    @Test
    public void createBinding() {
//        String destination,目的地
//        DestinationType destinationType,目的地类型
//        String exchange,交换机
//        String routingKey,路由键(重要)(通过路由键判断交换机把消息给到哪个队列 )
//        Map arguments自定义参数
        Binding binding = new Binding("java-queue"
                , Binding.DestinationType.QUEUE
                , "java-exchange"
                , "hello.java", null);
        amqpAdmin.declareBinding(binding);
        log.info("binding创建成功:[{}]", "java-binding");
    }

6.如何通过RabbitTemplate收发消息

@Autowired
    private RabbitTemplate rabbitTemplate;
    
    //发送消息
//    @Test
//    public void sendMessage(){
//        String msg = "hello world";
//        //转换和发送消息,交换机名字,路由键,消息(这里还可以传递对象)
//        rabbitTemplate.convertAndSend("java-exchange","hello.java",msg);
//        log.info("消息发送成功:[{}]",msg);
//    }
    //发送消息
    @Test
    public void sendMessage(){
        String msg = "hello world";
        //这个要传递的类必须实现序列化
        OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setName("哈哈");
        orderReturnReasonEntity.setCreateTime(new Date());

        //发送对象消息可以是一个json
        //转换和发送消息,交换机名字,路由键,消息(这里还可以传递对象)
        rabbitTemplate.convertAndSend("java-exchange","hello.java",orderReturnReasonEntity);
        log.info("消息发送成功:[{}]",orderReturnReasonEntity);
    }

7.监听消息:使用@RabbitListener和@RabbitHandler


@RabbitListener(queues = {"java-queue"})//标识监听这个队列
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService {


    
    @RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
    public void recieveMessage1(Message message, OrderReturnReasonEntity content,
                               Channel channel) {
        //获取消息体
        System.out.println("接收到的消息:" + message + "===>类型:" + content);
        byte[] body = message.getBody();
        //获取消息头
        MessageProperties messageProperties = message.getMessageProperties();
        //进行线程休眠
//        try {
//            Thread.sleep(1000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println("消息处理完成:"+content.getName());
//        System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());
    }

    @RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
    public void recieveMessage2(Message message, OrderEntity content,
                               Channel channel) {
        //获取消息体
        System.out.println("接收到的消息:" + message + "===>类型:" + content);
        byte[] body = message.getBody();
        //获取消息头
        MessageProperties messageProperties = message.getMessageProperties();
        //进行线程休眠
//        try {
//            Thread.sleep(1000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println("消息处理完成:"+content.getOrderSn());
//        System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());
    }

}

8.保证可靠抵达配置文件:

#开启确认消息已发送到交换机,选择确认类型为交互,/confirm/iCallback调用这个函数
spring.rabbitmq.publisher-/confirm/i-type=correlated
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达就以异步方式优先回调这个returncallback函数
spring.rabbitmq.template.mandatory=true

#手动ack消息回复
spring.rabbitmq.listener.simple.acknowledge-mode=manual

9.保证可靠抵达服务端配置:

@Configuration
public class MyRabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;//为实现可靠抵达,服务端需要进行定制化set/confirm/iCallback,setReturnCallback

    //如果容器中有消息转化对象,就用容器中的
    @Bean
    public MessageConverter getMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    //定制化消息队列模板
    @PostConstruct//MyRabbitConfig对象创建完成后,执行这个方法
    public void initRabbitTemplate(){
        //设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("comfirm....correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
            }
        });
        //设置消息抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return....message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
            }
        });
    }
}

10.保证可靠抵达ack(修改监听处理方法):

@RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
    public void recieveMessage1(Message message, OrderReturnReasonEntity content,
                               Channel channel) throws IOException {
        //获取消息体
        System.out.println("接收到的消息:" + message + "===>类型:" + content);
        byte[] body = message.getBody();
        //获取消息头
        MessageProperties messageProperties = message.getMessageProperties();
        //进行线程休眠
//        try {
//            Thread.sleep(1000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println("消息处理完成:"+content.getName());
        
        //这个在channel通道中是自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//        System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());

        try {
            
            //签收模式 进行判断签收
            if (deliveryTag%2==0){

                channel.basicAck(deliveryTag,false);
                System.out.println("签收了消息:"+deliveryTag);
            }else {
                System.out.println("没有签收消息:"+deliveryTag);
            }
        } catch (IOException e) {
            //网络中断
            e.printStackTrace();
            //退货模式
            
            channel.basicNack(deliveryTag,false,false);
            
            //channel.basicReject();
            System.out.println("拒绝签收消息:"+deliveryTag);
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/744885.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号