1 RabiitMQ安装2 MQ概述
2.1 MQ的优势2.2 MQ的劣势 3 常见的MQ产品4 控制台使用
4.1 新增用户及配置权限4.2 Virtual Hosts 配置 5 RabbitMQ的不同工作模式特性剖析
5.1 RabbitMQ快速入门之生产与消费5.2RabbitMQ工作模式之workQueues模式
1 RabiitMQ安装RabbitMQ官网:https://rabbitmq.com/
适用于linux的安装版本:https://rabbitmq.com/install-rpm.html#downloads
当前最新稳定版本:rabbitmq-server-3.9.13-1.el8.noarch.rpm
#本地学习用的虚拟机是centos7系统,学习测试rabbit没有选择最新版本,本文的安装版本 # rabbitmq-server-3.6.5-1.noarch.rpm # erlang-18.3-1.el7.centos.x86_64.rpm # socat-1.7.3.2-5.el7.lux.x86_64.rpm # 1.上传rpm包至服务器 # 2.安装Erlang rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm # 3.安装RabbitMQ rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm # 4.开启管理界面及配置 rabbitmq-plugins enable rabbitmq_management # 修改默认配置信息 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保 留guest # 5.启动 service rabbitmq-server start # 启动服务 service rabbitmq-server stop # 停止服务 service rabbitmq-server restart # 重启服务 #设置配置文件 cd /usr/share/doc/rabbitmq-server-3.6.5/ cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config #windows浏览器登陆管理台 http://192.168.149.128:15672/ # 6.配置虚拟主机及用户
小插曲:安装 socat-1.7.3.2-1.1.el7.x86_64.rpm时,提示error,需要以来插件。解决办法:无视依赖,直接–force --nodeps继续安装。
开启管理界面:
修改默认配置及开启服务(开放guest账号,loopback_users中的删除<<>>即可):
小插曲:此时尝试登陆rabbitMQ管理台发现返回502服务不可用,但是通过在linux服务器上却能下载到首页面,考虑是防火墙问题或端口未开放。
尝试开放端口及关闭防火墙均无效:
firewall-cmd --zone=public --add-port=15672/tcp --permanent #开放指定端口号 firewall-cmd --zone=public --add-port=5672/tcp --permanent #开放指定端口号,后面接口调用需要 systemctl stop firewalld #关闭防火墙 firewall-cmd --reload #重启防火墙 添加完开放端口一样要重启防火墙
最终发现本地开启了全局代理访问,路由转出门再访问局域网肯定还不行的,关掉代理即可。
Message Queue消息队列,实在消息传输的过程中保存消息的容器,多用于分布式系统之间进行通信。
应用解耦
系统的耦合度降低,容错率提高,可维护性越高。
当A、B、C服务间本不存在依赖关系,如果A服务异常,在同步调用时,将导致B、C服务无法继续执行。
异步提速
当服务提供的某些能力越来越强大、越来越复杂,势必无法稳定保证快速应答。此时将串行化程序,转为异步处理。
A、B、C服务内部处理MQ的任务,相互间不产生影响。
削峰填谷
MQ功能中极其重要的优势。将瞬时过量的请求,写入MQ进行限流,由消费端自行从MQ拉取任务进行处理。
现在后端服务架构中,常见时延要求不高,但处理任务量比较大的需求。且在无法预知某时间点用户侧并发量时,通过削峰填谷保证服务按照处理能力执行任务。
2.2 MQ的劣势
系统可用性降低
系统引入的外部依赖变多,系统的稳定性降低。一旦MQ宕机,会对业务功能造成影响。
解决办法:搭建集群,当单台MQ宕机时,保证其他机器正常可用。系统复杂度提高
MQ的加入增加了系统复杂度,以前的同步远程调用,现在变成异步调用,需要保证消息不丢失。
3 常见的MQ产品
RabbitMQ -> RocketMQ -> kafka -> DMQ
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | DMQ | |
|---|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache | 华为 |
| 开发语言 | Erlang | Java | Java | Scala&Java | Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 | 基于kafka封装 |
| 客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 | Java、Python等 |
| 单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) | 不详 |
| 消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 | 毫秒级 |
| 功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 | 使用zookeeper作为注册中心,实现配置下发、服务发现;kafka作为分布式发布订阅消息系统[0.8.2] |
用户密码自不必多说,来看看Tags的区别:
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
4.2 Virtual Hosts 配置
创建 Virtual Hosts设置 Virtual Hosts 权限
5 RabbitMQ的不同工作模式特性剖析
RabbitMQ消息队列都是基于消费者生产者模式组织的。其中的Bcoker、channel、Exchange、Queue等概念如下。
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message BrokerVirtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等Connection:publisher/consumer 和 broker 之间的TCP 连接Channel:类似于Netty的channel,进程间通过TCP连接时,如果建立长连接,通过socket传输内容,可以大大地减少建立连接的开销。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
RabbitMQ的工作模式,在官网中讲的比较详细 https://www.rabbitmq.com/getstarted.html
//消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.149.128");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setVirtualHost("/asky");
Connection connection = connectionFactory.newConnection();
//创建通信管道,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("hello",false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume("hello", false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
//生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.149.128");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setVirtualHost("/asky");
Connection connection = connectionFactory.newConnection();
//创建通信管道,相当于TCP中的虚拟连接
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("hello",false, false, false, null);
String message = "zwx";
//四个参数
//exchange 交换机,暂时用不到,发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("","hello", null,message.getBytes());
channel.close();
connection.close();
System.out.println("===发送成功===");
}
}
5.2RabbitMQ工作模式之workQueues模式
与简单模式的区别在于,工作队列下生产者与消费者可以是多个。而消费端没有topic、groupId一说,queue中的消息将依次被多个消费者消费,默认采用轮询模式。而如果为了区分机器性能和处理能力,可以设置如下设置,当机器处理完消息后再从Queue中拉取消息。
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个



