2.配置rabbitmqorg.springframework.boot spring-boot-starter-amqp
- 配置文件
rabbitmq:
host: 127.0.0.1
port: 5672
username: zytool
password: 123456
virtual-host: /
publisher-returns: true # 开启发送消息失败返回 对应RabbitTemplate.ReturnCallback接口
publisher-/confirm/i-type: correlated # publisher-/confirm/is和publisher-returns是对于消息生产端的配置
listener:
simple:
#并发数与最大并发数
concurrency: 2
max-concurrency: 2
#预取数
prefetch: 6
acknowledge-mode: manual #开启手动确认模式(针对于消息消费端)
- 配置交换机、队列、以及绑定
@Configuration
@Slf4j
public class RabbitMqConfig implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.set/confirm/iCallback(this); // 指定 /confirm/iCallback
rabbitTemplate.setReturnCallback(this); // 指定 ReturnCallback
}
@Bean
public DirectExchange myDirectExchange() {
// 参数意义:
// name: 名称
// durable: true
// autoDelete: 自动删除
return new DirectExchange(MqConstant.NEWSEXCHANGE, true, false);
}
@Bean
public Queue myDirectQueue() {
return new Queue(MqConstant.NEWSQUEUE, true);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(myDirectQueue())
.to(myDirectExchange())
.with(MqConstant.NEWSROUTINGKEY);
}
}
3. 实现消息发送确认
配置类需要实现RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功到达exchange!");
} else {
log.info("消息未能成功到达exchange!{}", correlationData.getReturnedMessage());
log.info("消息到达Exchange失败原因:{}", cause);
// 根据业务逻辑实现消息补偿机制。如记录到数据库或者日志里面,通过定时任务再去跑
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息报文:{}", new String(message.getBody()));
log.info("消息编号:{}", replyCode);
log.info("描述:{}", replyText);
log.info("交换机名称:{}", exchange);
log.info("路由名称:{}", routingKey);
// 根据业务逻辑实现消息补偿机制
}
4. 实现消息消费确认
这里我通过 NewsLister 类来监听队列
@Component
@Slf4j
public class NewsListener {
@Autowired
private NewsInfoService newsInfoService;
@RabbitHandler
@RabbitListener(queues = "newsDirectQueue")
public void getNews(Object rabbitObject, Message message, Channel channel) throws InterruptedException, IOException {
String msg = new String(message.getBody());
Boolean ret = false;
try {
Thread.sleep(1000);
NewsInfoVO newsInfoVO = JSONObject.parseObject(msg, NewsInfoVO.class);
long snowId = SnowflakeIdUtil.getSnowId();
newsInfoVO.setId(snowId + "");
newsInfoVO.setStatus(CommonConstant.NEWS_SUBMIT);
newsInfoVO.setCreateBy(666l);
newsInfoVO.setCreateTime(new Date());
newsInfoService.save(newsInfoVO);
ret = true;
} catch (Exception e) {
// redelivered = true, 表明该消息是重复处理消息
Boolean redelivered = message.getMessageProperties().getRedelivered();
try {
if (redelivered) {
// 消息已重复处理失败, 扔掉消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
log.error("消息[{}]重新处理失败,扔掉消息", msg); //可以记录到数据库
}
// redelivered != true,表明该消息是第一次消费
if (!redelivered) {
// 消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.error("消息[{}]处理失败,重新放回队列", msg);
}
} catch (Exception e1) {
e1.printStackTrace();
}
} finally {
if (ret) {
//消息确认Ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("消息消费正常!");
} else {
log.info("消息消费异常!");
}
}
}
}



