栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot整合RabbitMQ详细应用教程(1)-自动确认分发/接收消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合RabbitMQ详细应用教程(1)-自动确认分发/接收消息

本文主要介绍rabbitmq的引入、配置、发送和接收消息,暂不考虑消息的丢失和重复等问题

一、导入maven依赖,我使用的版本和parent的版本一致2.3.12.RELEASE

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

二、application.yml中RabbitMQ的基本配置信息,请先自行安装RabbitMQ

spring:
  rabbitmq:
    host: 101.200.xxx.xx
    port: 5672
    username: admin
    password: admin

三、新建RabbitConfig类配置队列,交换器,绑定关系

常用的Exchange有以下三种,本文示例为DirectExchange

DirectExchange:固定RoutingKey,绑定时和发送消息时完全一致才能经exchange发送到queue

FanoutExchange:广播模式,不需要routingkey,发送到exchange下所有queue

TopicExchange:RoutingKey绑定时可用*和#模式,例如绑定使用*.order.#,那么发送时使用pay.order也能发送到绑定的queue中

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTestConfig {

    public static final String MY_TEST_DIRE_QUEUE = "MyTestDireQueue";
    public static final String MY_TEST_DIRECT_EXCHANGE = "MyTestDirectExchange";
    public static final String MY_TEST_DIRECT_ROUTING = "MyTestDirectRouting";

    
    @Bean
    public Queue testDireQueue() {
        
        return new Queue(MY_TEST_DIRE_QUEUE, true);
    }

    
    @Bean
    DirectExchange testDirectExchange() {
        return new DirectExchange(MY_TEST_DIRECT_EXCHANGE,true,false);
    }

    
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(testDireQueue()).to(testDirectExchange()).with(MY_TEST_DIRECT_ROUTING);
    }
}

四、分发消息

1、新建一个MessageTest

@Data
public class MessageTest implements Serializable {

    private static final long serialVersionUID = -3496661997686932274L;

    private String messageId;

    private String content;
}

2、使用RabbitTemplate发送消息,注意发送消息的时机

      例如再实际业务中,如果存在事务,应在事务提交后发送消息。1、以防消费方消费了该消息,生产方之后异常事务回滚导致数据不一致,2、或者生产方事务未提交,消费方收到消息后去数据库查询该数据,该数据不存在等问题。

@RestController
@RequestMapping("message/test")
@Slf4j
public class SendMessageTestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String sendTest(@RequestParam String message) {
        String messageId = String.valueOf(UUID.randomUUID());
        MessageTest myMessage = new MessageTest();
        myMessage.setMessageId(messageId);
        myMessage.setContent(message);
        rabbitTemplate.convertAndSend(RabbitTestConfig.MY_TEST_DIRECT_EXCHANGE, RabbitTestConfig.MY_TEST_DIRECT_ROUTING, myMessage);

        return "成功";
    }
}

五、接收消息,注意接收消息方法不要抛出异常,异常会导致消息不能正常消费,从新放入到队列死循环

import com.email.server.config.RabbitTestConfig;
import com.email.server.dto.MessageTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ConsumeService {
    
    @RabbitListener(queues = RabbitTestConfig.MY_TEST_DIRE_QUEUE)
    public void receive(MessageTest data) {
        try {
            log.info("TestDireQueue receive message================={}",data);
        } catch (Exception ex) {
            log.error("err",ex);
        }
    }
}

 后续再记录/confirm/iCallback,ReturnCallback,和手动ack确认消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/678824.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号