栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ消息队列(一)

RabbitMQ消息队列(一)

RabbitMQ消息队列中间件

直接安装在Linux上

启动服务

systemctl start rabbitmq-server
查看服务状态

systemctl status rabbitmq-server
停止服务

systemctl stop rabbitmq-server
开机启动服务

systemctl enable rabbitmq-server

一、消息队列介绍 1. 同步调用和异步调用
  • 同步调用
    • A服务调用B服务,需要等待B服务执行完毕的返回值,A服务才可以继续往下执行
    • 同步调用可以通过REST和RPC完成
      • REST:Ribbon,Feign
      • RPC:Dubbo
  • 异步调用
    • A服务调用B服务,A无需等待B服务的执行结果
    • 通过消息队列实现异步调用
2. 消息队列概念
  • MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
  • 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
3. 常见消息队列中间件
  • RabbitMQ、ActiveMQ、RocketMQ、Kafka
    • RabbitMQ 稳定可靠,数据一致,支持多协议,有消息确认,基于erlang语言
    • Kafka 高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高.
    • ActiveMQ 不够灵活轻巧,对队列较多情况支持不好.
    • RocketMQ 性能好,高吞吐,高可用性,支持大规模分布式,协议支持单一
二、RabbitMQ 1. RabbitMQ介绍
  • RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
  • AMQP,即Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
  • 主要特性:
    • 保证可靠性:使用一些机制来保证可靠性,如持久化、传输确认、发布确认
    • 灵活的路由功能
    • 可伸缩性:支持消息集群,多台RabbitMQ服务器可以组成一个集群
    • 高可用性:RabbitMQ集群中的某个节点出现问题时队列任然可用
    • 支持多种协议
    • 支持多语言客户端
    • 提供良好的管理界面
    • 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
    • 提供插件机制:可通过插件进行多方面扩展
2. 通过docker安装rabbitmq 2.1 Erlang安装

参考地址:https://www.erlang-solutions.com/downloads/

wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpmrpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v

安装socat

yum install -y socat
2.2 安装dokcer
(1)yum 包更新到最新
yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
yum install docker-ce -y
(5)安装后查看docker版本
docker -v
 (6) 安装加速镜像
 sudo mkdir -p /etc/docker
 sudo tee /etc/docker/daemon.json <<-'EOF'
 {
"registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
 }
 EOF
 sudo systemctl daemon-reload
 sudo systemctl restart docker

docker相关命令

# 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:  
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help
2.3 安装rabbitmq
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

启动安装在docker中的RabbitMQ

  • docker ps -a

    查看所有的docker容器

  • docker start ConTAINER ID

    ConTAINER ID是在docker ps -a中显示出的容器的ConTAINER ID

  • docker ps

    查看已启动的容器

三、 RabbitMQ用户管理 1. 逻辑结构
  • 用户
  • 虚拟主机
  • 队列

2. 用户管理 2.1 用户级别
  • 超级管理员administrator,可以登录控制台,查看所有信息,可以对用户和策略进行操作
  • 监控者monitoring,可以登录控制台,可以查看节点的相关信息,比如进程数,内存磁盘使用情况
  • 策略制定者policymaker ,可以登录控制台,制定策略,但是无法查看节点信息
  • 普通管理员 management 仅能登录控制台
  • 其他, 无法登录控制台,一般指的是提供者和消费者
2.2 添加用户(命令模式)
  • docker ps
    #进入容器
    docker exec -it d2dd40da7056 /bin/bash
    
  • 添加/配置用户test 设置密码为test

    rabbitmqctl add_user test test
    

    如果不是通过docker安装rabbitmq,则需要在rabbitmq的sbin目录下执行./rabbitmqctl add_user test test

  • 设置用户权限

    #设置admin为administrator级别
    rabbitmqctl set_user_tags test administrator
    
2.3添加用户(web方式)
  • 浏览器访问:http://47.113.192.192/:15672/ (使用guest guest 登录, guest 具有最高权限,只能在本机登录;先使用命令行创建一个用户)
2.4 添加虚拟主机

2.5 用户绑定虚拟主机

四、RabbitMQ工作模式 1. 消息队列的模式

参考文档:http://www.rabbitmq.com/getstarted.html

1.1 简单模式

简单模式就是生产者将消息发到队列,消费者从队列中取消息,一条消息对应一个消费者

一个队列只有一个消费者

1.2 工作模式

Work模式就是一条消息可以被多个消费者尝试接收,但是最终只能有一个消费者能获取

1.3 订阅模式

一条消息可以被多个消费者同时获取

生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机

当发送消息后所有注册了队列的消费者都可以收到消息

1.4 路由模式

生产者将消息发送到了type为direct模式的交换机

消费者的队列在将自己绑定到路由的时候会给自己绑定一个key

只有生产者发送对应key格式的消息时,相应队列才会收到消息

五、RabbtiMQ交换机和队列管理 1. 创建队列 2. 创建交换机 3. 交换机绑定队列

进入交换机ex1

在模式为fanout的交换机ex1上绑定两个队列queue1,queue2

六、在普通Maven应用中使用MQ

RabbitMQ消息队列模式

