栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息队列RabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息队列RabbitMQ

微服务间通讯有同步和异步两种方式

同步通讯:就像打电话,需要实时响应。
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题

异步通讯:就像发邮件,不需要马上回复。
好处:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能

RabbitMQ角色
  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
基本消息队列

基本消息队列的消息发送流程

1.建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息

基本消息队列的消息接收流程

1.建立connection
2. 创建channel
3. 利用channel声明队列
4. 定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

1.简单消息模型步骤

1.引入依赖


        org.springframework.boot
        spring-boot-starter-amqp
    

2.修改配置

spring:
	  rabbitmq:
		host: 192.168.56.134
		port: 5672
		virtual-host: /
		username: itcast
		password: 123321

3.发送消息:

 // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, spring amqp!-miaoxiaowen";
    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);

4.接收消息:

@Component
public class SpringRabbitListener {
	@RabbitListener(queues = "simple.queue")
	public void listenSimpleQueueMessage(String msg) throws InterruptedException {
		System.out.println("spring 消费者接收到消息:【" + msg + "】");
}}
2.WorkQueue 工作任务模型

让多个消费者绑定到一个队列,共同消费队列中的消息
1.创建多个RabbitListener监听同一个队列
2.多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
3.消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。
4.通过设置prefetch来控制消费者预取的消息数量

3.发布/订阅

同一条消息可以被多个消费者消费
Publisher:生产者将消息发给交换机
Exchange:交换机,接收生产者发送的消息,并且根据类型处理消息到队列中。Exchange(交换机)只负责转发消息,不具备存储消息的能力
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Queue:消息队列。
Consumer:消费者

Fanout广播模式

消息发送流程是这样的:
1可以有多个队列
2每个队列都要绑定到Exchange(交换机)
3生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
4交换机把消息发送给绑定过的所有队列
5订阅队列的消费者都能拿到消息

实现
1.声明队列和交换机

			@Configuration
			public class FanoutConfig {
				
				@Bean
				public FanoutExchange fanoutExchange(){
					return new FanoutExchange("itcast.fanout");
				}

				
				@Bean
				public Queue fanoutQueue1(){
					return new Queue("fanout.queue1");
				}

				
				@Bean
				public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
					return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
				}

				
				@Bean
				public Queue fanoutQueue2(){
					return new Queue("fanout.queue2");
				}

				
				@Bean
				public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
					return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
				}
			}

2.发送消息

			 // 队列名称
			String exchangeName = "itcast.fanout";
			// 消息
			String message = "hello, everyone!";
			rabbitTemplate.convertAndSend(exchangeName, "", message);

3.接收消息

			@RabbitListener(queues = "fanout.queue1")
			public void listenFanoutQueue1(String msg) {
				System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
			}

			@RabbitListener(queues = "fanout.queue2")
			public void listenFanoutQueue2(String msg) {
				System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
			}
交换机的作用

接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列

Direct模型

不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

实现:
1.声明交换机

@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "direct.queue1"),
			exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
			key = {"red", "blue"}
		))
		public void listenDirectQueue1(String msg){
			System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
		}

		@RabbitListener(bindings = @QueueBinding(
			value = @Queue(name = "direct.queue2"),
			exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
			key = {"red", "yellow"}
		))
		public void listenDirectQueue2(String msg){
			System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
		}

2.消息发送

		String exchangeName = "itcast.direct";
		// 消息
		String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
		// 发送消息
		rabbitTemplate.convertAndSend(exchangeName, "red", message);
Direct交换机与Fanout交换机的差异

Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange

Topic:通配符模式

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

实现:
1.声明绑定交换机

@RabbitListener(bindings = @QueueBinding(
		value = @Queue(name = "topic.queue1"),
		exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
		key = "china.#"
	))
	public void listenTopicQueue1(String msg){
		System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
	}

	@RabbitListener(bindings = @QueueBinding(
		value = @Queue(name = "topic.queue2"),
		exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
		key = "#.news"
	))
	public void listenTopicQueue2(String msg){
		System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
	}
Direct交换机与Topic交换机的差异

Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词

配置JSON转换器

引入依赖


    com.fasterxml.jackson.dataformat
    jackson-dataformat-xml
    2.9.10

注入bean

@Bean
	public MessageConverter jsonMessageConverter(){
		return new Jackson2JsonMessageConverter();
	}
消息的可靠性

常见的丢失原因包括:
发送时丢失:
生产者发送的消息未送达exchange
消息到达exchange后未到达queue
MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机

