RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
一、四大核心概念生产者 :产生数据发送消息的程序是生产者
交换机 :交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列 :队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 消费者 消费与接收具有相似的含义。
消费者:大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
二、RabbitMQ安装1)官网地址 https://www.rabbitmq.com/download.html
2)文件上传 上传到/local/software 目录下(自定义创建目录存放rabbitmq安装文件)
3)安装文件(分别按照以下顺序安装)
#首先安装rabbitmq需要的erlang环境;建议使用 centos7.X版本的系统 rpm -ivh erlang-21.3-1.el7.x86_64.rpm yum install socat -y rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm 常用命令(按照以下顺序执行) 添加开机启动 RabbitMQ 服务: chkconfig rabbitmq-server on 启动服务: /sbin/service rabbitmq-server start 查看服务状态: /sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址 http://192.168.0.195:15672/出现权限问题
添加一个新的用户 #创建账号(用户名:admin 密码:123) rabbitmqctl add_user admin 123 设置用户角色 rabbitmqctl set_user_tags admin administrator 设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 查看当前用户和角色 rabbitmqctl list_users
可使用admin 、123登录rabbitmq后台了!
关闭应用的命令为:rabbitmqctl stop_app 清除的命令为:rabbitmqctl reset 重新启动命令为:rabbitmqctl start_app三、hello world
1)pom.xml依赖
org.apache.maven.plugins
maven-compiler-plugin
8
8
com.rabbitmq
amqp-client
5.8.0
commons-io
commons-io
2.6
2)生产者
package com.st.rabbitmq.one_test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";//队列名称
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.195");
factory.setUsername("admin");
factory.setPassword("123");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try(Connection connection = factory.newConnection(); Channel channel =
connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="This is rabbitmq send first message!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
}
3)消费者
package com.st.rabbitmq.one_test;
import com.rabbitmq.client.*;
import com.st.rabbitmq.Util.RabbitMqUtils;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断...");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
4)RabbitMqUtils工具类
package com.st.rabbitmq.Util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.195");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}