1. 简单模式 1.1 消息生产者
  • 创建Maven 项目

  • 添加RabbitMQ连接需要的依赖

    
        com.rabbitmq
        amqp-client
        4.10.0
    
    
        org.slf4j
        slf4j-log4j12
        1.7.25
    
    
        org.apache.commons
        commons-lang3
        3.9
    
    
  • 创建日志配置文件 log4j.properties

    log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG 
    log4j.logger.org.mybatis = DEBUG
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
    
  • 创建MQ连接帮助类

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
            factory.setHost("47.113.192.192");
            factory.setPort(5672);
            factory.setVirtualHost("host1");
            factory.setUsername("admin");
            factory.setPassword("admin");
            //3.通过工厂对象获取与MQ的连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    
  • 消息生产者发送消息

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class SendMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            String msg = "Hello RabbitMQ";
    
            Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接
            Channel channel = connection.createChannel();   //相当于JDBC的statement
    
            //定义队列(在Java代码中新建一个MQ队列)
            //参数1:定义的队列名
            //参数2:队列中的数据是否选择持久化
            //参数3:是否排外(当前队列是否为当前连接所私有)
            //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
            //参数5:设置当前队列的参数
            //channel.queueDeclare("queue7",false,false,false,null);
    
            //参数1:交换机名称(此处为简单模式,没有交换机)
            //参数2:目标队列名称
            //参数3:设置当前这条消息的属性(如设置过期时间)
            //参数4:消息的内容
            channel.basicPublish("","queue1",null,msg.getBytes());
            System.out.println("发送" + msg);
    
            channel.close();
            connection.close();
        }
    }
    
    
1.2 消息消费者
  • 消息消费者接收消息

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue1",true,consumer);
        }
    }
    
    
2. 工作模式 2.1 消息生产者
  • 消息生产者发送消息

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    public class SendMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("请输入消息:");
            Scanner scanner = new Scanner(System.in);
            String msg = null;
            while(!"quit".equals(msg = scanner.nextLine())){
                Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接
                Channel channel = connection.createChannel();   //相当于JDBC的statement
    
                //定义队列(在Java代码中新建一个MQ队列)
                //参数1:定义的队列名
                //参数2:队列中的数据是否选择持久化
                //参数3:是否排外(当前队列是否为当前连接所私有)
                //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
                //参数5:设置当前队列的参数
                //channel.queueDeclare("queue7",false,false,false,null);
    
                //参数1:交换机名称(此处为工作模式,没有交换机)
                //参数2:目标队列名称
                //参数3:设置当前这条消息的属性(如设置过期时间)
                //参数4:消息的内容
                channel.basicPublish("","queue2",null,msg.getBytes());
                System.out.println("发送" + msg);
    
                channel.close();
                connection.close();
            }
        }
    }
    
    
2.2 消息消费者
  • 消息消费者接收消息

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer1接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue2",true,consumer);
        }
    }
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer2接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue2",true,consumer);
        }
    }
    
3. 订阅模式 3.1 消息生产者
  • 消息生产者发送消息
import com.eicoma.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class SendMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接
            Channel channel = connection.createChannel();   //相当于JDBC的statement
            //参数1:交换机名称
            //参数2:目标队列名称
            //参数3:设置当前这条消息的属性(如设置过期时间)
            //参数4:消息的内容
            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("发送" + msg);

            channel.close();
            connection.close();
        }
    }
}
3.2 消息消费者
  • 消息消费者接收消息

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer1接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue3",true,consumer);
        }
    }
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer2接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue4",true,consumer);
        }
    }
    
4. 路由模式 4.1 消息生产者
  • 消息生产者发送消息
import com.eicoma.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class SendMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接
            Channel channel = connection.createChannel();   //相当于JDBC的statement

            //参数1:交换机名称
            //参数2:目标队列名称,但由于这里有交换机,因此该参数为Key
            //参数3:设置当前这条消息的属性(如设置过期时间)
            //参数4:消息的内容
            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a" ,null,msg.getBytes());
            }else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b" ,null,msg.getBytes());
            }
            System.out.println("发送" + msg);

            channel.close();
            connection.close();
        }
    }
}

4.2 消息消费者
  • 消息消费者接收消息

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer1接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue5",true,consumer);
        }
    }
    
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是从队列中获取的数据
                    String msg = new String(body);
                    System.out.println("consumer2接收的数据是:" + msg);
                }
            };
    
            channel.basicConsume("queue6",true,consumer);
        }
    }
    
    
七、在SpringBoot中使用MQ

SpringBoot可以完成自动配置和依赖注入,通过Spring直接获取RabbitMQ的连接对象

1. 消息生产者
  • 相关依赖

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    
        org.springframework.boot
        spring-boot-starter-thymeleaf
    
    
        org.springframework.boot
        spring-boot-starter-web
    
    
    
        org.projectlombok
        lombok
        true
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    
    
        org.springframework.amqp
        spring-rabbit-test
        test
    
    
  • 配置applicaiton.yml

    server:
      port: 9001
    
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.113.192.192
        port: 5672
        virtual-host: host1
        username: admin
        password: admin
    
  • 生产者发送消息

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    
    @Service
    public class TestService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void senMsg(String msg){
                //1.发送消息到队列
                amqpTemplate.convertAndSend("queue1",msg);
            
                //2.发送消息到交换机(订阅交换机)
                amqpTemplate.convertAndSend("ex1","",msg);
            
                //3.发送消息到交换机(路由交换机)
                amqpTemplate.convertAndSend("ex2","a",msg);
                amqpTemplate.convertAndSend("ex2","b",msg);
        }
    }
    
2. 消息消费者
  • 添加依赖

  • 配置yml

    server:
      port: 9002
    
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.113.192.192
        port: 5672
        virtual-host: host1
        username: admin
        password: admin
    
  • 消费者接收消息

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(queues = {"queue1"})
    public class ReceiveMsgService {
    
        @RabbitHandler
        public void receiveMsg(String msg){
            System.out.println("接收msg:" + msg);
        }
    }
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433753.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号