商户管理系统:
用于管理与公司合作的商户信息,包括商户准入和审核的全流程。有很多下游业务系统需要用到商户信息,每一个系统都会在自己的数据库中存放商户的关键信息。比如提单系统提单需要商户的名称;放宽系统放款需要商户的账号名;风控系统也要关注商信息的变动。
这种同步数据的场景,之前一直使用定时同步,也就是依赖一个核心的数据库,我们只需要修改核心数据的商户信息,其他系统定时去核心系统中拉取数据。
但是定时同步有两个缺点:首先实时性不高,因为定时任务不可能每分每秒都在运行;其次就是一旦核心数据库出现问题,其他所有的系统都无法同步,耦合性太高。
所以这种情况就可以改用MQ同步,因为考虑到还有其他系统也要用到商户信息,所以我们直接采用广播的方式。
该项目无非就是一个生产者消费者模型 每次商户信息变化后就发出通知消费即可。
整体流程:
生产者:
- 注入Template发送消息。 在任何需要发送MQ消息的地方注入Template,或者在单元测试类中注入生产者,调用send()方法。
消费者:
- 创建配置类,定义队列、交换机、绑定;
- 创建消费者,监听队列;
在实际开发过程中,我们应该将交换机名称及队列名称放在配置文件中统一管理。
com.directexchange=DIRECT_EXCHANGE com.topicexchange=TOPIC_EXCHANGE com.fanoutexchange=FANOUT_EXCHANGE com.directroutingkey=best com.topicroutingkey1=chen com.topicroutingkey2=nanjin
properties文件
server.port=9071
#spring.rabbitmq.host=127.0.0.1
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
spring.datasource.url=jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
propertiesspring.thymeleaf.cache=false
mybatis.typeAliasesPackage=com.example.producer.entity
mybatis.mapperLocations=classpath:mybatis/mapper
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
带着上面的问题,我们可以看一下service的实现类。
以更新商户信息为例。我们先更新数据库,然后调用注入的template发送消息。
@Override
public int update(Merchant merchant) {
int k = merchantMapper.update(merchant);
// 发送消息失败了???
JSONObject title = new JSONObject();
String jsonBody = JSONObject.toJSONString(merchant);
title.put("type", "update");
title.put("desc", "更新商户信息");
title.put("content", jsonBody);
amqpTemplate.convertAndSend(topicExchange, topicRoutingKey, title.toJSONString());
return k;
}
这里的顺序千万要注意!一定是先更新数据库后再发送消息!
如果先发送消息的话,消费者接收到了消息,认为上游数据更新成功了,他接着进行其他业务;但是这个时候上有数据库更新失败了,所以就会导致数据库回滚的话造成数据一致性的问题。
消息可靠性传递但是,如果先更新数据库,然后发送消息的时候失败了?比如服务器没有成功接受或者路由出现了问题,这个时候应该怎么解决呢?
这个时候就需要用到rabbitmq中的消息可靠性机制,该部分会在另外一篇文章详解。
消费者 工程搭建 配置将队列信息采用配置文件的形式
com.directexchange=DIRECT_EXCHANGE com.topicexchange=TOPIC_EXCHANGE com.fanoutexchange=FANOUT_EXCHANGE com.firstqueue=FIRST_QUEUE com.secondqueue=SECOND_QUEUE com.thirdqueue=THIRD_QUEUE com.fourthqueue=FOURTH_QUEUE
server.port=9072 spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual #spring.rabbitmq.cache.channel.size=消息队列绑定
然后需要进行交换机、队列的定义及两者间的绑定。
@Configuration
@PropertySource("classpath:mq.properties")
public class RabbitConfig {
@Value("${com.firstqueue}")
private String firstQueue;
@Value("${com.secondqueue}")
private String secondQueue;
@Value("${com.thirdqueue}")
private String thirdQueue;
@Value("${com.fourthqueue}")
private String fourthQueue;
@Value("${com.directexchange}")
private String directExchange;
@Value("${com.topicexchange}")
private String topicExchange;
@Value("${com.fanoutexchange}")
private String fanoutExchange;
// 创建四个队列
@Bean("FirstQueue")
public Queue getFirstQueue() {
return new Queue(firstQueue);
}
@Bean("SecondQueue")
public Queue getSecondQueue() {
return new Queue(secondQueue);
}
@Bean("ThirdQueue")
public Queue getThirdQueue() {
return new Queue(thirdQueue);
}
@Bean("FourthQueue")
public Queue getFourthQueue() {
return new Queue(fourthQueue);
}
// 创建三个交换机
@Bean("DirectExchange")
public DirectExchange getDirectExchange() {
return new DirectExchange(directExchange);
}
@Bean("TopicExchange")
public TopicExchange getTopicExchange() {
return new TopicExchange(topicExchange);
}
@Bean("FanoutExchange")
public FanoutExchange getFanoutExchange() {
return new FanoutExchange(fanoutExchange);
}
// 定义四个绑定关系
@Bean
public Binding bindFirst(@Qualifier("FirstQueue") Queue queue, @Qualifier("DirectExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("best");
}
@Bean
public Binding bindSecond(@Qualifier("SecondQueue") Queue queue, @Qualifier("TopicExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("#");
}
@Bean
public Binding bindThird(@Qualifier("ThirdQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding bindFourth(@Qualifier("FourthQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setAutoStartup(true);
return factory;
}
}
消息监听
@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.secondqueue}", containerFactory = "rabbitListenerContainerFactory")
public class SecondConsumer {
@RabbitHandler
public void process(String msgContent, Channel channel, Message message) throws IOException {
System.out.println("Second Queue received msg : " + msgContent);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
测试
http://127.0.0.1:9071/merchantList
新增、删除、修改信息、修改状态会发送MQ消息到TOPIC_EXCHANGE, 路由到SECOND_QUEUE。
- 先启动消费者consumer
- 在界面上修改商户信息,或者调用生产者的单元测试类发送消息。
修改商户信息后,可以看到消费者工程接收到相关信息。
项目地址rabbitmq-demo地址



