项目中需要通过RabbitMQ发送业务消息由其他服务进行消费,即一个生产者对应多个消费者的场景,本文采用RabbitMQ的Topic模式实现。
二、Maven依赖三、yml配置org.springframework.boot spring-boot-starter-amqp ${spring-boot.version}
spring:
rabbitmq:
addresses: 127.0.0.1:5672
virtual-host: /
username: guest
password: guest
port: 5672
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
四、生产者
1.配置
@Configuration
public class RabbitmqConfig {
private static String EXCHANGE_NAME = "test.exchange";
public static String ROUTING_KEY = "test.routingKey";
@Bean
public TopicExchange tenantAccountExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
}
2.发送代码
@Slf4j
@Component
public class ProducerService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object msg) {
try {
// 添加消息id和时间戳
MessagePostProcessor messagePostProcessor = (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setTimestamp(new Date());
return message;
};
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, msg, messagePostProcessor);
} catch (AmqpException e) {
log.error(String.format("send message failed, exchange=%s, routingKey=%s, msg=%s", exchange, routingKey, JSON.toJSONString(msg)), e);
}
}
}
public void pushMessageToMQ(ApiDTO apiDTO) {
String msg = JSONObject.toJSONString(apiDTO);
log.info("pushMessageToMQ msg: {}", msg);
producerService.sendMessage(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, msg);
}
五、消费者
@Slf4j
@Component
public class RabbitmqListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "test.queue", durable = "true"),
exchange = @Exchange(value = "test.exchange", type = ExchangeTypes.TOPIC),
key = "test.routingKey"
))
public void receiveMessage(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("receiveMessage deliveryTag: {}, message:{}", deliveryTag, msg);
// TODO 业务处理
channel.basicAck(deliveryTag, false);
}
}
六、控制台查看绑定关系



