注意:因为是写好了业务,后面总结记录的,所以直接贴代码,抹掉业务信息,对新手来说有一定的参考意义,如有错误,请见谅,知错不改哈哈哈哈哈哈哈哈哈哈。
一、自行百度,安装好rabbit所需要的环境,官网地址Messaging that just works — RabbitMQ
二、安装好环境后,引入依赖进项目。
三、创建消息队列配置,主要用于配置交换机、队列的绑定关系。
@Configuration
public class RabbitMqConfig {
@Bean
CustomExchange testPluginDirectExchange() {
HashMap args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(TestQueueEnum.QUEUE_TEST_PLUGIN.getExchange(), "x-delayed-message", true, false, args); // durable=true交换机持久化
}
@Bean
public Queue testPluginQueue() { // durable=true队列持久化
return new Queue(TestQueueEnum.QUEUE_TEST_PLUGIN.getQueue(),true,false,false);
}
@Bean
public Binding testPluginBinding(Queue testPluginQueue, CustomExchange testPluginDirectExchange) {
return BindingBuilder.bind(testPluginQueue)
.to(testPluginDirectExchange)
.with(TestQueueEnum.QUEUE_TEST_PLUGIN.getRoutingKey())
.noargs();
}
}
四、创建枚举配置类
**
* 消息队列枚举配置类
*
* @since: 2021/10/6 16:27
**/
@Getter
public enum TestQueueEnum {
QUEUE_TEST_PLUGIN("test.direct", "test.test", "test.test");
private String exchange;
private String queue;
private String routingKey;
TestQueueEnum(String exchange, String queue, String routingKey) {
this.exchange = exchange;
this.queue = queue;
this.routingKey = routingKey;
}
}
五、创建生产者
@Slf4j
@Component
public class TestProducer {
@Autowired
AmqpTemplate amqpTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setMandatory(true);
}
public void sendTestMessage(TestBO testBO) {
// 这里设置消息正确生产并到达交换机中,回调确认时的唯一标识,用来进行消息发送确认
CorrelationData correlationData = new CorrelationData();
correlationData.setId(cloudContractURLReplaceBO.getFileId());
//给延时插件队列发送消息
rabbitTemplate.convertAndSend(TestQueueEnum.QUEUE_TEST_PLUGIN.getExchange(), TestQueueEnum.QUEUE_TEST_PLUGIN.getRoutingKey(), testBO, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
message.getMessageProperties().setHeader("x-delay", testBO.getDelayTimes());
message.getMessageProperties().setCorrelationId(testBO.getId());
return message;
}
}, correlationData);
}
}
六、创建消费者
@Slf4j
@Component
@RabbitListener(queues = "test.test", containerFactory = "rabbitListenerContainerFactory")
public class TestConsumer {
@RabbitHandler
public void handle(TestBO testBO, Channel channel, @Headers Map map) throws Exception{
if (map.get("error")!= null){
System.out.println("错误的消息");
try {
channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否认消息
return;
} catch (IOException e) {
e.printStackTrace();
}
}
try {
//这里写你的业务逻辑巴拉巴拉巴拉巴拉巴拉巴巴
// TODO
// 消息消费后确认
channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);
} catch (IOException e) {
try {
channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true); //否认消息
log.info("否认消息");
return;
} catch (IOException ee) {
log.error("否认消息异常");
e.printStackTrace();
}
throw new Exception("错误信息:" + e);
}
}
}
七、消息回调(确认是否发送成功)
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) { // 发送成功!
log.info("消息唯一标识:"+correlationData.getId());
// log.info("发送确认结果:"+ack);
} else {
// log.error("消息唯一标识:"+correlationData.getId());
// log.error("发送确认结果:"+ack);
// log.error("失败原因:"+cause);
//这里写发布失败后的处理逻辑,这里暂时不处理。
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息无法被路由,被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}


