栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ使用场景及案例(七)

RabbitMQ使用场景及案例(七)

一、场景介绍

可用于解耦、削峰、异步

1.1 串行方式

串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();
    // 2: 发送短信服务
    messageService.sendSMS("order");//1-2 s
    // 3: 发送email服务
    emailService.sendEmail("order");//1-2 s
    // 4: 发送APP服务
    appService.sendApp("order");    
}

存在问题:
1:每完成一个订单,耗时时间长
2:若3、4步执行失败,造成事务回滚导致下单失败,得不偿失
3:非核心业务代码过多造成冗余

1.2 并行方式

并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();
   // 相关发送
   relationMessage();
}
public void relationMessage(){
    // 异步
     theadpool.submit(new Callable{
         public Object call(){
             // 2: 发送短信服务  
             messageService.sendSMS("order");
         }
     })
    // 异步
     theadpool.submit(new Callable{
         public Object call(){
              // 3: 发送email服务
            emailService.sendEmail("order");
         }
     })
      // 异步
     theadpool.submit(new Callable{
         public Object call(){
             // 4: 发送短信服务
             appService.sendApp("order");
         }
     })
      // 异步
         theadpool.submit(new Callable{
         public Object call(){
             // 4: 发送短信服务
             appService.sendApp("order");
         }
     })
}

 

存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用

1.3 异步消息队列的方式

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();   
    rabbitTemplate.convertSend("ex","2","消息内容");
}

好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

1.4 小结

使用消息中间件可以做到:
1、高内聚,低耦合

2、流量的削峰

3、分布式事务的可靠消费和可靠生产
4、索引、缓存、静态化处理的数据同步
5、流量监控
6、日志监控(ELK)
7、下单、订单分发、抢票

二、fanout模式案例演示 2.1 fanout模式图解

2.2 目标

2.3 生产者具体实现 2.3.1 创建生产者工程

2.3.2 pom.xml


    4.0.0

    org.example
    springboot-rabbitmq-fanout-producer
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            
                org.springframework.boot
                spring-boot-dependencies
                2.3.2.RELEASE
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
        
    

2.3.3 application.yml
# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.229.128
    port: 5672

2.3.4 定义订单的生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrderFanout(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "fanout_order_exchange";
        String routeKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
 
}

2.3.5 配置类-绑定关系

package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

    //创建队列
    @Bean
    public Queue fanoutEmailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue fanoutSmsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue fanoutWeixinQueue() {
        return new Queue("weixin.fanout.queue", true);
    }


    //创建交换机
    @Bean
    public FanoutExchange fanoutOrderExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }


    //绑定关系
    @Bean
    public Binding fanoutEmailBinding() {
        return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding fanoutSmsBinding() {
        return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding fanoutWeixinBinding() {
        return BindingBuilder.bind(fanoutWeixinQueue()).to(fanoutOrderExchange());
    }
}

2.3.5 创建生产者测试类
package com.test;

import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {

    @Autowired
    OrderService orderService;

    @Test
    public void contextLoads() throws Exception {
        orderService.makeOrderFanout("1", "1", 12);
    }
}

2.4 消费者具体实现 2.4.1 创建消费者工程

2.4.2 引入同样pom的依赖 2.4.3 application.yml
# 服务端口
server:
  port: 8081
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.229.128
    port: 5672

2.4.4 创建三个消费者

FanoutEmailService:

package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("email-------------->" + message);
    }
}

FanoutSMSService:

package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSMSService {
    @RabbitHandler
    public void messagerevice(String message){
        System.out.println("sms-------------->" + message);
    }
}

FanoutWeixinService:

package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"weixin.fanout.queue"})
@Component
public class FanoutWeixinService {

    @RabbitHandler
    public void messagerevice(String message){
        System.out.println("weixin-------------->" + message);
    }
}

2.4.5 消费者的主启动类
package com;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FanoutConsumer {

    public static void main(String[] args) {
        SpringApplication.run(FanoutConsumer.class, args);
    }
}

2.5 测试流程

1、先启动生产者(因为配置类定义在生产者一端,所以若先启动消费者会出现找不到队列的报错信息;解决方案:可以将配置类复制一份到消费者端)
2、启动消费者
3、查看消息是否成功消费

三、Direct模式案例演示 3.1 Direct模式图解

3.2 生产者具体实现

这里只介绍关键步骤,其他步骤参考fanout模式

