栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习笔记 02

RabbitMQ学习笔记 02

RabbitMQ学习笔记 02

01 安装RabbitMQ

Docker安装RabbitMQ 02 RabbitMQ的用户角色分类及权限

nonemanagement:查看自己相关节点信息PolicymakerMonitoringAdministrator 03 入门案例:简单模式的消息队列

1 构建一个Maven工程 jdk1.82 引入依赖3. 定义生产者定义消费者 04 什么是AMQP

AMQP生产者流转过程AMQP消费者流转过程 05 RabbitMQ的核心组成部分

核心组成部分整体架构运行流程 发布订阅Fanout模式(Web操作)Routing(direct)路由模式(web演示)Topic主题模式(Web演示)headers模式,参数模式 06 fanout发布订阅模式实现

生产者 07 direct(路由)模式实现08 Topic(主题)模式实现09 声明创建绑定实现10 Work轮询模式理解11、Work公平模式理解

01 安装RabbitMQ Docker安装RabbitMQ

官方网站

1:https://www.rabbitmq.com/download.html
2:https://registry.hub.docker.com/_/rabbitmq/

直接执行运行命令并设置账号和密码

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

注:需要服务器安全组开放这几个端口
运行RabbitMQ:

docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

Web端登陆
网址:服务器ip:15672

02 RabbitMQ的用户角色分类及权限 none

不能访问management plugin

management:查看自己相关节点信息

列出自己可以通过AMQP登入的虚拟机查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息查看和关闭自己的channels和connections查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。 Policymaker

包含management所有权限查看和创建和删除自己的virtual hosts所属的policies和parameters信息。 Monitoring

包含management所有权限罗列出所有的virtual hosts,包括不能登录的virtual hosts。查看其他用户的connections和channels信息查看其他用户的connections和channels信息查看其他用户的connections和channels信息 Administrator

最高权限可以创建和删除virtual hosts可以查看,创建和删除users查看创建permisssions关闭所有用户的connections

Administrator可以增加用户:

03 入门案例:简单模式的消息队列

官网手册
官网列举的几种消息分发机制:

1 构建一个Maven工程 jdk1.8 2 引入依赖
 
        
            com.rabbitmq
            amqp-client
            5.10.0
        

        
            org.springframework.amqp
            spring-amqp
            2.2.5.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.2.5.RELEASE
        

        
            com.smarthito
            spring-boot-starter-amqp-rmq
            1.0.1
        
    
3. 定义生产者
package com.sl.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        // ip 和 端口
        //1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建链接Connection
            connection = connectionFactory.newConnection("生产者");
            //3.通过连接获取通道Channel
            channel = connection.createChannel();
            //4.通过通道创建交换机、声明队列、绑定关系、路由key、发送消息和接收消息
            String queueName = "queue1";
            
            channel.queueDeclare(queueName, false, false, false, null);
            //5.准备消息内容
            String message = "Hello, World!";
            //6.发送消息给队列
            channel.basicPublish("", queueName, null, message.getBytes());

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if(channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

    }

}
 

开启RabbitMQ服务

测试运行生产者,连接成功后可以在RabbitMQ的Web端看到connection:

然后可以看到建立的通道和队列:



Overview界面:

可以对队列的消息进行的预览和测试如下:

手动创建持久化队列:

Web控制台还可以做:

定义消费者
package com.sl.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) {
        // ip 和 端口
        //1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建链接Connection
            connection = connectionFactory.newConnection("生产者");
            //3.通过连接获取通道Channel
            channel = connection.createChannel();
            //4.通过通道创建交换机、声明队列、绑定关系、路由key、发送消息和接收消息
            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收消息失败");
                }
            });

            System.out.println("开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if(channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

}

运行消费者主程序:
控制台打印:

队列中的消息已被消费:

此时结束消费者进程,非持久化队列会被自动移除:

注:非持久化队列的数据也会存盘,但随着服务的重启会丢失

04 什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。

AMQP生产者流转过程

为什么RabbitMQ是基于通道进行处理,而非连接?
长连接—channel

AMQP消费者流转过程

05 RabbitMQ的核心组成部分 核心组成部分


Exchange是交换机,在生产者发送消息给队列时调用的Channel.basicPublish()方法第一个参数就是指定交换机。而在简单模式中exchange参数赋给空字符串""的值,那么可以存在没有交换机的队列吗?
答案是没有指定交换机的时候会存在一个默认的交换机。

