栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq消息可靠之手动确认

rabbitmq消息可靠之手动确认

rabbitmq 消息值之手动确认 Consume

新建springboot项目,导入rabbitmq依赖

  
            org.springframework.boot
            spring-boot-starter-amqp
   

修改配置文件

server:
  port: 8102  #服务端口
​
spring:
  rabbitmq:
    host:  rabbitmqhost    #rabbitmq地址
    port: 15672   #rabbitmq amqp端口
    username: root  #rabbitmq用户名
    password: root  #rabbitmq密码
    virtual-host: serve #rabbitmq虚拟主机
    listener:
      type: simple
      simple:
        acknowledge-mode: MANUAL # MANUAL消息确认方式 手动确认 ,none 不确认 ,auto自动确认
        retry:
          enabled: true  #开启重试
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间

启动类增加@EnableRabbit

package com.example.demo;
​
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
​
@SpringBootApplication
@EnableRabbit
public class DemoApplication {
​
    public static void main(String[] args) {
​
        SpringApplication.run(DemoApplication.class, args);
    }
​
}

新增监听类MsgListener

package com.example.demo;
​

​

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​

@Component
public class MsgListen {
    @RabbitListener(
            bindings = {@QueueBinding(value = @Queue(value = "blade-user-queue",durable = "ture"),
                    exchange = @Exchange(value = "serve-exchange"),key="blade-user")})
    public void consumer(Channel channel, Message message)throws IOException{
        System.out.println("收到消息"+message.getMessageProperties());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
​

消费者端进行绑定消息队列,非则生产者发送消息会产生路由消息失败问题

Produce

生产者配置

由于生产者可能会遇到消息发送失败的情况,所以需要用数据库进行持久化,来保证消息的可靠性

#数据源配置
spring:
  rabbitmq:
    host: 36.103.242.17
    port: 25672
    username: bladex
    password: bladex
    virtual-host: serve
    retry-interval: 10s
    exchange: serve-exchange
    max-retry-times: 3
    listener:
      type: simple
      simple:
        acknowledge-mode: MANUAL
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 5000m
  datasource:
    url: #数据库地址
    username: root #用户名
    password: root #密码

创建数据库表名为mq_message具体sql如下:

DROP TABLE IF EXISTS `mq_message`;
CREATE TABLE `mq_message` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `msg_data` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '业务数据',
  `exchange_name` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '交换机名称',
  `routing_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '路由键',
  `status` int(2) DEFAULT NULL COMMENT '消息状态',
  `retry_times` int(11) DEFAULT NULL COMMENT '重试次数',
  `next_retry_date_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1410059209962156035 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Mq可靠消息记录';
​
SET FOREIGN_KEY_CHECKS = 1;

对应实体类

package org.springblade.modules.msgtest;
​
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
​
import java.time.LocalDateTime;
​

@Data
@TableName("mq_message")
@ApiModel(value = "mq消息对象", description = "mq消息可靠记录表")
@Builder
public class MqMessage {
​
    private static final long serialVersionUID = 1L;
​
​
    @ApiModelProperty("主键id")
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
​
    
    @TableField("msg_data")
    private String msgData;
​
    
    @TableField("exchange_name")
    private String exchangeName;
​
    
    @TableField("routing_key")
    private String routingKey;
​
    
    @TableField("status")
    private int status;
​
    
    @TableField("retry_times")
    private int retryTimes;
​
    
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;
​
}
​

进行rabbitmq工厂连接,并进行消息手动确认,当发送失败进行重试,并设置重试机制,当消息路由失败时进行控制台打印

package org.springblade.modules.msgtest;
​
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
​
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
​

@Configuration
@Slf4j
public class RabbitMqConfiguration {
​
    
    @Value("${spring.rabbitmq.max-retry-times}")
    private int maxRetryTimes;
​
    
    @Value("${spring.rabbitmq.retry-interval}")
    private Duration retryInterval;
​
    @Resource
    private MqMessageMapper mqMessageMapper;
​
​
    
    @Bean(name = "serveConnectionFactory")
    @Primary
    public ConnectionFactory serveConnectionFactory(
        @Value("${spring.rabbitmq.host}") String host,
        @Value("${spring.rabbitmq.port}") int port,
        @Value("${spring.rabbitmq.username}") String username,
        @Value("${spring.rabbitmq.password}") String password,
        @Value("${spring.rabbitmq.virtual-host}") String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        //发送确认机制
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory./confirm/iType.CORRELATED);
        //路由失败回调
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }
​
    @Bean(name = "serveRabbitTemplate")
    @Primary
    @ConditionalOnProperty(
        name = "message.type.serve-rabbitmq",
        havingValue = "true"
    )
    public RabbitTemplate serveRabbitTemplate(@Qualifier("serveConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate serverRabbitTemplate = new RabbitTemplate(connectionFactory);
        //消息路由失败通知监听者,而不是将消息丢弃
        serverRabbitTemplate.setMandatory(true);
        //确认消息是否到达MQ
        serverRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String correlationDataId = correlationData.getId();
            if (ack) {
                //ACK
                log.debug("消息[{}]投递成功,将DB中的消息状态设置为投递成功", correlationDataId);
                mqMessageMapper.update(null,
                    Wrappers.lambdaUpdate()
                        .set(MqMessage::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                        .eq(MqMessage::getId, correlationDataId)
                );
            } else {
                System.out.println("消息[{}]投递失败,cause:{}");
​
                log.debug("消息[{}]投递失败,cause:{}", correlationDataId, cause);
                //NACK,消息重发
                MqMessage message = mqMessageMapper.selectById(correlationDataId);
                if (message.getRetryTimes() < maxRetryTimes) {
                    //进行重试
                    serverRabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
                    //更新DB消息状态
                    mqMessageMapper.update(null,
                        Wrappers.lambdaUpdate()
                            .set(MqMessage::getStatus, MessageStatusEnum.SENDING.getStatus())
                            .set(MqMessage::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                            .set(MqMessage::getRetryTimes, message.getRetryTimes() + 1)
                            .eq(MqMessage::getId, correlationDataId)
                    );
                }
            }
        });
​
        //配置return监听处理,消息无法路由到queue触发,此处只打印了相关信息,具体逻辑需自己实现
        serverRabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("=============returnCallback触发。消息路由到queue失败===========");
            System.out.println("msg="+new String(message.getBody()));
            System.out.println("replyCode="+replyCode);
            System.out.println("replyText="+replyText);
            System.out.println("exchange="+exchange);
            System.out.println("routingKey="+routingKey);
        });
​
        return serverRabbitTemplate;
    }
​
​
}
​

进行消息发送业务编写

package org.springblade.modules.msgtest;
​
import lombok.AllArgsConstructor;
import org.springblade.core.launch.constant.AppConstant;
import org.springblade.core.tenant.annotation.NonDS;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
​
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
​

​
@NonDS
@ApiIgnore
@RestController
@AllArgsConstructor
@RequestMapping("/msg")
public class sengMsg {
​
​
    @Resource
    @Qualifier(value = "serveRabbitTemplate")
    public RabbitTemplate serveRabbitTemplate;
​
    @Resource
    private MqMessageMapper  mqMessageMapper;
​
    @Value("${spring.rabbitmq.retry-interval}")
    private Duration retryInterval;
​
​
    @Value("${spring.rabbitmq.exchange}")
    private String exchange;
​
    @PostMapping("/sendMsg")
    public R sendMsg(@RequestBody msgBody msgBody){
        return R.status(send(msgBody));
    }
​
    boolean send(msgBody msgBody){
        MqMessage mqMessage=MqMessage.builder()
            .msgData(msgBody.getContent())
            .exchangeName(exchange)
            .routingKey(msgBody.getRoutingKey())
            .status(MessageStatusEnum.SENDING.getStatus())
            //下次重试时间
            .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
            .retryTimes(0)
            .build();
        mqMessageMapper.insert(mqMessage);
        CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
        System.out.println("correlationData = " + JsonUtil.toJson(correlationData));
        //消息投递到MQ
        System.out.println("msgBody = " + JsonUtil.toJson(msgBody));
        System.out.println("msgBody = " + msgBody.getContent());
        System.out.println("msgBody = " + msgBody.getRoutingKey());
        serveRabbitTemplate.convertAndSend(exchange, msgBody.getRoutingKey(), msgBody.getContent(),correlationData);
        return true;
    }
}
​

消息手动确认,这样就实现了amqp协议的生产者和消费者,依据此可实现具体相关的rabbitmq业务。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673612.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号