package org.jeecg.boot.starter.rabbitmq.config;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.boot.starter.rabbitmq.core.MapMessageConverter;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
// template.setMessageConverter(new MapMessageConverter());
template.setMessageConverter(new SerializerMessageConverter());
//使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
// template.setUsePublisherConnection(true);
// template.setConfirmCallback();
// template.setReturnCallback();
return template;
}
}
package org.jeecg.boot.starter.rabbitmq.constant;
public class MqConstant {
public final static String TEST_QUEUE = "test_queue";
public final static String TEST_EXCHANGE = "test_exchange";
public final static String ROUTERKEY = "test_queue";
public final static String TEST_QUEUE1 = "test_queue1";
public final static String ROUTERKEY1 = "test_queue1";
public final static String TEST_EXCHANGE1 = "test_exchange1";
public final static String DIRECT_QUEUE = "direct_queue";
public final static String DIRECT_EXCHANGE = "direct_exchange";
public final static String DIRECT_ROUTERKEY = "direct_routerKey";
public final static String DLX_EXCHANGE = "dlx_exchange";
public final static String DLX_QUEUE = "dlx_queue";
public final static String DLX_ROUTERKRY = "dlx_routerkey";
//topic 模式
public final static String TOPIC_EXCHANGE = "topic_exchange";
public final static String TOPIC_QUEUE = "topic_queue";
public final static String TPOIC_ROUTERKEY = "topic.#";
//fanout 模式
public final static String FANOUT_EXCHANGE = "fanout_exchange";
public final static String FANOUT_QUEUE = "fanout_queue";
//延迟队列
public static final Integer delay = 10000;
public static final String LAZY_EXCHANGE = "Lazy_Exchange";
public static final String LAZY_QUEUE = "Lazy_Queue";
public static final String LAZY_KEY = "lazy.#";
}
package org.jeecg.boot.starter.rabbitmq.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class CustomConfirmAndReturnCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode
+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
}
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
log.info("/confirm/i回调方法>>>回调消息ID为: " + correlationData.getId());
if (isSendSuccess) {
log.info("/confirm/i回调方法>>>消息发送到交换机成功!");
} else {
log.info("/confirm/i回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);
}
}
}
package org.jeecg.boot.starter.rabbitmq.core;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.util.StringUtils;
import java.io.Serializable;
public class MqMessageFactoryInit {
public static Message init(String msg, MessageDeliveryMode deliveryMode, String headerKey, String headerValue, String expiration, Integer delay, String contentType,
String correlationId, String messageId, Integer priority, String replyTo) {
MessageProperties properties = new MessageProperties();
if (null != deliveryMode) {
properties.setDeliveryMode(deliveryMode);
}
if (!StringUtils.isEmpty(headerKey) && !StringUtils.isEmpty(headerValue)) {
properties.setHeader(headerKey, headerValue);
}
if (!StringUtils.isEmpty(expiration)) {
properties.setExpiration(expiration);
}
if (null != delay) {
properties.setDelay(delay); //设置延迟的时间
}
if (!StringUtils.isEmpty(contentType)) {
properties.setContentType(contentType);
}
if (!StringUtils.isEmpty(correlationId)) {
properties.setCorrelationId(correlationId);
}
if (!StringUtils.isEmpty(messageId)) {
properties.setMessageId(messageId);
}
if (null != priority) {
properties.setPriority(priority);
}
if (!StringUtils.isEmpty(replyTo)) {
properties.setReplyTo(replyTo);
}
return new Message(msg.getBytes(), properties);
}
public static CorrelationData init(String id) {
return new CorrelationData(id);
}
}
package org.jeecg.boot.starter.rabbitmq.core;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
public class SharedVariableUtils {
//回调函数结果:Result管理仓库,每个发布都对应一个result,一个主线程,二个回调的子线程共享这个result,执行完后消除
private static HashMap resultMap = new HashMap<>();
//countDownLatch管理仓库,每个发布消息请求有两个回调:/confirm/iCallback和returnCallback,但第二个成功时就不会执行所以很难得到结果,所以这里并没有使用
//每个回调线程和主线程分别共享各自的CountDownLatch,这个主键可以由appId+tenantId+回调类型来区分。
private static HashMap countDownLatchMap = new HashMap<>();
//发布失败后执行次数的计数器,需要每个线程单独使用
private static ThreadLocal count = ThreadLocal.withInitial(() -> new Integer(0));
public static String confirmCall = "/confirm/i";
public static String returnCall = "return";
//==============================Result=====================================
private static void deleteResult(String resultName) {
resultMap.remove(resultName);
}
public static ResultVo getResult(String resultName) {
ResultVo result = resultMap.get(resultName);
deleteResult(resultName);
return result;
}
public static void setResult(String resultName, String key, String value) {
ResultVo result = new ResultVo();
result.put(key, value);
resultMap.put(resultName, result);
}
//========================================countDownLaunch=======================================================
public static void await(String countDownLatchName) {
CountDownLatch countDownLatch = countDownLatchMap.get(countDownLatchName);
if (countDownLatch == null) {
countDownLatch = new CountDownLatch(1);
countDownLatchMap.put(countDownLatchName, countDownLatch);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void deleteCountDownLatch(String countDownLatchName) {
countDownLatchMap.remove(countDownLatchName);
}
public static void countDown(String countDownLatchName) {
CountDownLatch countDownLatch = countDownLatchMap.get(countDownLatchName);
if (countDownLatch == null) {
countDownLatch = new CountDownLatch(1);
countDownLatchMap.put(countDownLatchName, countDownLatch);
}
countDownLatch.countDown();
}
// ============================================count值===============================================================
//获取count的值
public static int getCount() {
return count.get();
}
//count++
public static void countUp() {
count.set(count.get() + 1);
}
//设置count
public static void setCount(int i) {
count.set(0);
}
//remove
}
package org.jeecg.boot.starter.rabbitmq.exchange;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DeadDirectExchangeConfig {
@Bean
public Queue directQueue() {
Map agruments = new HashMap();
agruments.put("x-dead-letter-exchange", MqConstant.DLX_EXCHANGE);
// agruments.put("x-message-ttl", 5000);
// queue:queue的名称
// exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
// 1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
// 2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
// 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
// autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
return new Queue(MqConstant.DIRECT_QUEUE, true, false, false, agruments);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(MqConstant.DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(MqConstant.DIRECT_ROUTERKEY);
}
}
package org.jeecg.boot.starter.rabbitmq.exchange;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LazyExchangeConfig {
@Bean
public TopicExchange lazyExchange() {
TopicExchange exchange = new TopicExchange(MqConstant.LAZY_EXCHANGE, true, false);
exchange.setDelayed(true); //开启延迟队列
return exchange;
}
@Bean
public Queue lazyQueue() {
return new Queue(MqConstant.LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding() {
return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(MqConstant.LAZY_KEY);
}
}
package org.jeecg.boot.starter.rabbitmq.exchange;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TestDirectExchangeConfig {
@Bean
public Queue createQueue() {
//durable(耐用的)是为了防止宕机等异常而导致消息无法及时接收设计的。这个对queue无太多影响,但对topic影响比较大。
return new Queue(MqConstant.TEST_QUEUE, true);
}
@Bean
public DirectExchange createExchange() {
//autoDelete:true自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
return new DirectExchange(MqConstant.TEST_EXCHANGE, true, false);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(createQueue()).to(createExchange()).with(MqConstant.ROUTERKEY);
}
@Bean
public Queue createQueue1() {
//durable(耐用的)是为了防止宕机等异常而导致消息无法及时接收设计的。这个对queue无太多影响,但对topic影响比较大。
return new Queue(MqConstant.TEST_QUEUE1, true);
}
@Bean
public DirectExchange createExchange1() {
//autoDelete:true自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
return new DirectExchange(MqConstant.TEST_EXCHANGE1, true, false);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(createQueue1()).to(createExchange1()).with(MqConstant.ROUTERKEY1);
}
}
package org.jeecg.boot.starter.rabbitmq.exchange;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
@Bean
public Queue dlxQueue() {
return new Queue(MqConstant.DLX_QUEUE, true);
}
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange(MqConstant.DLX_EXCHANGE, true, false);
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
}
}
package org.jeecg.modules.pQuery.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.jeecg.modules.pQuery.sender.SendMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.Serializable;
@Api(tags = "消息队列重写测试")
@RestController
@RequestMapping("/mq")
@CrossOrigin
@Slf4j
public class RabbitMqTest {
@Autowired
private SendMqService sendMqService;
@Value("${spring.rabbitmq.publisher-/confirm/is}")
private String /confirm/is;
@ApiOperation(value = "test1")
@GetMapping(value = "/send")
public void send(String msg) {
log.info("============:"+/confirm/is);
sendMqService.send(msg, MqConstant.ROUTERKEY);
}
@ApiOperation(value = "死信队列")
@GetMapping(value = "/dlsSend")
public void dlsSend(String msg) {
sendMqService.dlsSend(msg, MqConstant.DIRECT_ROUTERKEY);
}
@ApiOperation(value = "一对多的模式")
@GetMapping(value = "/sendTopic")
public void sendTopic(String msg) {
sendMqService.sendTopic(msg, "topic.message");
}
@ApiOperation(value = "广播模式")
@GetMapping(value = "/fanoutSender")
public void fanoutSender() {
Order order = new Order(2,"你好,我是fanout队列");
sendMqService.fanoutSender(order);
}
@ApiOperation(value = "延迟队列")
@GetMapping(value = "/lazySender")
public void lazySender(String msg) {
sendMqService.lazySender(msg,MqConstant.LAZY_KEY);
}
@ApiOperation(value = "测试死性队列/事务回滚/重试机制")
@GetMapping(value = "/testDb")
public void testDb() {
sendMqService.testDb();
}
}
package org.jeecg.modules.pQuery.receive;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.boot.starter.rabbitmq.receive.baseReceive;
import org.jeecg.common.base.baseMap;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.websocket.DebugContext;
import org.jeecg.modules.pQuery.controller.RabbitMqTest;
import org.jeecg.modules.pQuery.mapper.PeOrderInfoMapper;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Map;
@Component
@Slf4j
public class ReceiveMqService {
@Autowired
PeOrderInfoMapper peOrderInfoMapper;
private String token = UserTokenContext.getToken();
@RabbitListener(queues = MqConstant.TEST_QUEUE)
public void customer(Message message, Channel channel) throws IOException {
UserTokenContext.setToken(token);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicQos(0, 1, false);
log.info("deliveryTag:{}", deliveryTag);
Integer num = (Integer) message.getMessageProperties().getHeaders().get("num");
if (num == 0) {
//不回队列一个个不消费
channel.basicNack(deliveryTag, false, false);
} else {
log.info("basicAck:{}", "一个个消费");
//一个个消费
channel.basicAck(deliveryTag, false);
}
}
@RabbitListener(queues = MqConstant.DIRECT_QUEUE)
public void customer2(Message message, Channel channel) throws IOException, InterruptedException {
UserTokenContext.setToken(token);
channel.basicQos(0, 1, false);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println(correlationId + "*******************");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(message.getBody()));
log.info("deliveryTag为:" + deliveryTag);
// channel.basicAck(deliveryTag, false);
channel.basicReject(deliveryTag, false);
}
@RabbitListener(queues = MqConstant.DLX_QUEUE)
public void dlxCustomer(Message message, Channel channel) throws IOException, InterruptedException {
UserTokenContext.setToken(token);
channel.basicQos(0, 1, false);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(message.getBody()));
Integer num = (Integer) message.getMessageProperties().getHeaders().get("num");
log.info("死信队列里面的deliveryTag为:" + deliveryTag);
log.info("私信队列准备ack消息了");
channel.basicAck(deliveryTag, false);
}
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = MqConstant.TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = "true", autoDelete = "false"),
value = @Queue(value = MqConstant.TOPIC_QUEUE, durable = "true"),
key = MqConstant.TPOIC_ROUTERKEY))
public void topicCustomer(Message message, Channel channel) throws IOException {
UserTokenContext.setToken(token);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println(deliveryTag + "*************");
channel.basicAck(deliveryTag, false);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MqConstant.FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT, durable = "true"),
value = @Queue(value = MqConstant.FANOUT_QUEUE, durable = "true")))
public void fanoutCustomer(@Payload Order order, @Headers Map map, Channel channel,Message message) throws IOException {
UserTokenContext.setToken(token);
System.out.println(order.getId());
Long delivery = (Long) map.get(AmqpHeaders.DELIVERY_TAG);
log.info("批次: {},序列号:{}", delivery,message.getMessageProperties().getDeliveryTag());
try {
System.out.println("消费成功");
channel.basicAck(delivery, false);
} catch (IOException e) {
System.out.println("消费失败");
channel.basicNack(
delivery, false, false
);
}
}
@RabbitListener(queues = MqConstant.LAZY_QUEUE)
public void lazyCustomer(Message message, Channel channel) {
UserTokenContext.setToken(token);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.err.println("body: " + new String(message.getBody()));
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException e1) {
log.info("失败");
}
}
}
@RabbitListener(queues = MqConstant.TEST_QUEUE1)
@Transactional
public void customer1(String msg, Channel channel, Message message) throws IOException {
UserTokenContext.setToken(token);
try {
// 这里模拟一个空指针异常,
log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag());
log.info("【Consumer01成功接收到消息】>>> {}", msg);
peOrderInfoMapper.setValues();
String string = null;
string.length();
// 确认收到消息,只确认当前消费者的一个消息收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//次重复消息投递
log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag());
// 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
throw e;
}
}
}
package org.jeecg.modules.pQuery.sender;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.callback.Custom/confirm/iAndReturnCallback;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.common.base.baseMap;
import org.jeecg.modules.pQuery.controller.RabbitMqTest;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.UUID;
@Component
@Slf4j
public class SendMqService extends CustomConfirmAndReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// public SendMqService(RabbitTemplate amqpTemplate) {
// amqpTemplate.setConfirmCallback(this::/confirm/i);
// amqpTemplate.setReturnCallback(this::returnedMessage);
// this.rabbitTemplate = amqpTemplate;
// }
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(this);
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
super.confirm(correlationData, ack, cause);
log.info("数据编号========correlationdata:{}======ack:{}", ack, correlationData);
if (!ack) {
log.info("异常处理...."+cause);
}
}
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
super.returnedMessage(message, replyCode, replyText, exchange, routingKey);
log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
exchange, routingKey, replyCode, replyText);
}
public void send(String msg, String routingKey) {
for (int i = 0; i <= 5; i++) {
baseMap map = new baseMap();
map.put("num", i);
// MessageProperties properties = new MessageProperties();
// properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// properties.setHeader("num", i);
// Message message = new Message(msg.getBytes(), map);
// amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map);
// amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map, message -> {
// return message;
// });
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
properties.setHeader("num",i);
Message message = new Message(msg.getBytes(), properties);
rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, message);
}
}
public void dlsSend(String msg, String directRouterkey) {
for (int i = 0; i <= 5; i++) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
properties.setHeader("num",i);
properties.setExpiration("5000");
Message message = new Message(msg.getBytes(), properties);
String a = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(String.valueOf(i));
rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE, MqConstant.DIRECT_ROUTERKEY, message,correlationData);
}
}
public void sendTopic(String msg, String routingKey) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE,routingKey,msg,correlationData);
}
public void fanoutSender(Order order) {
CorrelationData correlationData = new CorrelationData(order.getId().toString());
rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE,null,order,correlationData);
}
public void lazySender(String msg, String routingKey) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDelay(MqConstant.delay); //设置延迟的时间
//设置消息投递模式持久性和临时性,临时性重启会丢失或者没有接收者会丢失
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend(MqConstant.LAZY_EXCHANGE,routingKey,message);
}
public void testDb() {
for (int i = 0; i < 1; i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// log.info("【Producer】发送的消费ID = {}", correlationData.getId());
String msg = "hello confirm message" + i;
// log.info("【Producer】发送的消息 = {}", msg);
rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE1, MqConstant.ROUTERKEY1, msg, correlationData);
}
}
}
依赖
org.springframework.cloud
spring-cloud-starter-bus-amqp
配置
#rabbitmq配置
rabbitmq:
host: pe-boot-rabbitmq
username: admin
password: admin
port: 5672
virtual-host: /
connection-timeout: 15000
#开启/confirm/i模式
publisher-/confirm/is: true
#开启return模式,前提是下面的mandatory设置为true否则会删除消息
publisher-returns: true
#消费者端开启自动ack模式
template.mandatory: true
#新版本publisher-/confirm/is已经修改为publisher-/confirm/i-type,默认为NONE,CORRELATED值是发布消息成功到交换器会触发回调
publisher-/confirm/i-type: correlated
listener:
simple:
acknowledge-mode: manual
#并发消费者的最大值
max-concurrency: 5
#并发消费者的初始化值
concurrency: 1
#每个消费者每次监听时可拉取
prefetch: 1
# 重试机制
retry:
#是否开启消费者重试
enabled: true
#最大重试次数
max-attempts: 5
#重试间隔时间(单位毫秒)
initial-interval: 5000
#重试最大时间间隔(单位毫秒)
max-interval: 1200000
#间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2