核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

整体架构

运行流程

发布订阅Fanout模式(Web操作)

创建交换机

交换机绑定队列

交换机发送消息

结果是三个队列都能收到消息

也就是在发布与订阅模式中,所有订阅者都能收到发布者的消息

Routing(direct)路由模式(web演示)


在发布订阅模式的基础上多了路由,根据Routing key做选择

绑定队列时需要指定队列的Routing key

发消息指定Routing key

结果是只有queue1和queue2收到了消息

Topic主题模式(Web演示)

在路由模式的基础上又增加了模糊匹配

Routing key中的#代表的是0级或多级(每级以 . 分割)
对queue1来说就是发布的消息的key只要以com开头,后面有多少级都无所谓,queue1都会收到消息
Routing key中的*可以匹配一级

比如指定com.course.order的key发送消息:

发现queue1、queue2、queue3都能收到这个消息。

headers模式,参数模式

可以根据携带的参数进行过滤

06 fanout发布订阅模式实现

生产者

package com.sl.rabbitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {
        // ip 和 端口
        //1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建链接Connection
            connection = connectionFactory.newConnection("生产者");
            //3.通过连接获取通道Channel
            channel = connection.createChannel();
            //4.准备发送消息的内容
            String message = "你好,fanout";
            String  exchangeName = "fanout-exchange";
            String routingKey = "";
            // 5: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
            System.out.println("发送消息成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if(channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

    }

}

定义消费者

package com.sl.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static Runnable runnable = () -> {
        // ip 和 端口
        //1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //获取队列名称
        final String queueName = Thread.currentThread().getName();

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建链接Connection
            connection = connectionFactory.newConnection("生产者");
            //3.通过连接获取通道Channel
            channel = connection.createChannel();

            //4.定义接收消息的回调
            Channel finalChannel = channel;

            channel.basicConsume(queueName, true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + "收到消息是" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收消息失败");
                }
            });

            System.out.println("开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if(channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }

}

这里用三个消费的线程去消费三个队列的消息

注:在代码中队列都没有声明的原因是之前在Web端声明和绑定过了

结果:

订阅者都收到了消息,并且之前队列中的消息都被消费完:

07 direct(路由)模式实现


修改交换机名字:

队列路由key:

指定Routing key发送消息

 String routingKey = "email";


结果是Routing key为email的队列收到消息

08 Topic(主题)模式实现


交换机:

同样只在direct模式的基础上修改交换机名称和Routing key即可

			//4.准备发送消息的内容
            String message = "你好,topic";
            String  exchangeName = "topic_exchange";
            String routingKey = "com.order.test.xxx";

结果:

09 声明创建绑定实现

创建交换机:

//创建交换机
            String exchangeName = "direct_message_exchange";
            String exchangeType = "direct";
            
            channel.exchangeDeclare(exchangeName, exchangeType, true);

声明队列:

            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            channel.queueDeclare("queue7", true, false, false, null);

绑定队列和交换机:

//绑定队列和交换机的关系
            
            channel.queueBind("queue5", exchangeName, "order");
            channel.queueBind("queue6", exchangeName, "order");
            channel.queueBind("queue7", exchangeName, "course");

发送消息

 channel.basicPublish(exchangeName, "order", null, message.getBytes());

结果:

队列5、6收到消息

10 Work轮询模式理解


当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配。

Work模式 - 轮询模式(Round-Robin)
特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成。

生产者

package com.sl.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            for (int i = 1; i <= 20; i++) {
                //消息的内容
                String msg = "Work_lunxun:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者Consumer1,Consumer2
Consumer1:

package com.sl.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        // ip 和 端口
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Consumer1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Consumer1-开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

}

Consumer2同理。

先运行两个消费者进程,再运行生产者进程
结果:


这就是Work轮询模式。
在轮询模式中不会因为消费者的速度慢而导致消息分发不公平,例如将Consumer2
的线程休眠时间改为
结果还是一样的,Consumer1并不会少消费。

11、Work公平模式理解

将轮询模式的自动应答改为手动应答:

package com.sl.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        // ip 和 端口
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("8.142.113.12");  //ip
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Consumer1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                        finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Consumer1-开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

}

Consumer1在消费完一条消息后休眠2000毫秒即2秒,Consumer消费完一条消息后休眠200毫秒
结果:


公平分发讲究能者多劳
注:公平模式需要改为手动应答

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722685.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号