3.2.1 创建Direct的配置类
package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    //创建队列
    @Bean
    public Queue directEmailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue directSmsQueue() {
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue directWeixinQueue() {
        return new Queue("weixin.direct.queue", true);
    }


    //创建交换机
    @Bean
    public DirectExchange directOrderExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }


    //绑定关系
    @Bean
    public Binding directEmailBinding() {
        return BindingBuilder.bind(directEmailQueue()).to(directOrderExchange()).with("email");
    }
    @Bean
    public Binding directSmsBinding() {
        return BindingBuilder.bind(directSmsQueue()).to(directOrderExchange()).with("sms");
    }
    @Bean
    public Binding directWeixinBinding() {
        return BindingBuilder.bind(directWeixinQueue()).to(directOrderExchange()).with("weixin");
    }
}

3.2.2 订单生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrderFanout(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "fanout_order_exchange";
        String routeKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }

    public void makeOrderDirect(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "direct_order_exchange";
        rabbitTemplate.convertAndSend(exchangeName, "email", orderNumer);
        rabbitTemplate.convertAndSend(exchangeName, "sms", orderNumer);
    }
 
}

3.2.3 生产者测试类
package com.test;

import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {

    @Autowired
    OrderService orderService;

    @Test
    public void contextLoads() throws Exception {
        orderService.makeOrderFanout("1", "1", 12);
    }

    @Test
    public void contextLoads2() throws Exception {
        orderService.makeOrderDirect("1", "1", 12);
    }

    
}

3.3 消费者具体实现 3.3.1 创建三个消费者

DirectEmailService:

package com.service.direct;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("email-------------->" + message);
    }
}

DirectSMSService:

package com.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"sms.direct.queue"})
@Component
public class DirectSMSService {
    @RabbitHandler
    public void messagerevice(String message){
        System.out.println("sms-------------->" + message);
    }
}

DirectWeixinService:

package com.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"weixin.direct.queue"})
@Component
public class DirectWeixinService {

    @RabbitHandler
    public void messagerevice(String message){
        System.out.println("weixin-------------->" + message);
    }
}

四、topic模式案例演示 4.1 topic模式图解

4.2 特别说明

这个说明下,前面两个案例都是采用配置类的方式实现交换机和队列的创建和绑定,因此特地用此案例来演示一下注解方式实现。

4.3 生产者具体实现

由于采用注解方式,不需要创建配置类,因此只需要创建订单生产者和测试类即可

4.3.1 订单生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrderFanout(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "fanout_order_exchange";
        String routeKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }

    public void makeOrderDirect(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "direct_order_exchange";
        rabbitTemplate.convertAndSend(exchangeName, "email", orderNumer);
        rabbitTemplate.convertAndSend(exchangeName, "sms", orderNumer);
    }

    public void makeOrderTopic(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "topic_order_exchange";
        //#0或多,*1或多
        //#.email.#
        //#.sms.*
        //weixin.#
        String routingKey = "sms.email";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
    }
}

4.3.2 生产者测试类
package com.test;

import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {

    @Autowired
    OrderService orderService;

    @Test
    public void contextLoads() throws Exception {
        orderService.makeOrderFanout("1", "1", 12);
    }

    @Test
    public void contextLoads2() throws Exception {
        orderService.makeOrderDirect("1", "1", 12);
    }

    @Test
    public void contextLoads3() throws Exception {
        orderService.makeOrderTopic("1", "1", 12);
    }
}

4.4 消费者具体实现 4.4.1 创建三个消费者

TopicEmailService:

package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "email.topic.queue",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.email.#"
))
@Component
public class TopicEmailService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("emailtopic-------------->" + message);
    }
}

TopicSMSService:

package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.sms.*"
))
@Component
public class TopicSMSService {
    // @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("smstopic-------------->" + message);
    }
}

TopicWeixinService:

package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "weixin.topic.queue",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "weixin.#"
))
@Component
public class TopicWeixinService {
    @RabbitHandler
    public void messagerevice(String message){
        System.out.println("weixintopic-------------->" + message);
    }
}

4.4.2 简单介绍消费者

由上述代码可以见,交换机和队列的创建和绑定都在消费者这一端实现了,通过注解的方式。

五、总结

1、交换机和队列的创建和绑定配置类,应该定义在消费者还是生产者那边?
2、使用配置类方式还是使用注解方式好?

本人意见:
1:可以在两端都添加好配置,生产者一端可随意,但消费者一端必须添加,因为大多数情况,我们都是先启动消费者的。但还是视情况而定。
2:推荐使用配置类方式,因为配置类方式逻辑比较清晰,代码内聚方便管理,而且后续对消息和队列设置TTL属性时也较为方便和直观。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号