队列配置
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class xxxxxRabbitMqConfig {
// 交换机名
@Value("${xxxxxxxxxx}")
private String 交换机名;
// 队列名
@Value("${xxxxxxxxxx}")
private String 队列名;
// Routingkey
@Value("${xxxxxxxx}")
private String Routingkey;
// 声明 死信队列交换机
@Bean("exchange")(可以自定义名字)
public DirectExchange exchange() {
return new DirectExchange(channelOrgChangeExchange);
}
// 声明队列
@Bean("queue") (可以自定义名字)
public Queue queue() { // 导包: org.springframework.amqp.core
return new Queue(队列名字, true);
}
// 绑定交换机和队列
@Bean(绑定需要和前面定义名字关联)
public Binding queuebBindingX(@Qualifier("queue") Queue queue,
@Qualifier("exchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
}
消息发送
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
--------------------------------------------------------------
// Template used below to publish messages and to register the publisher-confirm callback.
@Autowired
private RabbitTemplate rabbitTemplate;
// Redis client used below to keep a copy of each outgoing message, keyed by message id,
// until the broker confirms it.
@Autowired
private RedisTemplate redisTemplate;
---------------------------------------------------------------
// Serialize the outgoing payload to a JSON string; replace the literal with your own
// populated message entity.
String sendMsg = JSON.toJSONString("发送消息实体");
// Give the message an explicit unique id. archiveMsg refuses an empty id, so do not rely
// on the no-arg CorrelationData constructor to generate one.
CorrelationData correlationData = new CorrelationData(java.util.UUID.randomUUID().toString());
// Register the publisher-confirm callback. The same callback instance is passed every
// time. NOTE(review): publisher confirms must also be enabled on the connection
// factory for the callback to fire - confirm against your configuration.
rabbitTemplate.setConfirmCallback(confirmCallback);
// Archive the message in Redis before sending (key = message id, value = message body)
// so it can be re-sent if the broker rejects it.
archiveMsg(correlationData.getId(), sendMsg);
// 交换机名字 and routingkey are placeholders for your exchange name and routing key.
rabbitTemplate.convertAndSend(交换机名字, routingkey, sendMsg, correlationData);
log.info("mq发送完成:消息[{}]", sendMsg);
/**
 * Publisher-confirm callback: on a positive confirm the archived copy of the message is
 * removed from Redis; on a negative confirm the message is handed to repeatMsg.
 */
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlation, acked, reason) -> {
    // Unique id of the published message; empty when no correlation data came back.
    String id = "";
    if (correlation != null) {
        id = correlation.getId();
    }
    if (!acked) {
        // Broker rejected the message: try to send it again.
        repeatMsg(id);
        return;
    }
    // Broker accepted the message: the archived copy is no longer needed.
    if (StringUtils.hasLength(id)) {
        redisTemplate.delete(id);
    }
};
/**
 * Stores an outgoing message in Redis so it can be re-sent if the broker rejects it.
 *
 * <p>The message is written as a plain string value because repeatMsg reads it back with
 * {@code opsForValue().get(id)}; writing it as a Redis set member would make that read
 * fail with a wrong-type error.
 *
 * @param id  unique message id, used as the Redis key; empty ids are logged and skipped
 * @param msg serialized message body
 */
private void archiveMsg(String id, String msg) {
    if (!StringUtils.hasLength(id)) {
        log.error("rabbitmq收到未知的空消息!");
        return;
    }
    log.info("消息存档,消息Id[{}]", id);
    redisTemplate.opsForValue().set(id, msg);
}
// 消息发送失败重新发送
private void repeatMsg(String id) {
if (com.alibaba.druid.util.StringUtils.isEmpty(id)) {
log.error("rabbitmq收到未知的空消息!");
return;
}
// redis中获取到失败的消息
String msg = redisTemplate.opsForValue().get(id).toString();
CorrelationData correlationData = new CorrelationData();
correlationData.setId(id);
// 绑定消息发送确认回调方法
rabbitTemplate.set/confirm/iCallback(/confirm/iCallback);
// 重新发送消息
rabbitTemplate.convertAndSend(交换机, routingkey, msg);
}
消息接收
```java
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名字", durable = "true"), exchange =
@Exchange(value = "交换机名字"), key = "key名字"))
public void 方法名(@Payload String msg, @Headers Map headers, Channel channel)
throws Exception {
String msgId = String.valueOf(headers.get("id"));
long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
log.info("接mq通知消息,msgId:{},消息:{},headers:{}", msgId, msg, headers);
try {
自定义接收实体 = JSON.parseObject(msg, 转换类型.class);
log.info("实体对象:{}", 自定义接收实体);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
return;
}
if (null != 接收实体) {
//接收到实体处理
} catch (Exception e) {
log.info("处理异常");
}
}
}