新建springboot项目,导入rabbitmq依赖
org.springframework.boot spring-boot-starter-amqp
修改配置文件
server: port: 8102 #服务端口 spring: rabbitmq: host: rabbitmqhost #rabbitmq地址 port: 15672 #rabbitmq amqp端口 username: root #rabbitmq用户名 password: root #rabbitmq密码 virtual-host: serve #rabbitmq虚拟主机 listener: type: simple simple: acknowledge-mode: MANUAL # MANUAL消息确认方式 手动确认 ,none 不确认 ,auto自动确认 retry: enabled: true #开启重试 max-attempts: 3 #最大重试次数 initial-interval: 5000ms #重试间隔时间
启动类增加@EnableRabbit
package com.example.demo;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableRabbit
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
新增监听类MsgListener
package com.example.demo;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MsgListen {
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(value = "blade-user-queue",durable = "ture"),
exchange = @Exchange(value = "serve-exchange"),key="blade-user")})
public void consumer(Channel channel, Message message)throws IOException{
System.out.println("收到消息"+message.getMessageProperties());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
消费者端进行绑定消息队列,非则生产者发送消息会产生路由消息失败问题
Produce生产者配置
由于生产者可能会遇到消息发送失败的情况,所以需要用数据库进行持久化,来保证消息的可靠性
#数据源配置 spring: rabbitmq: host: 36.103.242.17 port: 25672 username: bladex password: bladex virtual-host: serve retry-interval: 10s exchange: serve-exchange max-retry-times: 3 listener: type: simple simple: acknowledge-mode: MANUAL retry: enabled: true max-attempts: 3 initial-interval: 5000m datasource: url: #数据库地址 username: root #用户名 password: root #密码
创建数据库表名为mq_message具体sql如下:
DROP TABLE IF EXISTS `mq_message`; CREATE TABLE `mq_message` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `msg_data` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '业务数据', `exchange_name` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '交换机名称', `routing_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '路由键', `status` int(2) DEFAULT NULL COMMENT '消息状态', `retry_times` int(11) DEFAULT NULL COMMENT '重试次数', `next_retry_date_time` datetime DEFAULT NULL COMMENT '下一次重试时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1410059209962156035 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Mq可靠消息记录'; SET FOREIGN_KEY_CHECKS = 1;
对应实体类
package org.springblade.modules.msgtest;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("mq_message")
@ApiModel(value = "mq消息对象", description = "mq消息可靠记录表")
@Builder
public class MqMessage {
private static final long serialVersionUID = 1L;
@ApiModelProperty("主键id")
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
@TableField("msg_data")
private String msgData;
@TableField("exchange_name")
private String exchangeName;
@TableField("routing_key")
private String routingKey;
@TableField("status")
private int status;
@TableField("retry_times")
private int retryTimes;
@TableField("next_retry_date_time")
private LocalDateTime nextRetryDateTime;
}
进行rabbitmq工厂连接,并进行消息手动确认,当发送失败进行重试,并设置重试机制,当消息路由失败时进行控制台打印
package org.springblade.modules.msgtest;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@Configuration
@Slf4j
public class RabbitMqConfiguration {
@Value("${spring.rabbitmq.max-retry-times}")
private int maxRetryTimes;
@Value("${spring.rabbitmq.retry-interval}")
private Duration retryInterval;
@Resource
private MqMessageMapper mqMessageMapper;
@Bean(name = "serveConnectionFactory")
@Primary
public ConnectionFactory serveConnectionFactory(
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//发送确认机制
connectionFactory.setPublisherConfirmType(CachingConnectionFactory./confirm/iType.CORRELATED);
//路由失败回调
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name = "serveRabbitTemplate")
@Primary
@ConditionalOnProperty(
name = "message.type.serve-rabbitmq",
havingValue = "true"
)
public RabbitTemplate serveRabbitTemplate(@Qualifier("serveConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate serverRabbitTemplate = new RabbitTemplate(connectionFactory);
//消息路由失败通知监听者,而不是将消息丢弃
serverRabbitTemplate.setMandatory(true);
//确认消息是否到达MQ
serverRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String correlationDataId = correlationData.getId();
if (ack) {
//ACK
log.debug("消息[{}]投递成功,将DB中的消息状态设置为投递成功", correlationDataId);
mqMessageMapper.update(null,
Wrappers.lambdaUpdate()
.set(MqMessage::getStatus, MessageStatusEnum.SUCCESS.getStatus())
.eq(MqMessage::getId, correlationDataId)
);
} else {
System.out.println("消息[{}]投递失败,cause:{}");
log.debug("消息[{}]投递失败,cause:{}", correlationDataId, cause);
//NACK,消息重发
MqMessage message = mqMessageMapper.selectById(correlationDataId);
if (message.getRetryTimes() < maxRetryTimes) {
//进行重试
serverRabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
//更新DB消息状态
mqMessageMapper.update(null,
Wrappers.lambdaUpdate()
.set(MqMessage::getStatus, MessageStatusEnum.SENDING.getStatus())
.set(MqMessage::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
.set(MqMessage::getRetryTimes, message.getRetryTimes() + 1)
.eq(MqMessage::getId, correlationDataId)
);
}
}
});
//配置return监听处理,消息无法路由到queue触发,此处只打印了相关信息,具体逻辑需自己实现
serverRabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("=============returnCallback触发。消息路由到queue失败===========");
System.out.println("msg="+new String(message.getBody()));
System.out.println("replyCode="+replyCode);
System.out.println("replyText="+replyText);
System.out.println("exchange="+exchange);
System.out.println("routingKey="+routingKey);
});
return serverRabbitTemplate;
}
}
进行消息发送业务编写
package org.springblade.modules.msgtest;
import lombok.AllArgsConstructor;
import org.springblade.core.launch.constant.AppConstant;
import org.springblade.core.tenant.annotation.NonDS;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@NonDS
@ApiIgnore
@RestController
@AllArgsConstructor
@RequestMapping("/msg")
public class sengMsg {
@Resource
@Qualifier(value = "serveRabbitTemplate")
public RabbitTemplate serveRabbitTemplate;
@Resource
private MqMessageMapper mqMessageMapper;
@Value("${spring.rabbitmq.retry-interval}")
private Duration retryInterval;
@Value("${spring.rabbitmq.exchange}")
private String exchange;
@PostMapping("/sendMsg")
public R sendMsg(@RequestBody msgBody msgBody){
return R.status(send(msgBody));
}
boolean send(msgBody msgBody){
MqMessage mqMessage=MqMessage.builder()
.msgData(msgBody.getContent())
.exchangeName(exchange)
.routingKey(msgBody.getRoutingKey())
.status(MessageStatusEnum.SENDING.getStatus())
//下次重试时间
.nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
.retryTimes(0)
.build();
mqMessageMapper.insert(mqMessage);
CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
System.out.println("correlationData = " + JsonUtil.toJson(correlationData));
//消息投递到MQ
System.out.println("msgBody = " + JsonUtil.toJson(msgBody));
System.out.println("msgBody = " + msgBody.getContent());
System.out.println("msgBody = " + msgBody.getRoutingKey());
serveRabbitTemplate.convertAndSend(exchange, msgBody.getRoutingKey(), msgBody.getContent(),correlationData);
return true;
}
}
消息手动确认,这样就实现了amqp协议的生产者和消费者,依据此可实现具体相关的rabbitmq业务。