生产者确认机制

消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
返回结果有两种方式
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

实现
1.修改配置

spring:
	rabbitmq:
		publisher-confirm-type: correlated  #开启publisher-confirm  两种类型:simple:同步等待confirm结果,直到超时; correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
		publisher-returns: true #开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
		template:
		mandatory: true  #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2.定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置

		@Slf4j
		@Configuration
		public class CommonConfig implements ApplicationContextAware {
			@Override
			public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
				// 获取RabbitTemplate
				RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
				// 设置ReturnCallback
				rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
					// 投递失败,记录日志
					log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
							 replyCode, replyText, exchange, routingKey, message.toString());
					// 如果有业务需要,可以重发消息
				});
			}
		}

3.定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
		 // 1.消息体
		String message = "hello, spring amqp!";
		// 2.全局唯一的消息ID,需要封装到CorrelationData中
		CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
		// 3.添加callback
		correlationData.getFuture().addCallback(
			result -> {
				if(result.isAck()){
					// 3.1.ack,消息成功
					log.debug("消息发送成功, ID:{}", correlationData.getId());
				}else{
					// 3.2.nack,消息失败
					log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
				}
			},
			ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
		);
		// 4.发送消息
		rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
mq持久化

交换机持久化
通过代码指定交换机持久化:

@Bean
	public DirectExchange simpleExchange(){
		// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
		return new DirectExchange("simple.direct", true, false);
	}	

队列持久化

@Bean
	public Queue simpleQueue(){
		// 使用QueueBuilder构建队列,durable就是持久化的
		return QueueBuilder.durable("simple.queue").build();
	}

消息持久化
设置消息的属性(MessageProperties),指定delivery-mode:1:非持久化;2:持久化

消费者确认机制

manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。

实现:
1.修改配置

spring:
  rabbitmq:
	listener:
	  simple:
		acknowledge-mode: auto # 关闭ack
失败重试机制

本地重试:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
1.修改配置:

		spring:
		  rabbitmq:
			listener:
			  simple:
				retry:
				  enabled: true # 开启消费者失败重试
				  initial-interval: 1000 # 初识的失败等待时长为1秒
				  multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
				  max-attempts: 3 # 最大重试次数
				  stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2。开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试达到最大次数后,Spring会返回ack,消息会被丢弃。
3.在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

				@Configuration
				public class ErrorMessageConfig {
					1)在consumer服务中定义处理失败消息的交换机和队列
					@Bean
					public DirectExchange errorMessageExchange(){
						return new DirectExchange("error.direct");
					}
					@Bean
					public Queue errorQueue(){
						return new Queue("error.queue", true);
					}
					@Bean
					public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
						return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
					}
					2)定义一个RepublishMessageRecoverer,关联队列和交换机
					@Bean
					public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
						return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
					}
				}
死信交换机

死信:当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费(消息所在的队列设置了超时时间,消息本身设置了超时时间)
- 要投递的队列消息满了,无法投递

死信交换机的使用场景
如果队列绑定了死信交换机,死信会投递到死信交换机;
可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

死信队列:消息被消费者拒绝后进入死信交换机,然后进行死信队列,专门的服务取出死信进行处理

TTL(死信队列,延迟队列)

场景:消费者延迟收到消息的效果
延迟发送短信
用户下单,如果用户在15 分钟内未支付,则自动取消
预约工作会议,20分钟后自动通知所有参会人员

消息超时的两种方式
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列(没有消费者)指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒

延迟队列(插件)

声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。发送消息时,一定要携带x-delay属性,指定延迟的时间:
声明一个交换机,添加delayed属性为true
发送消息时,添加x-delay头,值为超时时间

惰性队列

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
1.基于bean:.lazy()
2.基于@RabbitListener arguments

惰性队列的优点
基于磁盘存储,消息上限高
没有间歇性的page-out,性能比较稳定

惰性队列的缺点
基于磁盘存储,消息时效性会降低
性能受限于磁盘的IO

解决消息堆积

增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
扩大队列容积,提高堆积上限

消息堆积问题的解决方案

队列上绑定多个消费者,提高消费速度
使用惰性队列,可以再mq中保存更多消息

RabbitMQ集群 普通集群

是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
队列所在节点宕机,队列中的消息就会丢失

镜像集群:
是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
一个队列的主节点可能是另一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主宕机后,镜像节点会替代成新的主

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/832048.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号