我们选择在linux下安装
安装的前提需要在虚拟机下安装docker
| docker pull rabbitmq:management(拉去镜像) docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management(设置容器) |
安装完成后访问我们的管理页面 http://192.168.1.111:15672/
192.168.1.111是你们设置的虚拟机地址
安装虚拟机和docker可以翻阅我的文章
显示登录界面表示成功了。账号密码默认 guest 。
2.rabbitmq的概念 2.1.1. 什么是 MQMQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
2.1.2. 为什么要用 MQ1.流量消峰 举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体 验要好。 2.应用解耦 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于 消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
2.2.1. RabbitMQ 的概念RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
2.2.2. 四大核心概念生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
还有很多很想细节的需要大家查阅互联网的其他信息。
在这里提出一些重点和常用的点。
3.实战测试小dome 3.1准备新建一个maven工程
引入mq的依赖
3.2 用代码演示生产者->队列->消费者关系 一个生产者对应一个消费者com.rabbitmq amqp-client5.8.0 commons-io commons-io2.6
原理图
创建实体类(以后每次的案例都会放在com.dfp.xxx包下 ,每次安排想对应的一组案例都会在同一个xxx包下)
package com.dfp.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
//队列名称
private final static String QUEUE_NAME="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.111");
factory.setUsername("guest");
factory.setPassword("guest");
//channel实现自动close接口自动关闭,不需要显示
// 创建连接
Connection connection= factory.newConnection();
// 创建信道
Channel channel= connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello,dfp";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//二进制传输
System.out.println("消息发送完毕");
}
}
package com.dfp.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.111");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel= connection.createChannel();
System.out.println("等待消息接收。。。。");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag,delivert)->{
String msg=new String(delivert.getBody());
System.out.println(msg);
};
//取消消费者的一个回调接口,如何在消费的时候队列删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息中断。。。");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
先启动消费者程序在启动生产者
3.3 一个生产者对应多个消费者工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
说白一点,就像排队入场的时候我们只有一条队伍排队但是,电影院的入口有三或者多个个检票员
新建如下:
take是一个生产者
我们将代码重复的部分封装到一个工具类上方便运用
public class take {
private static final String QUEUE_ANME="dfp_1";
public static void main(String[] args) throws Exception{
//创建连接
Channel channel= RabbitMqUtils.getChanne1();
//声明队列初始化
channel.queueDeclare(QUEUE_ANME,false,false,false,null);
//控制台输入
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg= scanner.next();
channel.basicPublish("",QUEUE_ANME,null,msg.getBytes());
System.out.println("发送完毕:"+msg);
}
}
}
public class Work01 {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Channel channel= RabbitMqUtils.getChanne1();
//消息的接收 回调
DeliverCallback deliverCallback=(consumerTag,msg)->{
System.out.println("接收到的消息:"+new String(msg.getBody()));
};
//接收的消息被取消执行以下内容
CancelCallback cancelCallback =(consumerTag)->{
System.out.println(consumerTag+"消息发送者取消接口回调");
};
System.out.println("c1等待接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
public class Work02 {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Channel channel= RabbitMqUtils.getChanne1();
//消息的接收
DeliverCallback deliverCallback=(consumerTag,msg)->{
System.out.println("接收到的消息:"+new String(msg.getBody()));
};
//接收的消息被取消执行以下内容
CancelCallback cancelCallback =(consumerTag)->{
System.out.println(consumerTag+"消息发送者取消接口回调");
};
System.out.println("c2等待接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
结果
细心的朋友会发现有的消息丢失了,我们一会来讲怎么解决。即使有数据的丢失但是队列和消费者的消费顺序是按照顺序轮训获取生产者发送的消息。
3.4 消息应答 (解决上述问题)概念:
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。
消息应答的方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
3.5 Multiple 的解释
手动应答的好处是可以批量应答并且减少网络拥堵multiple 的 true 和 false 代表不同意思 true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
图片解释
3.5消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息
代码实现
新建如下
生产者代码几乎没差别
public class take {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明队列
channe1.queueDeclare(TACK_QUEUE_ANME,false,false,false,null);
//获取控制台的输入消息
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg= scanner.next();
channe1.basicPublish("",TACK_QUEUE_ANME,null,msg.getBytes("UTF-8"));//防止中文编码错误
System.out.println("产生消息"+new Date()+":" +msg);
}
}
}
设置手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
并且关闭自动应答
public class Work03 {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChanne1();
System.out.println("C1 等待接收消息处理时间较短");
//接收到消息的回调
DeliverCallback deliverCallback=(consumerTag,message)->{
SleepUtils.sleep(1);
String msg = new String(message.getBody(),"UTF-8");
System.out.println("c1接收的消息:"+new Date() +":"+msg);
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 未接收到消息的回调
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费者取消消费接收接口回调");
};
boolean autoAck = false;
channel.basicConsume(TACK_QUEUE_ANME,autoAck,deliverCallback,cancelCallback);
}
}
public class Work04 {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChanne1();
System.out.println("C2 等待接收消息处理时间较长长长");
//接收到消息的回调
DeliverCallback deliverCallback=(consumerTag, message)->{
SleepUtils.sleep(30);
String msg = new String(message.getBody(),"UTF-8");
System.out.println("c2接收的消息"+new Date() +":"+msg);
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 未接收到消息的回调
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费者取消消费接收接口回调");
};
boolean autoAck = false;
channel.basicConsume(TACK_QUEUE_ANME,autoAck,deliverCallback,cancelCallback);
}
}
work03和04 的对比差距在 接受到生产者的信息之后处理的时间不同
SleepUtils.sleep(30)让work04等待30执行
并且在等待的图中关闭work04如果按照轮训的方式 88信号原本应该在work04打印,当我们结束了work04 ,88信号回到队列让work03读取
3.6队列持久化应用场景:我们知道如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消
息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事: 我们需要将队列和消息都标 记为持久化 。 承接上述代码注意!!
因为在rabbitmq的服务器中如果我们已经注册的一条队列,我们需要修改他的队列状态,要删除原先的队列重新创建一条队列不然会报错!
删除队列
点击delete
重新启动take的main方法
我们发现ack-queue标记D表示队列持久化
这样就保证了队列的持久化,使得消息在队列中不会消失有了一定的保证。
3.7消息持久化既然有队列的持久化顾名思义就有消息的持久化来保证消息传输的安全不丢失,将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没
有真正写入磁盘。 要想让消息实现持久化需要在消息生产者修改代码在发,创建发送消息处修改如下 MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。 4.消息分发方式 4.1不公平分发在上述的几个案例中,我们使用到的分发方式是轮训分发,意识就是在消费者获取消息的时候,是一个消费者处理消息是一个接着一个按顺序交替执行的,即使你处理的慢也是没有影响的。但是这样遇到上述的work03和work04。对于work04处理较慢的速度来说,就会使得整体的处理消息的速度是减慢的了。不公平分发的原理就是速度快的消费者会主动去帮助速度慢的消费者处理信息。
通俗易懂的原理就是,能者多劳。
因为这是处理接受信息的方式所以我们在消费者的代码上进行修改
我们修改信道的。basicQos()属性不给值默认是0,我们设为1表示不公平分发
结果
我们发现第一次发生的aa还没有处理完毕,所有速度快的c1程序处理了后续的所有信息。
4.2预取值分发按照上述的分发原则不公平分发,显然觉得太不公平。对于正常的观点来说,2个机器一个慢一个快,慢的在做一个拿快的就要全部做完了。
因此我们引入取值分发,就是我们设定消费者要处理几条数据,在消费消息堆积的时候。
比如 我让work03消费2个work04消费5个
但是这个消费的数量也不是绝对定义的而是一种动态消费能力只有在消费能力都达到上限才有明显的效果。
修改代码
其实和不公平分发的设置时一样的,当2个工作的取值相同的时候就会根据能力不同安排处理任务。
其实这个道理我理解的来看就是,从总队列中取值到规定的消费者程序中,但是由于消费者处理快慢不同,每个程序先拿了x个任务这x个任务在处理之前再进行一次排队等待。
就好像去医院挂号,在大厅排队挂号了,好了之后去医生科室排队。
5.发布确认的策略原理:
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式, 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队 列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消 息,生产者应用程序同样可以在回调方法中处理该 nack 消息。 发布确认的策略有三种单个确认发布 、 批量确认发布和异步确认发布。单个确认发布 :发布一条消息确认一次,可以了解每个发布的情况但是很慢。
批量确认发布:一个发布数量确认一次,较快但是不知道发布的详细情况
异步确认发布:生产者可以不听的发布交由rabbitmq的监听器去监控,如果有消息rabbitmq会返回消息给你。
代码实现:
public class MessageToConfirm {
public static final int MESSAGE_COUNT=1000;
public static void main(String[] args) throws Exception {
/
ConcurrentSkipListMap out=new ConcurrentSkipListMap<>();
//消息发送成功的回调函数
ConfirmCallback ack=( tag, multiple)->{
//2.删除已经确认的消息,剩下的就是未确认的消息
if (multiple){
//如果是批量获取,批量删除
ConcurrentNavigableMap confirmed = out.headMap(tag,true);
confirmed.clear();
}else {
//不是批量,移除当前标记的信息
out.remove(tag);
}
System.out.println("已确认的消息"+tag);
};
//消息发送失败的回调函数
ConfirmCallback nac=(tag,multiple)->{
String message = out.get(tag);
//打印一下未确认的消息是那些
System.out.println("未确认消息的内容"+message+"---->未确认的消息的标记"+tag);
};
//准备消息监听器
channel.addConfirmListener(ack,nac);
long strat=System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message="-->异步"+i;
channel.basicPublish("",queue_name,null,message.getBytes());
//不用设置发布确认 boolean flag=channel.waitForConfirms();
//1.记录所有要发送的消息总和
out.put(channel.getNextPublishSeqNo(),message);
}
long end=System.currentTimeMillis();
System.out.println("异步确认消耗时间"+(end-strat)+"ms");
}
}
6.交换机
6.1.Exchanges 概念
RabbitMQ
消息传递模型的核心思想是
:
生产者生产的消息从不会直接发送到队列
。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机
(exchange)
,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
6.2 Exchanges 的类型
总共有以下类型:
直接
(direct),
主题
(topic) ,
标题
(headers) ,
扇出
(fanout)
6.3临时队列
之前的章节我们使用的是具有特定名称的队列
(
还记得
hello
和
ack_queue
吗?
)
。队列的名称我们
来说至关重要
-
我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到
Rabbit
时,我们都需要一个全新的空队列,为此我们可以创建一个具有
随机名称
的队列
,或者能让服务器为我们选择一个随机队列名称那就更好了。其次
一旦我们断开了消费者的连
接,队列将被自动删除。 创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
6.4交换机和队列的绑定关系
什么是 bingding 呢, binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定,绑定了之后这个交换机只能通过识别这个绑定来指定队列发送消息。6.5Fanout 介绍
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息 广播 到它知道的 所有队列中。系统中默认有些 exchange 类型 代码演示 创建如下public class send {
//交换机名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channe1 = RabbitMqUtils.getChanne1();
channe1.exchangeDeclare(EXCHANGE_NAME,"fanout");
//控制台输入消息
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
channe1.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));
System.out.println("生产者发布信息"+msg);
}
}
}
public class receiveA {
//交换机名称
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception{
Channel channe1 = RabbitMqUtils.getChanne1();
channe1.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channe1.queueDeclare().getQueue();
channe1.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待消息接收....");
//接收消息回调
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("R_A接收的消息为:"+new String(msg.getBody(),"utf-8"));
};
//消息取消回调 我设为空方法体
channe1.basicConsume(queueName,true,deliverCallback,consumerTag->{});
}
}
public class receiveB {
//交换机名称
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception{
Channel channe1 = RabbitMqUtils.getChanne1();
channe1.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channe1.queueDeclare().getQueue();
channe1.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待消息接收....");
//接收消息回调
DeliverCallback deliverCallback=(tag, msg)->{
System.out.println("R_B接收的消息为:"+new String(msg.getBody(),"utf-8"));
};
//消息取消回调 我设为空方法体
channe1.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
结果
对比之前的方式,在使用了交换机后并且设置了 fanout的类型,消费者不再是轮训接收消息。每个消息都会获取。
6.6 直接交换机 direct 上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误 (errros) ,而不存储哪些警告 (warning) 或信息 (info) 日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性 - 它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。 当然如果 exchange 的绑定类型是 direct , 但是它绑定的多个队列的 key 如果都相同 ,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了 ,就跟广播差不多 代码演示新建如下public class sendDirect {
//交换机名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channe1 = RabbitMqUtils.getChanne1();
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//控制台输入消息
System.out.println("输入发送的消息:");
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
System.out.println("输入绑定秘钥:");
Scanner exchangeType=new Scanner(System.in);//输入key
String msg = scanner.next();
channe1.basicPublish(EXCHANGE_NAME,exchangeType.next(),null,msg.getBytes("utf-8"));
System.out.println("生产者发布信息"+msg);
}
}
}
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明交换机 绑定类型DIRECT
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channe1.queueDeclare("c1",false,false,false,null);
//绑定秘钥 多重绑定方式 表示这个队列可以接受2个秘钥传输信息
channe1.queueBind("c1",EXCHANGE_NAME,"info");
channe1.queueBind("c1",EXCHANGE_NAME,"out");
//接收消息回调
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("c1接收消息:"+new String(msg.getBody(),"utf-8"));
};
channe1.basicConsume("c1",true,deliverCallback,tagN->{});
}
}
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明交换机 绑定类型
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channe1.queueDeclare("c2",false,false,false,null);
//绑定秘钥info
channe1.queueBind("c2",EXCHANGE_NAME,"error");
//接收消息回调
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("c2接收消息:"+new String(msg.getBody(),"utf-8"));
};
channe1.basicConsume("c2",true,deliverCallback,tagN->{});
}
}
在对于c1队列的交换机我们设置多重绑定
结果如下 6.7主题交换机介绍Topics介绍:
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是 使用了 direct 交换机,从而有能实现有选择性地接收日志。尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型
Topic 的要求 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它 必须是一个单 词列表,以点号分隔开 。这些单词可以是任意单词,比如说: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". 这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的 *( 星号 ) 可以代替一个单词、 #( 井号 ) 可以替代零个或多个单词 案例介绍:代码案例:
新建如下
public class sendTopic {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channe1 = RabbitMqUtils.getChanne1();
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//控制台输入消息
System.out.print("输入发送的消息:");
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
System.out.println("输入绑定秘钥:");
Scanner exchangeType=new Scanner(System.in);//输入key
String msg = scanner.next();
channe1.basicPublish(EXCHANGE_NAME,exchangeType.next(),null,msg.getBytes("utf-8"));
System.out.println("生产者发布信息"+msg);
}
}
}
public class R1 {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明交换机 名称和类型
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String name="R1";
channe1.queueDeclare(name,false,false,false,null);
//队列交换机绑定
channe1.queueBind(name,EXCHANGE_NAME,"*.dfp.*");
System.out.println("等待消息接收");
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("接收的队列信息:"+new String(msg.getBody(),"utf-8")+"绑定的key"+msg.getEnvelope().getRoutingKey());
};
channe1.basicConsume(name,true,deliverCallback,message->{});
}
}
public class R2 {
public static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明交换机 名称和类型
channe1.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
String name="R2";
channe1.queueDeclare(name,false,false,false,null);
//队列交换机绑定
channe1.queueBind(name,EXCHANGE_NAME,"*.*.com");
channe1.queueBind(name,EXCHANGE_NAME,"www.#");
System.out.println("R2等待消息接收");
DeliverCallback deliverCallback=(tag, msg)->{
System.out.println("R2接收的队列信息:"+new String(msg.getBody(),"utf-8")+"绑定的key"+msg.getEnvelope().getRoutingKey());
};
channe1.basicConsume(name,true,deliverCallback,message->{});
}
}
这个类似于直接交换机但是命名更多样
结果
7.死信的概念 死信的意思就是不被消费者消费的队列消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的 原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中. 还有比如说 : 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效。 死信的来源:时间过期、队列值满溢出和拒绝接受消息。 7.1时间过期死信代码演示新建如下
正常的producer生产者程序,不同的是在C1设置 正常的接收程序和死信程序,C2直接定义一个接收死信的程序。
代码如下
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChanne1();
//设置死信消息 设置他的过期时间10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 11; i++) {
String msg="info:"+i;
channel.basicPublish(NORMAL_EXCHANGE,"dfp",properties,msg.getBytes());//过期条件
// channel.basicPublish(NORMAL_EXCHANGE,"dfp",null,msg.getBytes());
}
}
}
public class Consumer01 {
//普通交换机
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列
private static final String NORMAL_QUEUE="normal_queue";
//死信队列
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChanne1();
//声明交换机 类型都是直接类型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// //普通交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //死信交换机
Map arguments=new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
arguments.put("x-dead-letter-routing-key", "lisi");
//正常队列设置死信条件,队列最大容量是7 意味着超出7 的消息被认为死信
arguments.put("x-max-length", 7);
//声明队列 我们在其他参数要把死信条件注入进去,当我们在普通队列遇到死信添加的时候就会进行转发
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //普通队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //死信队列
//交换机和队列绑定
//普通绑定 我们绑定 的key是自定义的dfp
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"dfp");
//死信绑定 是固定的值 list
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待消息接收");
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("c01消息成功接收:"+new String(msg.getBody(),"utf-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,logo->{});
}
}
public class Consumer02 {
//死信队列
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChanne1();
System.out.println("等待消息接收");
DeliverCallback deliverCallback=(tag,msg)->{
System.out.println("c01消息成功接收:"+new String(msg.getBody(),"utf-8"));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,logo->{});
}
}
我们先启动c1程序初始化交换机和队列然后执行Producer和c2程序 等待10s后 ,Producer发送的消息没人接收但是我们只绑定了dfp的key但是程序可以向c2的死信队列进行转发读取
结果
7.2队列满死信代码演示发送11条数据
我们在上述中设置了长度限制为7我们修改代码进行演示
结果只有0-3的数据进行死信接收了另外的7条在正常队列对接等待处理
7.3 拒绝消息死信 修改如下 生产者代码
消费者代码
设置i=2的消息,在消费者接收到的时候 拒绝接收并且不放回源队列
资料来源于尚硅谷,尚硅谷学习笔记更新ing.....


