栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ项目实战——商户管理系统

RabbitMQ项目实战——商户管理系统

项目介绍

商户管理系统:

用于管理与公司合作的商户信息,包括商户准入和审核的全流程。有很多下游业务系统需要用到商户信息,每一个系统都会在自己的数据库中存放商户的关键信息。比如提单系统提单需要商户的名称;放宽系统放款需要商户的账号名;风控系统也要关注商信息的变动。

这种同步数据的场景,之前一直使用定时同步,也就是依赖一个核心的数据库,我们只需要修改核心数据的商户信息,其他系统定时去核心系统中拉取数据。

但是定时同步有两个缺点:首先实时性不高,因为定时任务不可能每分每秒都在运行;其次就是一旦核心数据库出现问题,其他所有的系统都无法同步,耦合性太高。

所以这种情况就可以改用MQ同步,因为考虑到还有其他系统也要用到商户信息,所以我们直接采用广播的方式。

该项目无非就是一个生产者消费者模型 每次商户信息变化后就发出通知消费即可。

整体流程:

生产者:

  1. 注入Template发送消息。 在任何需要发送MQ消息的地方注入Template,或者在单元测试类中注入生产者,调用send()方法。

消费者:

  1. 创建配置类,定义队列、交换机、绑定;
  2. 创建消费者,监听队列;
生产者 工程搭建 配置

在实际开发过程中,我们应该将交换机名称及队列名称放在配置文件中统一管理。

com.directexchange=DIRECT_EXCHANGE
com.topicexchange=TOPIC_EXCHANGE
com.fanoutexchange=FANOUT_EXCHANGE
com.directroutingkey=best
com.topicroutingkey1=chen
com.topicroutingkey2=nanjin

properties文件

server.port=9071
#spring.rabbitmq.host=127.0.0.1
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
spring.datasource.url=jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
propertiesspring.thymeleaf.cache=false
mybatis.typeAliasesPackage=com.example.producer.entity
mybatis.mapperLocations=classpath:mybatis/mapper
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

带着上面的问题,我们可以看一下service的实现类。

以更新商户信息为例。我们先更新数据库,然后调用注入的template发送消息。

@Override
public int update(Merchant merchant) {

    int k = merchantMapper.update(merchant);

    // 发送消息失败了???
    JSONObject title = new JSONObject();
    String jsonBody = JSONObject.toJSONString(merchant);
    title.put("type", "update");
    title.put("desc", "更新商户信息");
    title.put("content", jsonBody);
    amqpTemplate.convertAndSend(topicExchange, topicRoutingKey, title.toJSONString());

    return k;
}

这里的顺序千万要注意!一定是先更新数据库后再发送消息!

如果先发送消息的话,消费者接收到了消息,认为上游数据更新成功了,他接着进行其他业务;但是这个时候上有数据库更新失败了,所以就会导致数据库回滚的话造成数据一致性的问题。

消息可靠性传递

但是,如果先更新数据库,然后发送消息的时候失败了?比如服务器没有成功接受或者路由出现了问题,这个时候应该怎么解决呢?

这个时候就需要用到rabbitmq中的消息可靠性机制,该部分会在另外一篇文章详解。

消费者 工程搭建 配置

将队列信息采用配置文件的形式

com.directexchange=DIRECT_EXCHANGE
com.topicexchange=TOPIC_EXCHANGE
com.fanoutexchange=FANOUT_EXCHANGE

com.firstqueue=FIRST_QUEUE
com.secondqueue=SECOND_QUEUE
com.thirdqueue=THIRD_QUEUE
com.fourthqueue=FOURTH_QUEUE
server.port=9072
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.cache.channel.size=
消息队列绑定

然后需要进行交换机、队列的定义及两者间的绑定。

@Configuration
@PropertySource("classpath:mq.properties")
public class RabbitConfig {
    @Value("${com.firstqueue}")
    private String firstQueue;

    @Value("${com.secondqueue}")
    private String secondQueue;

    @Value("${com.thirdqueue}")
    private String thirdQueue;

    @Value("${com.fourthqueue}")
    private String fourthQueue;

    @Value("${com.directexchange}")
    private String directExchange;

    @Value("${com.topicexchange}")
    private String topicExchange;

    @Value("${com.fanoutexchange}")
    private String fanoutExchange;

    // 创建四个队列
    @Bean("FirstQueue")
    public Queue getFirstQueue() {
        return new Queue(firstQueue);
    }

    @Bean("SecondQueue")
    public Queue getSecondQueue() {
        return new Queue(secondQueue);
    }

    @Bean("ThirdQueue")
    public Queue getThirdQueue() {
        return new Queue(thirdQueue);
    }

    @Bean("FourthQueue")
    public Queue getFourthQueue() {
        return new Queue(fourthQueue);
    }

    // 创建三个交换机
    @Bean("DirectExchange")
    public DirectExchange getDirectExchange() {
        return new DirectExchange(directExchange);
    }

    @Bean("TopicExchange")
    public TopicExchange getTopicExchange() {
        return new TopicExchange(topicExchange);
    }

    @Bean("FanoutExchange")
    public FanoutExchange getFanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }

    // 定义四个绑定关系
    @Bean
    public Binding bindFirst(@Qualifier("FirstQueue") Queue queue, @Qualifier("DirectExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("best");
    }

    @Bean
    public Binding bindSecond(@Qualifier("SecondQueue") Queue queue, @Qualifier("TopicExchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#");
    }

    @Bean
    public Binding bindThird(@Qualifier("ThirdQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding bindFourth(@Qualifier("FourthQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setAutoStartup(true);
        return factory;
    }
}
消息监听
@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.secondqueue}", containerFactory = "rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent, Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
测试

http://127.0.0.1:9071/merchantList

新增、删除、修改信息、修改状态会发送MQ消息到TOPIC_EXCHANGE, 路由到SECOND_QUEUE。

  1. 先启动消费者consumer
  2. 在界面上修改商户信息,或者调用生产者的单元测试类发送消息。

修改商户信息后,可以看到消费者工程接收到相关信息。

项目地址

rabbitmq-demo地址

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651045.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号