栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot整合RabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot整合RabbitMQ

1.进入jar包


    org.springframework.boot
    spring-boot-starter-amqp

2.声明队列、交换机、交换机与队列绑定关系(不需要则不声明,比如简单模式和工作队列模式只需要声明队列)

@Bean
public Queue orderDirectQueue() {
    return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false,false);
}
@Bean
public DirectExchange orderDirectExchange() {
    return new DirectExchange(MQExchangeEnum.EXCHANGE_DIRECT_ORDER.getName(), true, false);
}

@Bean
public Binding bindingDirect() {
    return BindingBuilder.bind(orderDirectQueue())
            .to(orderDirectExchange())
            .with(MQQueueEnum.DIRECT_QUEUE_ORDER.getRoutingKey());
}

3.构建消息发送者

private RabbitTemplate rabbitTemplate;

@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    this.rabbitTemplate.setEncoding("UTF-8");
    this.rabbitTemplate.setConfirmCallback(confirmCallback);
    this.rabbitTemplate.setReturnCallback(returnCallback);
    this.rabbitTemplate.setReplyTimeout(60000);
}
public void sendOrder(String message){
    logger.info("发送在线商城订单创建消息: {}", message);
    sendMsg(MQExchangeEnum.EXCHANGE_DIRECT_ORDER.getName(), MQQueueEnum.DIRECT_QUEUE_ORDER.getRoutingKey(), message, new CorrelationData(RandomUtils.getUUID()));
}

private void sendMsg(String exchangeName, String routingKey, Object request, CorrelationData correlationData) {
    rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> {
        message.getMessageProperties().getHeaders().put("spring_returned_message_correlation", correlationData.getId());
        message.getMessageProperties().getHeaders().put("send_ip", getLocalIp());
        return message;
    }, correlationData);
}

private String getLocalIp() {
    try {
        return WebToolUtils.getLocalIP();
    } catch (Exception e) {
        logger.error("获取本地IP异常", e);
    }
    return "0.0.0.0";
}

4.构建消息消费者

@Component

public class OnlineShopOrderReceiver {

    private static Logger logger = LoggerFactory.getLogger(OnlineShopOrderReceiver.class);

    @RabbitListener(queues = {"onlineshop.order.queue.direct"}, errorHandler = "rabbitListenerErrorHandler")
    public void onMessage(Channel channel, Message message) throws IOException, InterruptedException {
        logger.info("收到在线商城订单创建{}发送的消息{}", message.getMessageProperties().getHeaders().get("send_ip"), message);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        // 处理完业务再取下一条
        //channel.basicQos(1);
        //Thread.sleep(5000);
        try {
            Object correlation = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
            if (correlation == null) {
                logger.info("收到错误消息: {}", message);
            } else {
                logger.info("消息处理: {}", new String(message.getBody()));
            }
        } finally {
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

}

5.配置文件需要配置MQ连接信息

# 配置RabbitMQ的基本信息  ip 端口 username  password..
spring:
  rabbitmq:
    host: #ip
    port: 5672
    username: admin
    password: 123456
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
      retry:
        enabled: true
        initial-interval: 3000ms
        max-attempts: 3
        max-interval: 10000ms
        multiplier: 1
    #配置消费端
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        #最小的消费者数量
        concurrency: 5
        #消费失败后从新消费
        #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
        default-requeue-rejected: true
        #最大的消费者数量
        max-concurrency: 5
        #一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量
        prefetch: 1
        retry:
          #监听重试是否可用
          enabled: true
          #第一次和第二次尝试传递消息的时间间隔
          initial-interval: 1000ms
          #最大重试次数
          max-attempts: 5
          #最大重试时间间隔
          max-interval: 10000ms
          #应用于上一重试间隔的乘数
          multiplier: 1.0
          #重试时有状态or无状态
          stateless: false
 RabbitMQ高级特性

1.消息的可靠性投递

springboot开启confirm,return模式

publisher-confirms: true

publisher-returns: true

final RabbitTemplate.ConfirmCallback confirmCallback =
        (correlationData, ack, cause) -> logger.info("correlationData: {}  ack:{}", correlationData, ack);


final RabbitTemplate.ReturnCallback returnCallback =
        (message, replyCode, replyText, exchange, routingKey) ->
        logger.info("消息{}不可达,exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
            message.toString(), exchange, routingKey, replyCode, replyText);

 消费端设置消息签收

listener:
  simple:
    acknowledge-mode: manual/none

manual:手动确认(推荐使用),消息手动签收后,才会从队列中清空,如果此处业务处理异常,可以不签收,调用channel.basicNack回退给队列,让队列自动重新发送(一般不这么做),如果异常一直存在的话,消息会不断的发送,造成网络开销,建议签收然后将异常的消息放入缓存或数据库,手动处理。

none:自动确认,消息会在消费端自动签收,如果消息处理异常也不会重新发送消息了,造成了消息丢失。

保证消息可靠性
  1. 持久化--交换机持久化,队列持久化,消息持久化
  2. 生成端--confirm+return消息确认机制
  3. 消费端--消息签收处理
  4. Broker要高可用
2.消费端限流

可用设置一次性从队列中拉取多少条消息,消息签收一定要设置为手动签收

listener:
  simple:
    acknowledge-mode: manual
    prefetch: 1

如果从队列中拉取全量消息,势必会对消费端造成压力

3.TTL 

Time To Live 存活时间,可用设置每条消息存活时间

private void sendMsg(String exchangeName, String routingKey, Object request, CorrelationData correlationData) {
    rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> {
        message.getMessageProperties().getHeaders().put("spring_returned_message_correlation", correlationData.getId());
        message.getMessageProperties().getHeaders().put("send_ip", getLocalIp());
        // 设置TTL消息过期时间
        message.getMessageProperties().setExpiration("10000");
        return message;
    }, correlationData);

也可以设置队列中全部消息存活时间,在声明队列时设置:

@Bean
public Queue orderDirectQueue() {
    Map map = new HashMap();
    // 设置队列中消息过期时间
    map.put("x-message-ttl",6000);
    return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false, false, map);
}

4.死信队列

 DLX:Dead Letter Exchange

当消息成为死信息后,会被发送达死信交换机中。

成为死信的情况:

1.队列消息长度达到限制

2.消息没被消费者消费,且不重回队列

3.消息设置了过期时间,到达了过期时间,没有被消费

 5.延时队列

TTL+死信队列构成延时队列,可用于订单过期未支付的一些业务处理

声明队列和交换机

@Bean
public Queue orderDirectQueue() {
    Map map = new HashMap();
    // 设置队列中消息过期时间
    map.put("x-message-ttl",6000);

    // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
    map.put("x-dead-letter-exchange", "deadLetterExchange");
    // x-dead-letter-routing-key  这里声明当前队列的死信路由key
    map.put("x-dead-letter-routing-key", "dle.err");
    return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false, false, map);
}

//声明死信Exchange
@Bean
public DirectExchange deadLetterExchange(){
    return new DirectExchange("deadLetterExchange");
}

// 声明死信队列A
@Bean
public Queue deadLetterQueue(){
    return new Queue("dle-queue");
}

@Bean
public Binding deadLetterQueueBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange){
    return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.err");
}

消费者:

​​​​​​​

@RabbitListener(queues = "dle-queue")
public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
    logger.info("死信队列消费消息: {}", msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/872238.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号