栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

分布式-MQ-01 RabiitMQ安装及基本使用

分布式-MQ-01 RabiitMQ安装及基本使用

RabiitMQ安装及基本使用

1 RabiitMQ安装2 MQ概述

2.1 MQ的优势2.2 MQ的劣势 3 常见的MQ产品4 控制台使用

4.1 新增用户及配置权限4.2 Virtual Hosts 配置 5 RabbitMQ的不同工作模式特性剖析

5.1 RabbitMQ快速入门之生产与消费5.2RabbitMQ工作模式之workQueues模式

1 RabiitMQ安装

RabbitMQ官网:https://rabbitmq.com/
适用于linux的安装版本:https://rabbitmq.com/install-rpm.html#downloads
当前最新稳定版本:rabbitmq-server-3.9.13-1.el8.noarch.rpm

#本地学习用的虚拟机是centos7系统,学习测试rabbit没有选择最新版本,本文的安装版本
# rabbitmq-server-3.6.5-1.noarch.rpm
# erlang-18.3-1.el7.centos.x86_64.rpm
# socat-1.7.3.2-5.el7.lux.x86_64.rpm

# 1.上传rpm包至服务器
# 2.安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
# 3.安装RabbitMQ
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 4.开启管理界面及配置
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保 留guest
# 5.启动
service rabbitmq-server start # 启动服务 
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
#设置配置文件
cd /usr/share/doc/rabbitmq-server-3.6.5/ cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#windows浏览器登陆管理台
http://192.168.149.128:15672/
# 6.配置虚拟主机及用户


小插曲:安装 socat-1.7.3.2-1.1.el7.x86_64.rpm时,提示error,需要以来插件。解决办法:无视依赖,直接–force --nodeps继续安装。

开启管理界面:

修改默认配置及开启服务(开放guest账号,loopback_users中的删除<<>>即可):


小插曲:此时尝试登陆rabbitMQ管理台发现返回502服务不可用,但是通过在linux服务器上却能下载到首页面,考虑是防火墙问题或端口未开放。

尝试开放端口及关闭防火墙均无效:

firewall-cmd --zone=public --add-port=15672/tcp --permanent #开放指定端口号
firewall-cmd --zone=public --add-port=5672/tcp --permanent #开放指定端口号,后面接口调用需要
systemctl stop firewalld #关闭防火墙
firewall-cmd --reload #重启防火墙 添加完开放端口一样要重启防火墙

最终发现本地开启了全局代理访问,路由转出门再访问局域网肯定还不行的,关掉代理即可。

2 MQ概述

Message Queue消息队列,实在消息传输的过程中保存消息的容器,多用于分布式系统之间进行通信。

2.1 MQ的优势

应用解耦
系统的耦合度降低,容错率提高,可维护性越高。
当A、B、C服务间本不存在依赖关系,如果A服务异常,在同步调用时,将导致B、C服务无法继续执行。
异步提速
当服务提供的某些能力越来越强大、越来越复杂,势必无法稳定保证快速应答。此时将串行化程序,转为异步处理。
A、B、C服务内部处理MQ的任务,相互间不产生影响。
削峰填谷
MQ功能中极其重要的优势。将瞬时过量的请求,写入MQ进行限流,由消费端自行从MQ拉取任务进行处理。
现在后端服务架构中,常见时延要求不高,但处理任务量比较大的需求。且在无法预知某时间点用户侧并发量时,通过削峰填谷保证服务按照处理能力执行任务。
2.2 MQ的劣势

系统可用性降低
系统引入的外部依赖变多,系统的稳定性降低。一旦MQ宕机,会对业务功能造成影响。
解决办法:搭建集群,当单台MQ宕机时,保证其他机器正常可用。系统复杂度提高
MQ的加入增加了系统复杂度,以前的同步远程调用,现在变成异步调用,需要保证消息不丢失。 3 常见的MQ产品

RabbitMQ -> RocketMQ -> kafka -> DMQ

RabbitMQActiveMQRocketMQKafkaDMQ
公司/社区RabbitApache阿里Apache华为
开发语言ErlangJavaJavaScala&JavaJava
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义自定义协议,社区封装了http协议支持基于kafka封装
客户端支持语言官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社区产出多种API,如PHP,Python等Java、Python等
单机吞吐量万级(其次)万级(最差)十万级(最好)十万级(次之)不详
消息延迟微妙级毫秒级毫秒级毫秒以内毫秒级
功能特性并发能力强,性能极其好,延时低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ功能比较完备,扩展性佳只支持主要的MQ功能,毕竟是为大数据领域准备的。使用zookeeper作为注册中心,实现配置下发、服务发现;kafka作为分布式发布订阅消息系统[0.8.2]
4 控制台使用 4.1 新增用户及配置权限


用户密码自不必多说,来看看Tags的区别:

超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。其他
无法登陆管理控制台,通常就是普通的生产者和消费者。 4.2 Virtual Hosts 配置

创建 Virtual Hosts设置 Virtual Hosts 权限
5 RabbitMQ的不同工作模式特性剖析

RabbitMQ消息队列都是基于消费者生产者模式组织的。其中的Bcoker、channel、Exchange、Queue等概念如下。

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message BrokerVirtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等Connection:publisher/consumer 和 broker 之间的TCP 连接Channel:类似于Netty的channel,进程间通过TCP连接时,如果建立长连接,通过socket传输内容,可以大大地减少建立连接的开销。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销


RabbitMQ的工作模式,在官网中讲的比较详细 https://www.rabbitmq.com/getstarted.html

5.1 RabbitMQ快速入门之生产与消费

//消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.149.128");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setVirtualHost("/asky");

        Connection connection = connectionFactory.newConnection();
        //创建通信管道,相当于TCP中的虚拟连接
        Channel channel = connection.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare("hello",false, false, false, null);

        //从MQ服务器中获取数据

        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume("hello", false, new Reciver(channel));

    }
}

class  Reciver extends DefaultConsumer {

    private Channel channel;
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        String message = new String(body);
        System.out.println("消费者接收到的消息:"+message);

        System.out.println("消息的TagId:"+envelope.getDeliveryTag());
        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
//生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.149.128");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setVirtualHost("/asky");

        Connection connection = connectionFactory.newConnection();
        //创建通信管道,相当于TCP中的虚拟连接
        Channel channel = connection.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare("hello",false, false, false, null);

        String message = "zwx";
        //四个参数
        //exchange 交换机,暂时用不到,发布订阅时才会用到
        //队列名称
        //额外的设置属性
        //最后一个参数是要传递的消息字节数组
        channel.basicPublish("","hello", null,message.getBytes());
        channel.close();
        connection.close();
        System.out.println("===发送成功===");
    }
}
5.2RabbitMQ工作模式之workQueues模式


与简单模式的区别在于,工作队列下生产者与消费者可以是多个。而消费端没有topic、groupId一说,queue中的消息将依次被多个消费者消费,默认采用轮询模式。而如果为了区分机器性能和处理能力,可以设置如下设置,当机器处理完消息后再从Queue中拉取消息。

//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745062.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号