本文主要介绍rabbitmq的引入、配置、发送和接收消息,暂不考虑消息的丢失和重复等问题
一、导入maven依赖,我使用的版本和parent的版本一致2.3.12.RELEASE
org.springframework.boot
spring-boot-starter-amqp
二、application.yml中RabbitMQ的基本配置信息,请先自行安装RabbitMQ
spring:
rabbitmq:
host: 101.200.xxx.xx
port: 5672
username: admin
password: admin
三、新建RabbitConfig类配置队列,交换器,绑定关系
常用的Exchange有以下三种,本文示例为DirectExchange
DirectExchange:固定RoutingKey,绑定时和发送消息时完全一致才能经exchange发送到queue
FanoutExchange:广播模式,不需要routingkey,发送到exchange下所有queue
TopicExchange:RoutingKey绑定时可用*和#模式,例如绑定使用*.order.#,那么发送时使用pay.order也能发送到绑定的queue中
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTestConfig {
public static final String MY_TEST_DIRE_QUEUE = "MyTestDireQueue";
public static final String MY_TEST_DIRECT_EXCHANGE = "MyTestDirectExchange";
public static final String MY_TEST_DIRECT_ROUTING = "MyTestDirectRouting";
@Bean
public Queue testDireQueue() {
return new Queue(MY_TEST_DIRE_QUEUE, true);
}
@Bean
DirectExchange testDirectExchange() {
return new DirectExchange(MY_TEST_DIRECT_EXCHANGE,true,false);
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(testDireQueue()).to(testDirectExchange()).with(MY_TEST_DIRECT_ROUTING);
}
}
四、分发消息
1、新建一个MessageTest
@Data
public class MessageTest implements Serializable {
private static final long serialVersionUID = -3496661997686932274L;
private String messageId;
private String content;
}
2、使用RabbitTemplate发送消息,注意发送消息的时机
例如再实际业务中,如果存在事务,应在事务提交后发送消息。1、以防消费方消费了该消息,生产方之后异常事务回滚导致数据不一致,2、或者生产方事务未提交,消费方收到消息后去数据库查询该数据,该数据不存在等问题。
@RestController
@RequestMapping("message/test")
@Slf4j
public class SendMessageTestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sendTest(@RequestParam String message) {
String messageId = String.valueOf(UUID.randomUUID());
MessageTest myMessage = new MessageTest();
myMessage.setMessageId(messageId);
myMessage.setContent(message);
rabbitTemplate.convertAndSend(RabbitTestConfig.MY_TEST_DIRECT_EXCHANGE, RabbitTestConfig.MY_TEST_DIRECT_ROUTING, myMessage);
return "成功";
}
}
五、接收消息,注意接收消息方法不要抛出异常,异常会导致消息不能正常消费,从新放入到队列死循环
import com.email.server.config.RabbitTestConfig;
import com.email.server.dto.MessageTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ConsumeService {
@RabbitListener(queues = RabbitTestConfig.MY_TEST_DIRE_QUEUE)
public void receive(MessageTest data) {
try {
log.info("TestDireQueue receive message================={}",data);
} catch (Exception ex) {
log.error("err",ex);
}
}
}
后续再记录/confirm/iCallback,ReturnCallback,和手动ack确认消息



