栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习笔记03

RabbitMQ学习笔记03

RabbitMQ学习笔记03

01 RabbitMQ的使用场景

1 解耦、削峰、异步

同步异步的问题(串行)并行方式 异步线程池异步消息队列的方式 2 高内聚、低耦合 2 SpringBoot整合RabbitMQ实现fanout模式3 Springboot的Direct模式实现

01 RabbitMQ的使用场景 1 解耦、削峰、异步 同步异步的问题(串行)

串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

并行方式 异步线程池

并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

代码示例

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();
   // 相关发送
   relationMessage();
}
public void relationMessage(){
    // 异步
     theadpool.submit(new Callable{
         public Object call(){
             // 2: 发送短信服务  
             messageService.sendSMS("order");
         }
     })
    // 异步
     theadpool.submit(new Callable{
         public Object call(){
              // 3: 发送email服务
            emailService.sendEmail("order");
         }
     })
      // 异步
     theadpool.submit(new Callable{
         public Object call(){
             // 4: 发送短信服务
             appService.sendApp("order");
         }
     })
      // 异步
         theadpool.submit(new Callable{
         public Object call(){
             // 4: 发送短信服务
             appService.sendApp("order");
         }
     })
}
 

存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用

异步消息队列的方式


好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

2 高内聚、低耦合



04、分布式事务的可靠消费和可靠生产
05、索引、缓存、静态化处理的数据同步
06、流量监控
07、日志监控(ELK)
08、下单、订单分发、抢票

2 SpringBoot整合RabbitMQ实现fanout模式

使用springboot完成rabbitmq的消费模式-Fanout

新建SpringBoot项目


在application.yml中配置连接rabbitmq:

# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: XXXXXX
    port: 5672

生产者订单业务OrderService

package com.sl.rabbitmq.springbootorderrabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(int userId, int productId, int num) {
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功" + orderId);
        //3.通过MQ来完成消息的分发
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }

}

在配置类中完成交换机和队列的绑定关系

package com.sl.rabbitmq.springbootorderrabbitmqproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {

    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange", true, false);
    }
    //2.声明队列sms.fanout.queue, email.fanout.queue, duanxin.fanout.queue
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue", true);
    }
    //3.完成绑定关系(队列和交换机)
    @Bean
    public Binding smsBinding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding emailBinding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding duanxinBinding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }


}

测试

@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {

        orderService.makeOrder(1, 1, 12);

    }

}

结果:


绑定关系创建完成

消费者部分

新建一个module
同样配置application.yml

# 服务端口
server:
  port: 8081
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: XXXXXX
    port: 5672

创建三个消费者类

package com.sl.rabbitmq.springbootorderrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class SMSConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("sms fanout--接收到了订单信息:" + message);
    }
    
}

两个注解
@RabbitListener指定类或方法消费某交换机或队列的消息
@RabbitHandler为消息的落脚点,在这里会作为message参数传入注解的方法中

启动生产者模块
再次启动生产者测试分发消息
结果:

3 Springboot的Direct模式实现

配置绑定 DirectRabbiitMqConfiguration类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitMqConfiguration {

    //1.声明注册fanout模式的交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_order_exchange", true, false);
    }
    //2.声明队列sms.fanout.queue, email.fanout.queue, duanxin.fanout.queue
    @Bean
    public Queue directsmsQueue(){
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue directemailQueue(){
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue directduanxinQueue(){
        return new Queue("duanxin.direct.queue", true);
    }
    //3.完成绑定关系(队列和交换机)
    @Bean
    public Binding smsBinding(){
        return BindingBuilder.bind(directsmsQueue()).to(directExchange()).with("sms");
    }
    @Bean
    public Binding emailBinding(){
        return BindingBuilder.bind(directemailQueue()).to(directExchange()).with("email");
    }
    @Bean
    public Binding duanxinBinding(){
        return BindingBuilder.bind(directduanxinQueue()).to(directExchange()).with("duanxin");
    }

}


生产者方法

public void makeOrderDirect(int userId, int productId, int num) {
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功" + orderId);
        //3.通过MQ来完成消息的分发
        String exchangeName = "direct_order_exchange";
        //String routingKey = "";
        
        rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
        rabbitTemplate.convertAndSend(exchangeName, "duanxin", orderId);
    }

消费者

@Service
@RabbitListener(queues = {"duanxin.direct.queue"})
public class DirectDuanxinConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("duanxin direct--接收到了订单信息:" + message);
    }

}

运行消费者服务,运行生产者测试
结果:

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号