栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ(简介、概念、安装和springboot整合)

RabbitMQ(简介、概念、安装和springboot整合)

RabbitMQ(简介、概念、安装和springboot整合)

一、MQ简介

​ 在计算机科学中,消息队列((英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

1.1.实现

消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。目前,有很多消息队列有很多开源的实现,包括JBoss Messaging.JORAM、Apache ActiveMQ、Sun0pen Message Queue、IBM MQ、Apache Qpid和HTTPSQS。当前使用较多的消息队列有RabbitMQ、RocketNQ、ActiveMQ、Kafka、ZeroNQ、metaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。 1.2.特点

​ MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:

    AMQP,即DAdvanced MessageQueuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。JMS,Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,
    用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分都实现了JMSAPI,如
    ActiveMQ ,Redis以及 RabbitMQ等。
1.3.优缺点

优点

应用耦合、异步处理、流量削锋

解耦

传统模式:

传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式:

中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
。异步
传统模式:

传统模式的缺点:
—些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:

中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

削峰

传统模式:

传统模式的缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:

中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低、系统复杂性增加

1.4.使用场景

​ 消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候
​ 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

1.5.为什么使用RabbitMQ

​ AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

​ AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

​ RabbitMQ是一个开源的AMQP实现,服务器端用Erlangi语言编写,支持多种客户端,如: Python、
Ruby、 .NET,Java,JMS、C,PHP,Actionscript, XMPP,STONP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

总结如下:

基于AMQP协议高并发(是一个容量的概念,服务器可以接受的最大任务数量)。高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)。强大的社区支持,以及很多公司都在使用支持插件支持多语言 二、概念

RabbitMQ简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers(headers和direct交换器完全一致,但性能差很多,目前几乎用不到),不同类型的Exchange转发消息的策略有所区别。

Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

Connection:网络连接,比如一个TCP连接。

Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP 连接。

Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加

密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker:表示消息队列服务器实体

三、安装
    获取镜像
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
    运行镜像
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式三
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

	# 4369, 25672 (Erlang发现&集群端口) 
	# 5672, 5671 (AMQP端口) 15672 (web管理后台端口) 
	# 61613, 61614 (STOMP协议端口) 
	# 1883, 8883 (MQTT协议端口) 
	# https://www.rabbitmq.com/networking.html
    访问ui界面
http://localhost:15672/

四、SpringBoot整合RabbitMQ 4.1. 引入依赖

    org.springframework.boot
    spring-boot-starter-amqp

4.2. application.yml配置
spring:
  rabbitmq:
    host: 192.168.10.123
    port: 5672
    virtual-host: /
    #开启发送端确认
    #publisher-/confirm/is: true
    publisher-/confirm/i-type: correlated
    # 开启发送端消息抵达队列的确认
    publisher-returns: true
    template:
      #只要抵达队列,以异步发送优先调用我们这个return - /confirm/i
      mandatory: true
    listener:
      simple:
        #手动ack消息(手动确认消息是否消费)
        acknowledge-mode: manual
4.3. RabbitConfig配置类
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class MyRabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        
       
        //设置确认回调
        rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> {
            System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

4.4. RabbitMQConfig(容器中创建交换机、队列和绑定)
package com.lyh.mall.order.config;

import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;




@Configuration
public class MyRabbitMQConfig {

    
    
  
   
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    

    
    @Bean
    public Queue orderDelayQueue() {
        
        HashMap arguments = new HashMap<>();
        //死信路由
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        //死信路由键
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        //创建队列
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

        return queue;
    }

    
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }

    
    @Bean
    public Exchange orderEventExchange() {
        
        return new TopicExchange("order-event-exchange", true, false);

    }

    
    @Bean
    public Binding orderCreateBinding() {
        
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    
    @Bean
    public Binding orderReleaseBinding() {

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    
    @Bean
    public Binding orderReleaseOtherBinding() {

        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }


    
    @Bean
    public Queue orderSecKillOrrderQueue() {
        Queue queue = new Queue("order.seckill.order.queue", true, false, false);
        return queue;
    }

    @Bean
    public Binding orderSecKillOrrderQueueBinding() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // 			Map arguments
        Binding binding = new Binding(
                "order.seckill.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.seckill.order",
                null);

        return binding;
    }

}
4.5. 测试代码

AmqpAdmin:管理组件RabbitTemplate:消息发送处理组件@RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)Object content, Message message,Channel channel

import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;

@SpringBootTest
class MallOrderApplicationTests {

    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Test
    void sendMessige(){
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("你好!");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
        System.out.println("发送消息成功!!");
    }

    
    @Test
    void createExchange() {
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("创建成功");
    }

    
    @Test
    void createQueue() {
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        System.out.println("创建成功");
    }

    
    @Test
    void createBinding() {
        //String destination【目的地】,
        // DestinationType destinationType【目的地类型】,
        // String exchange【交换机】,
        // String routingKey【路由键】,
        //@Nullable Map arguments【自定义参数】
        Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        System.out.println("创建成功");
    }

}

//service层加监听注解获取 消息数据


     @RabbitListener(queues = {"hello-java-queue"})
    //这个类的这个方法才能接受hello-java-queue消息
    //@RabbitHandler  //类上加注解@RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {

        //拿到消息体
//        byte[] body = message.getBody();
        //拿到消息头
//        MessageProperties properties = message.getMessageProperties();

        System.out.println("接收到消息:" + content);

        //消息处理完 手动确认  deliveryTag在Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag->" + deliveryTag);

        try {
            if (deliveryTag % 2 == 0) {
                //确认签收 队列删除该消息 false非批量模式
                channel.basicAck(deliveryTag, false);
            } else {
                //拒收退货 第三个参数 -> true:重新入队 false:丢弃
                channel.basicNack(deliveryTag, false, true);
            }
        } catch (IOException e) {
            //网络中断
        }
    }

    //    @RabbitHandler
    //public void receiveMessage2(OrderEntity content) {

       // System.out.println("接收到消息:" + content);
    //}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/747337.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号