无名exchange
channel.basicPublish("","hello",null,message.getBytes());
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实
是由
routingKey(bindingkey)
绑定
key
指定的,如果它存在的话
临时队列
每当我们连接到 Rabbit
时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称
的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除
String queueName = channel.queueDeclare().getQueue();绑定 (bindings) 什么是 bingding 呢, binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
Fanout 将接收到的所有消息 广播 到它知道的 所有队列中。系统中默认有些 exchange 类型 Direct exchange 日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误 (errros) ,而不存储哪些警告 (warning) 或信息 (info) 日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性 - 它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。
X 绑定了两个队列,绑定类型是 direct 。队列 Q1 绑定键为 orange , 队列 Q2 绑定键有两个 : 一个绑定键为 black ,另一个绑定键为 green. 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1 。绑定键为 blackgreen 和的消息会被发布到队列 Q2 ,其他消息类型的消息将被丢弃。 多重绑定 当然如果 exchange 的绑定类型是 direct , 但是它绑定的多个队列的 key 如果都相同 ,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了 ,就跟广播差不多
Topic 的要求 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它 必须是一个单 词列表,以点号分隔开 。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的 *(星号 )可以代替一个单词 #(井号 )可以替代零个或多个单词
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Map bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry bindingKeyEntry:
bindingKeyMap.entrySet()){String bindingKey =
bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q1 队列与绑定关系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{ String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q2 队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{ String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
死信队列
某些时候由于特定的
原因导致
queue
中的某些消息无法被消费
,这样的消息如果没有
后续的处理,就变成了死信,有死信自然就有了死信队列
死信的来源
消息
TTL
过期
队列达到最大长度
(
队列满了,无法再添加数据到
mq
中
)
消息被拒绝
(basic.reject
或
basic.nack)
并且
requeue=false
延迟队列
延时队列
,
队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景
1.
订单在十分钟之内未支付则自动取消
2.
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.
用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
RabbitMQ
中的
TTL
TTL
是
RabbitMQ
中一个消息或者队列的属性,表明一条消息或者该队列中的所有
消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
消息设置TTL
rabbitremplate.convertAndSend(exchange:"X", routingKey :"XC", message, correlationData −>{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData ;
}):
队列设置TTL
//声明队列的TTI
args,put("x-message-tt1") 5000);
return QueueBuilder.durable(QUEUEA).withArguments(args),build();
如果设置了队列的 TTL
属性,那么一旦消息过期,就会被队列丢弃
(
如果配置了死信队列被丢到死信队 列中)
,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为
消息是否过期是在即将投递到消费者
之前判定的
,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL
,表示消息永远不会过期,如果将
TTL
设置为
0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃


