一、声明队列交换机绑定关系
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfigTest {
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_KEY = "TEST.NONE" ;
@Bean(TEST_QUEUE)
public Queue testQueue() {
return new Queue(TEST_QUEUE);
}
@Bean(TEST_EXCHANGE )
public TopicExchange emailTodoTopicExchange() {
return new TopicExchange(TEST_EXCHANGE , true, false);
}
@Bean
public Binding bindingTestQueue(@Qualifier(TEST_QUEUE) Queue queue,
@Qualifier(TEST_EXCHANGE ) TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(TEST_KEY);
}
@Bean
public Jackson2JsonMessageConverter testConverter() {
return new Jackson2JsonMessageConverter();
}
}
二、创建消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class MqTest {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = "test.queue")
public void sendMail(ActivitiEventVO event, Channel channel, Message message) throws IOException {
// 获取消息体中的幂等性KEY
String businessKey = event.getBusinessKey();
// 创造redis缓存KEY,为了记录重试次数
String key = "TEST_QUEUE" + ":" + businessKey;
Long count = stringRedisTemplate.opsForValue().increment(key, 1);
try {
// todo 业务代码 切记出现异常需要抛出,否则不会重试,例如下方注释代码:
if ("5".equals(businessKey) || "2".equals(businessKey)) {
log.error("消费失败:{}", businessKey);
throw new RuntimeException();
}
} catch (Exception e) {
e.printStackTrace();
// 这里的10次与后续配置文件中的重试次数相同
if (count != null && count >= 10) {
log.error("重试次数达限删除");
// 重试次数超限处理
basicAck("追加提取流程邮件通知", event, key, channel, message.getMessageProperties().getDeliveryTag());
return;
} else {
throw new RuntimeException();
}
}
// 正常确认消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 删除redis中缓存的key
stringRedisTemplate.delete(key);
log.info("消费成功:{}", businessKey);
}
public void basicAck(String type, ActivitiEventVO event, String key, Channel channel, long var1) throws IOException {
try {
// 删除redis中缓存的key
stringRedisTemplate.delete(key);
if ("5".equals(event.getBusinessKey())) {
log.error("模拟确认失败:{}", event.getBusinessKey());
throw new RuntimeException();
}
// 确认消费消息
channel.basicAck(var1, false);
// todo 可视业务作出警报处理等,例如发邮件报警
} catch (Exception e) {
log.error("消息消费确认失败");
e.printStackTrace();
try {
//出现异常,不删除队列信息,信息重新放置在队列中
channel.basicNack(var1, false, true);
} catch (IOException ioException) {
log.error("重新放置在队列失败");
ioException.printStackTrace();
}
}
}
}
三、配置yml文件
spring:
rabbitmq:
addresses: 172.168.0.1:5672 #RabbitMq的服务器地址
username: root #RabbitMq的服务器的登录名
password: root #RabbitMq的服务器的登录密码
listener:
simple:
retry:
enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
max-attempts: 10 #最大重试次数
initial-interval: 10000 #重试间隔时间(单位毫秒)
max-interval: 30000 #重试最大时间间隔(单位毫秒)
multiplier: 2 #应用于上一次重试间隔的乘法器,例如上次间隔10秒,本次间隔20秒,但上限不能超过重试最大时间间隔。
acknowledge-mode: manual #手动确认消息
prefetch: 1 #消息预读数量 1表示每次从队列中读取一条消息