栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

rabbitmq交换器与ack机制

rabbitmq交换器与ack机制

一、 RabbitMQ交换器

1. Direct案例

DirectExchange 路由策略是将消息队列绑定到 DirectExchange 上,当 一条消息到达DirectExchange 时会被转发到与该条消息
routing key 相同的 Queue 上,例如消息队列名为“hello-queue ”,则 routingkey 为“hello-queue ”的消息会被该消息队列接收。

1.1 创建消费者

创建项目,并添加依赖



org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-test
test


org.junit.vintage
junit-vintage-engine




org.springframework.boot
spring-boot-starter-amqp


配置文件

spring.application.name=rabbitmq-demo02
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=192.168.100.120
# 设置交换器名称
mq.config.exchange=log.direct
# info 队列名称
mq.config.queue.info=log.info
# info 路由键

消费者

package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}"
,type = ExchangeTypes.DIRECT)
,key = "${mq.config.queue.error.routing.key}"
)
) p
ublic class ErrorRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("error....recevier:" + msg);
}
} p
ackage com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}"
,type = ExchangeTypes.DIRECT)
,key = "${mq.config.queue.info.routing.key}"
)
) p
ublic class InfoRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("info....recevier:" + msg);
}
}

1.2 创建生产者
创建一个SpringBoot项目,添加和上面一样的依赖
配置文件有区别,不需要添加队列的配置信息

spring.application.name=rabbitmq-demo03
添加生产者的类
测试效果
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=192.168.100.120
# 设置交换器名称
mq.config.exchange=log.direct
# info 路由键
mq.config.queue.info.routing.key=log.info.routing.key
# error 路由键
mq.config.queue.error.routing.key=log.error.routing.key

添加生产者的类

package com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;
@Value("${mq.config.queue.info.routing.key}")
private String routingKey;
public void send(String msg){
// 发送消息
template.convertAndSend(exchange,routingKey,msg);
}
} @
SpringBootTest
class RabbitmqDemo03ApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.send("Hello RabbitMQ .... ");
}
}

测试效果

2. Topic案例

TopicExchange 是比较复杂也比较灵活的 种路由策略,在TopicExchange 中,Queue 通过routingkey 绑定到 TopicExchange 上,当消息到
达 TopicExchange 后,TopicExchange 根据消息的routingkey 消息路由到一个或者多 Queue上,相比direct模式topic会更加的灵活些。

package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true")
,exchange = @Exchange(
value = "${mq.config.exchange}"
,type = ExchangeTypes.TOPIC)
,key = "*.log.error"
)
) p
ublic class ErrorRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("error ... recevier:" + msg);
}
} p
ackage com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;

public void send(String msg){
template.convertAndSend(exchange,"Order.log.debug","Order log debug"+msg);
template.convertAndSend(exchange,"Order.log.info","Order log info"+msg);
template.convertAndSend(exchange,"Order.log.error","Order log error"+msg);
template.convertAndSend(exchange,"Order.log.warn","Order log warn"+msg);
}
}
3. Fanout案例

FanoutExchange 的数据交换策略是把所有到达 FanoutExchang 的消息转发给所有与它绑定的Queue ,在这种策略中, routingkey 将
不起任何作用.

package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
)
) p
ublic class SmsRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("Sms .... recevider:" + msg);
}
}
p
ackage com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
二、RabbitMQ高级 1. 持久化

消息的可靠性是RabbitMQ的一大特色,RabbitMQ是如何保证消息的可靠性的呢?–> 消息的持久化
创建消费者

注意,此时我们需要设置autoDelete=false

创建服务提供者
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;

public void send(String msg){
template.convertAndSend(exchange,"",msg);
}
}

单元测试
当消费者处理了一段时间的消息之后,断开连接,然后消费者再上线我们发现消费者又能够处理掉下线后提供者发送的消息,保证了消息的
完整性

autoDelete属性可以在@Queue配置也可以在@Exchange配置,具体如下:

@Queue:当所有的消费者客户端连接断开后,是否自定删除队列
true:删除,false:不删除
@Exchange:当所有的绑定队列都不再使用时,是否自动删除交换器
true:删除,false:不删除
2. ACK确认机制 2.1 什么是ACK

如果消息在处理过程中,消费者的服务器在处理消息时出现了异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢
失,为了确保数据不会丢失,RabbitMQ支持消息确认机制-ACK

2.2 ACK消息确认机制

ACK(Acknowledge Character)是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ的,RabbitMQ接收到反馈信息后才会将消息从队列中删除。

  1. 如果一个消费者在处理消息出现了网络不稳定,集群异常等现象,会将消息重新放入队列中。
  2. 如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他的消费者,这种机制保障了消费者在服务端故障的时候不会
    丢失任何的数据和任务
  3. 消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈后,RabbitMQ收到确认后,消息才会从RabbitMQ的服务中删除
  4. 消息的ACK机制默认就是打开的
    ACK的验证
    在服务端我们给出一个错误

    然后我们再去掉错误,发现消息会被正常的消费

    ACK的注意事项
    如果忘记掉ACK,那么后果会比较严重,当Consumer退出时,Message会一直重复分发,然后RabbitMQ会占用越来越多的内存,由于
    RabbitMQ会长时间的运行,因此这个 内存泄漏 是致命的,我们可以通过设置重试次数来防止这个问题,在Consumer的
    application.properties中设置如下参数
spring.rabbitmq.listener.simple.retry.enabled=true
## 设置重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
## 重试的间隔时间
spring.rabbitmq.listener.simple.retry.initial-interval=5000
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278622.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号