栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ消息中间件技术精讲(二)

RabbitMQ消息中间件技术精讲(二)

文章目录

第二章 `RabbitMQ`核心概念

2.1-大厂使用`RabbitMQ`2.2-`RabbitMQ`高性能原因2.3-`AMQP`协议2.4-`AMQP`核心概念2.5-RabbitMQ整体架构与消息流转2.6-`RabbitMQ`环境安装2.7-命令行与管控台结合讲解2.8-`SpringBoot`和`RabbitMQ`简单应用2.9-交换机讲解2.10-绑定、队列、消息、虚拟主机详解

第二章 RabbitMQ核心概念 2.1-大厂使用RabbitMQ
    RabbitMQ介绍

    是一个开源的消息代理和队列服务器,用来在不同应用直接共享数据,实现跨语言问题试用Erlang语言编写基于AMQP协议 滴滴,美团,头条,去哪儿

    开源,性能优秀,稳定性保障提供可靠性消息投递模式(/confirm/i模式),返回模式(return)与SpringAMQP完美整合,API丰富集群模式丰富,表达式配置,HA模式(高可用模式),镜像队列模型保证数据不丢失的前提下做到高可靠性、可用性

2.2-RabbitMQ高性能原因
    Erlang语言用于交换机领域,Erlang优点:Erlang有着和原生Socket一样的延迟
2.3-AMQP协议

    AMQP - advanced message queuing protocol 高级消息队列协议 pk JMS java message service

    AMQP 是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

    AMQP协议模式(->server->virtual host->exchange)3次主要的关联

2.4-AMQP核心概念
    Server:又称Broker,接受客户端的连接,实现AMQP实体服务Connection:连接Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务。类似登录的session。Message:消息,服务器与应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟投递等高级特性,Body则是消息体内容。Virtual Host:虚拟地址,用于进行逻辑隔离(例如redis 中有16个db,db0-15,比如内存16G,不是说每个db分配1G,db0也是可以存储16个G的,这就是逻辑隔离,不是物理隔离),最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有重名的Exchange和QueueExchange:交换机,接收消息,根据路由键转发消息到指定绑定的队列上Binding:Exchange与Queue之间的虚拟连接,binding可以包含routing keyRouting key : 一个路由规则,虚拟机可以用它来确定如何路由一个特定消息Queue:消息队列,保存消息并将他们转发到消费者
2.5-RabbitMQ整体架构与消息流转

    整体架构(生产者关注交换机,消费者监听队列,双方解耦)

2.6-RabbitMQ环境安装
    安装Erlang安装RabbitMQ(推荐直接用Docker安装)另起文章说明
# 第一步:查看仓库里的RabbitMQ
docker search rabbitmq
# 第二步:安装
docker pull rabbitmq:3.8
# 第三步:启动
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.8
# 第四步:安装插件
docker ps 
docker exec -it 镜像ID /bin/bash
rabbitmq-plugins enable rabbitmq_management
# 第五步:访问地址
http://ip地址:15672,这里的用户名和密码默认都是guest
2.7-命令行与管控台结合讲解

    命令行工具

    # 想看什么就List
    	 rabbitmqctl list_exchanges
    	 rabbitmqctl list_queues
         rabbitmqctl list_users
    # 想加什么就add
        rabbitmqctl add_user username password
    # 想清空什么就purge
    	 rabbitmqctl purge_queue
    # 想删除什么就delete
    	 rabbitmqctl delete_queue
    # 想知道就  --help
        
    rabbitmqctl reset 移除所有数据,必须先停止RabbitMQ
    rabbitmqctl join_cluster  [--ram] 组成集群命令
    rabbitmqctl cluster_status 查看集群状态
    rabbitmqctl change_cluster_node_type disc|ram  修改集群节点的存储方式,磁盘或内存
    rabbitmqctl forget_cluster_node [--offline] 忘记节点(摘除节点)
     
    

    管控台

    overview总览connections 连接channels 信道exchanges 交换机

    type 一栏,4种方式,direct直连模式 fanout广播模式 topic 路由模式 headers 数据头模式features(特性)一栏,D(durable)持久化,即使停止了,在开启也会有数据 I(internal)内部的 queues 队列admin 用户管理

2.8-SpringBoot和RabbitMQ简单应用
    ConnectionFactory 获取连接工厂Connection 新建连接Channel 获取数据信道,可发送和接收数据Producer 和Consumer 生产者和消费者

生产者

