MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
RabbitMQ的优点由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,社区活跃度高,更新频率快(但是商业版需要收费)
Rabbit的概念RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心概念 生产者产生数据发送消息的程序 消费者
消费者大多时候是一个等待接收消息的程序,生产者消费者很多时候都不在同一个机器,同一个机器可以即使生产者又是消费者 交换机
一方面接收生产者的消息,另一方面将消息推送到队列中,可以将消息推送到特定队列或者多个队列,或者将消息丢弃 队列
RabbitMQ存储消息的一种数据结构,生产者发送的消息只能催存储在队列中,队列仅仅受主机内存以及磁盘的限制约束,本质上是一个很大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接收数据。 RabbitMQ的安装
官网地址
https://www.rabbitmq.com/download.html
文件上传
上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
安装文件(分别按照以下顺序安装)
rpm -ivh erlang-21.3-1.el7.x86_64.rpmyum install socat -yrpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
常用命令(按照以下顺序执行)
添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on启动服务 /sbin/service rabbitmq-server start查看服务状态 /sbin/service rabbitmq-server status停止服务(选择执行) /sbin/service rabbitmq-server stop开启 web 管理插件(在页面访问RabbitMQ的管理界面)rabbitmq-plugins enable rabbitmq_management
访问RabbitMQ的管理界面 http://{安装RabbitMQ机器的IP地址}:15672/,用默认的账号密码guest会出现权限问题(关闭防火墙或开放端口)
不关闭防火墙的话,需要开放15672和5672两个端口,一个是连接控制台,一个是连接服务
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
如果是云服务器的话,还需要在管理端开放者两个端口
此处添加新的用户访问
创建账号 rabbitmqctl add_user admin 123设置用户角色 rabbitmqctl set_user_tags admin administrator设置用户权限
set_permissions [-p用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限] rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
当前用户和角色 rabbitmqctl list_users再次使用admin登陆。
重置命令
关闭应用的命令为 rabbitmqctl stop_app清除的命令为 rabbitmqctl reset重新启动命令为 rabbitmqctl start_app
2. 创建消息的生产者com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6
public class Producer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列(安装RabbitMQ机器的IP地址)
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUERE_NAME,false,false,false,null);
String message = "hello world ";
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
}
}
3. 创建消息的消费者
public class Consumer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUERE_NAME,true,deliverCallback,cancelCallback);
}
}
4. 启动消费者与生产者查看结果
生产者Consumer输出hello world
RabbitMQ管理页面有生成的新队列
RabbitMQ的工作队列
轮流接收消息
仿照上面的程序写两个消费者消费同一个队列或者启动两个工作线程,我这里使用idea启动两个工作线程,勾选下方的标识,允许启动多个线程
public class Worker01 {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
System.out.println("c1等待接收消息。。。。。");
//消息接收
channel.basicConsume(QUERE_NAME, true, deliverCallback, cancelCallback);
}
}
编写生产者并启动
public class Task01 {
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 队列的声明
channel.queueDeclare(QUERE_NAME,false,false,false,null);
// 从控制台接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
通过生产者发送多个消息,查看两个消费者的消费情况,通过程序执行发现生产者总共发送 6个消息,消费者 1 和消费者 2 分别分得3个消息,并且是按照有序的一个接收一次消息
尚硅谷B站RabbitMQ教程:尚硅谷RabbitMQ



