rabbitmq是消息传递的中间组件,使用rabbitmq可以降低程序耦合性,流量消峰以及异步处理。
(1)降低耦合性:如当用户调用下单系统时,服务器故障,需几分钟修复,此时用户依旧可以下单,等系统修复后,再继续处理相应任务。
(2)流量消峰:如双十一时,下单系统会承受几万下单量,使用消息队列做缓冲,将一秒用户分为几部份,有的用户可能需等十几秒才能下单成功,但可以避免服务器宕机。
(3)异步处理:如A服务发送请求调用B服务,A不用等B服务处理完,即直接返回成功到用户,B服务处理完,会发送处理成功消息到mq,mq在发送给A服务,A便能收到异步处理成功消息。
rabbitmq入门案例:
1.启动rabbitmq服务(此处以安装在linux上的rabbitmq讲解)
(1)启动服务
/sbin/service rabbitmq-server start
(2)查看服务状态
/sbin/service rabbitmq-server status
(3)关闭linux防火墙
systemctl stop firewalld
如下图表示启动成功
2.写生产者代码(即发送消息端)
(1)创建队列名称
(2)发送消息
①调用ConnectionFactory创建连接工厂
②设置工厂连接ip(即linux上的虚拟ip地址),设置用户名和密码
③调用newConnection()方法创建连接,调用createChannel()方法创建信道
④调用queueDeclare()方法声明队列,该方法有五个参数
第一个参数:队列名称
第二个参数:是否需要保存,消息默认存储中内存中,true开启持久化
第三个参数:队列是否只供一个消费者,是否进行消息共享 ,false只能一个消费者消费
第四个参数:是否自动删除 消费者断开是否自动删除
第五个参数:其他参数
⑤调用basicPublish()方法发送消息,该方法有四个参数
第一:发送到哪个交换机,不能为null
第二:队列名称
第三 :其他参数信息
第四: 发送消息的消息体,要调取二进制
public class Producer {
//队列名称
public static final String QUEUE_NAME="hello";
//发送消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置工厂ip连接队列
factory.setHost("192.168.23.111");
//设置用户名,密码
factory.setUsername("user");
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//获取连接信道
Channel channel = connection.createChannel();
//信道连接队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//准备发消息
String message="hello world";
//发送消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
3.消费者(即接受消息端)
(1)创建队列名称(需要与生产者队列名称一致)
(2)接受消息
①创建连接工厂,设置工厂ip,用户和密码(此处与生产者一样)
②调用newConnection()方法创建连接,调用createChannel()方法创建信道
③接受消息回调函数,第一个参数为消息标记,第二个参数为获取的消息
④消息中断回调函数
⑤调用basicConsume()方法接受消息,共有四个参数:
1.消费队列名称
2.是否自动应答(true代表自动答)
3.消费者未成功消费回调
4.消费者取消消费的回调
//消费者
public class Consumer {
//队列名称,需要和生产者队列名字一样
public static final String QUEUE_NAME="hello";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置工厂ip连接队列
factory.setHost("192.168.23.111");
//设置用户名,密码
factory.setUsername("user");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明,接受消息
DeliverCallback deliverCallback=(consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
//消息被中断声明
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
//接受消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
4.测试
启动生产者发送消息
启动消费者接收消息
测试成功