public class Producer {
private static final String EXCHANGE_NAME = "exchange_01";
private static final String ROUTING_KEY = "routingkey_01";
private static final String QUEUE_NAME = "queue_01";
private static final String IP_ADDRESS = "127.0.0.1";
private static final String USER = "admin";
private static final String PASSWORD = "admin";
private static final int PORT = 5672;


public static void main(String[] args) throws IOException, TimeoutException {
    //1-创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(IP_ADDRESS);
    connectionFactory.setPort(PORT);
    connectionFactory.setUsername(USER);
    connectionFactory.setPassword(PASSWORD);
    //2-创建连接Connection
    Connection connection = connectionFactory.newConnection();
    //3-创建信道Channel
    Channel channel = connection.createChannel();
    //4-创建交换机Exchange-交换机名称,type,是否持久化,是否自动删除,没有其他参数
    channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
    //5-创建队列Queue-队列名称,是否持久化,是否排他(保证顺序消费),是否自动删除,没有其他参数
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    //6-队列与交换机进行绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
    String msg = "hello,rabbitmq";
    //7-发送数据
    //basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    // 如果没有指定交换机,则会默认使用AMQP default这个交换机,同时也会默认认为队列名称 == routing key 
    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_BASIC,msg.getBytes());
    //8-关闭资源
    channel.close();
    connection.close();
}
}

消费者

public class Consumer {
    private static final String QUEUE_NAME = "queue_01";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final String USER = "admin";
    private static final String PASSWORD = "admin";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        //1-创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USER);
        connectionFactory.setPassword(PASSWORD);
        //2-创建连接Connection
        Connection connection = connectionFactory.newConnection();
        //3-创建信道Channel
        Channel channel = connection.createChannel();
        //4-创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("msg-"+new String(body));
            }
        };
        //5-消费数据-队列名称,设置是否自动签收,消费者
        channel.basicConsume(QUEUE_NAME,true,consumer);
        //6-关闭资源
        channel.close();
        connection.close();
    }
}
2.9-交换机讲解

    Exchange:接收消息,并根据路由键转发消息 到所绑定的队列

    交换机属性

    Name 交换机名称Type 交换机类型 direct-topic-fanout-headersDurability 是否持久化Auto delete 当最后一个绑定到Exchange上的队列删除后,自动删除该ExchangeInternal 当前Exchange是否用于RabbitMQ内部使用,默认false,基本不会修改这个属性值Arguments 扩展参数

    交换机类型

    Direct 直连交换机

    所有发送到Direct Exchange的消息,都会被转发到RoutingKey中指定的QueueDriect 模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RoutingKey必须完整匹配才会被队列接收,否则该消息会被抛弃。

    Topic 主题交换机,路由规则交换机

    所有发送到Topic Exchange 的消息被转发到所有关心RoutingKey中指定Topic的Queue上

    Exchange 将RoutingKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic

    # 匹配一个或多个词 比如: log.# 能够匹配log.info.a 或 log.message* 只匹配一个词 比如:log.* 能够匹配 log.info

Fanout 扇形交换机,播发交换机

不处理路由键(RoutingKey),只需要简单的将队列绑定到交换机上发送到交换机的消息,都会被转发到与该交换机绑定的所有队列上转发消息是最快的 2.10-绑定、队列、消息、虚拟主机详解

    Binding 绑定

    Exchange和Exchange、Queue之间的连接关系Binding中可以包含RoutingKey或其他参数

    Queue 队列

    实际存储消息数据Durablity是否持久化,Durable(持久化):是,Transient:否(瞬时状态)Auto delete 如果是yes,代表当最后一个监听被移除后,该Queue会自动删除

    Message 消息

    本质就是一段数据,由Properties和Payload(Body体)组成

    常用属性:delivery mode(配送模式)、headers(自定义属性)

    其他属性:content_type、content_encoding、priority(优先级)

    correlation_id(消息唯一id,相关ack、幂等时使用)、reply_to(做重回队列时使用,比如消息消费失败了回到指定队列)、expiration(过期时间)、message_id(消息id)

    //设置message参数
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) //配送模式2,表示持久化,即使rabbitmq重启数据也不会丢失
        .contentEncoding("UTF-8")
        .expiration("10000") //设置过期时间
        .build();
    
    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,properties,msg.getBytes());
    

    Virtual host虚拟主机

    虚拟地址,用于进行逻辑隔离,最上层的消息路由,类似Redis16个db,这里要深入理解下一个虚拟主机中,可以有若干个Exchange和Queue同一个虚拟主机中,不可以有重名的Exchange和Queue

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745698.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号