栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ-消息回退/集群搭建

RabbitMQ-消息回退/集群搭建

在https://editor.csdn.net/md/?articleId=123689998中已经介绍了部分RabbitMQ的相关知识,但是都是基于Rabbit MQ正常工作,在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复,这种情况下会出现如下的报错信息。如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?
所以在RabbitMQ中设置消息回退机制,可以解决消息丢失的问题。

应用[xxx]在[08-1516:36:04]发生[错误日志异常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发。
应用xxx 可能原因如下 服务名为: 异常为:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 
产生原因如下:1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:

文章目录

RabbitMQ发生异常的处理

交换机确认:

需要实现一个回调接口的函数:配置类消息的生产者消费者 消息回退

生产者代码,加入消息回退的参数回调接口 备份交换机

配置类的修改报警消费者 RabbitMQ其他

幂等性问题

幂等性概念幂等性解决方法消费端的幂等性保障 优先级队列

队列添加优先级的方式

在Web界面进行添加在代码的队列中进行优先级添加在发送消息的代码中进行添加注意 惰性队列

两种模式 RabbitMQ集群

搭建步骤镜像队列

镜像队列的设置:

RabbitMQ发生异常的处理

首先,RabbitMQ发生异常可能有两种情况,可能是交换机异常或者是消息队列异常,所以当交换机收到生产者发送的消息时,需要让生产者知道,该消息交换机已经接收到了,或者没有接收到。
所以就需要在Spring Boot中添加配置信息:发布消息成功到交换器后会触发回调方法

spring.rabbitmq.publisher-/confirm/i-type=correlated

NONE:禁用发布确认模式,是默认值CORRELATED:发布消息成功到交换器后会触发回调方法SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法 等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitFor/confirm/isOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker 交换机确认: 需要实现一个回调接口的函数:

这个回调函数可以感知交换机是否收到消息,如果交换机出现了异常,就返回给生产者。

@Component 
@Slf4j 
public class MyCallBack implements RabbitTemplate./confirm/iCallback { 
 
@Override 
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; 
if(ack){ 
	log.info("交换机已经收到id为:{}的消息",id); 
}else{
 	log.info("交换机还未收到id为:{}消息,由于原因:{}",id,cause); 
 	} 
 } 
}

但是实现这个接口只能够让生产者知道该消息是否到达交换机,但是并不知道这条消息的具体内容:如下面的代码:
首先按照该方式设计好交换机和队列

配置类
@Configuration 
public class /confirm/iConfig { 
	public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange"; 
	public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue"; 
	//声明业务Exchange 
	@Bean("/confirm/iExchange") 
	public DirectExchange /confirm/iExchange(){ 
		return new DirectExchange(/confirm/i_EXCHANGE_NAME);
	 } 
	 // 声明确认队列 
	 @Bean("/confirm/iQueue") 
	 public Queue /confirm/iQueue(){ 
	 	return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build(); 
	 } 
	 // 声明确认队列绑定关系 
	 @Bean 
	 public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue, 					@Qualifier("/confirm/iExchange") DirectExchange exchange){
	 	 return BindingBuilder.bind(queue).to(exchange).with("key1"); 
	 	}
	 }
消息的生产者
@RestController 
@RequestMapping("//confirm/i") 
@Slf4j 
public class Producer { 
	public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange"; 		
	@Autowired 
	private RabbitTemplate rabbitTemplate; 
	@Autowired 
	private MyCallBack myCallBack; 
	//依赖注入rabbitTemplate之后再设置它的回调对象 
	@PostConstruct 
	public void init(){ 
		rabbitTemplate.set/confirm/iCallback(myCallBack); 
	} 
	
	@GetMapping("sendMessage/{message}") 
	public void sendMessage(@PathVariable String message){
		//指定消息id为随机数
		CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
		String routingKey="key1"; 			
		rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME,
												routingKey,
										message+routingKey,
										correlationData1); 
		CorrelationData correlationData2=new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME,
												routingKey,
										message+routingKey,
										correlationData2);
		log.info("发送消息内容:{}",message);
	 } 
	}
消费者
@Component 
@Slf4j 
public class /confirm/iConsumer { 
	public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
	@RabbitListener(queues =/confirm/i_QUEUE_NAME) 
	public void receiveMsg(Message message){ 
		String msg=new String(message.getBody()); 
		log.info("接受到队列/confirm/i.queue消息:{}",msg); 
	}
}

结果分析:

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

消息回退

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息让生产者知道。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

生产者代码,加入消息回退的参数
@Slf4j 
@Component 
public class MessageProducer implements RabbitTemplate./confirm/iCallback , RabbitTemplate.ReturnCallback { 
@Autowired 
private RabbitTemplate rabbitTemplate; 
//rabbitTemplate注入之后就设置该值 
@PostConstruct 
private void init() { 
	rabbitTemplate.set/confirm/iCallback(this); 
	 
 	rabbitTemplate.setMandatory(true); 
	 //设置回退消息交给谁处理 
	 rabbitTemplate.setReturnCallback(this); 
 } 

@GetMapping("sendMessage") 
public void sendMessage(String message){ 
	//让消息绑定一个id值 
	CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
	rabbitTemplate.convertAndSend("/confirm/i.exchange","key1",
											message+"key1"
											,correlationData1);
 	log.info("发送消息id为:{}内容为{}",correlationData1.getId(),message+"key1");
	CorrelationData correlationData2 = new 	CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("/confirm/i.exchange",
										"key2",
								message+"key2",
								correlationData2); 
	log.info("发送消息id为:{}内容为{}",correlationData2.getId(),message+"key2"); 
}


 @Override 
 public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { 
	 String id = correlationData != null ? correlationData.getId() : ""; 
	 if (ack) { 
		 log.info("交换机收到消息确认成功, id:{}", id); 
	 } else { 
 		log.error("消息id:{}未成功投递到交换机,原因是:{}", id, cause); 
	 } 
 } 
 @Override 
 public void returnedMessage(Message message, 
							 int replyCode, 
 							String replyText, 
							String exchange, 
 							String routingKey){ 
  	log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由key:{}",
  										new String(message.getBody()),
  										replyText, 
  										exchange, 
  									routingKey); 
  	} 
  }
回调接口

设置当消息无法路由的时候的回调方法

@Component 
@Slf4j 
public class MyCallBack implements RabbitTemplate./confirm/iCallback,
									RabbitTemplate.ReturnCallback { 
	 
	@Override
	public void /confirm/i(CorrelationData correlationData, 
											boolean ack, 
										String cause) { 
		String id=correlationData!=null?correlationData.getId():"";
 	if(ack){
 		 log.info("交换机已经收到id为:{}的消息",id);
  	 }else{ 
  		 log.info("交换机还未收到id为:{}消息,由于原因:{}",id,cause);
    } 
  } 
    //当消息无法路由的时候的回调方法 
    @Override 
    public void returnedMessage(Message message,
    							 int replyCode, 
    						 String replyText, 
   							  String exchange,
     						 String routingKey) { 
   	 log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",
   	 						new String(message.getBody()),
   							 exchange,
   							 replyText,
   	 						routingKey);
     }
  }

结果分析:
未被正常接收的消息被成功退回给生产者

因此生产者接收到退回的消息之后就可以将消息进行重新发送。

备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

配置类的修改

在上面的基础上添加了备份交换机,备份队列和报警队列,将这些交换机和队列按照图示的方式进行申明和绑定。

@Configuration 
public class /confirm/iConfig {
	public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange"; 
	public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue"; 
	public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; 
	public static final String BACKUP_QUEUE_NAME = "backup.queue"; 
	public static final String WARNING_QUEUE_NAME = "warning.queue"; 
	// 声明确认队列 
	@Bean("/confirm/iQueue") 
	public Queue /confirm/iQueue(){ 
		return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build(); 
	} 
	//声明确认队列绑定关系 
	@Bean 
	public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue,
					@Qualifier("/confirm/iExchange") DirectExchange exchange){ 
	return BindingBuilder.bind(queue).to(exchange).with("key1"); 
	} 
	//声明备份Exchange 
	@Bean("backupExchange") 
	public FanoutExchange backupExchange(){ 
		return new FanoutExchange(BACKUP_EXCHANGE_NAME); 
	}
	//声明确认Exchange交换机的备份交换机 
	@Bean("/confirm/iExchange") 
	public DirectExchange /confirm/iExchange(){ 
	ExchangeBuilder exchangeBuilder = ExchangeBuilder
						.directExchange(/confirm/i_EXCHANGE_NAME)
						.durable(true) 
						//设置该交换机的备份交换机 
						.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); 
		return (DirectExchange)exchangeBuilder.build();
	 }
	 // 声明警告队列 
	 @Bean("warningQueue") 
	 public Queue warningQueue(){ 
	 	return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); 
	 } 
	 // 声明报警队列绑定关系 
	 @Bean 
	 public Binding warningBinding(@Qualifier("warningQueue") Queue queue, 
	 @Qualifier("backupExchange") FanoutExchange backupExchange){
	 	 return BindingBuilder.bind(queue).to(backupExchange); 
	 	} 
	 // 声明备份队列 
	 @Bean("backQueue") 
	 public Queue backQueue(){ 
		 return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
	  } 
	  // 声明备份队列绑定关系 
	  @Bean 
	  public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ 
	  return BindingBuilder.bind(queue).to(backupExchange); 
	  }
 }
报警消费者
@Component 
@Slf4j 
public class WarningConsumer { 
	public static final String WARNING_QUEUE_NAME = "warning.queue";
	@RabbitListener(queues = WARNING_QUEUE_NAME)
	public void receiveWarningMsg(Message message) { 
		String msg = new String(message.getBody()); 
		log.error("报警发现不可路由消息:{}", msg); 
	} 
}

重新启动项目的时候需要把原来的/confirm/i.exchange删除因为我们修改了其绑定属性,不然报错。


mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是:备份交换机优先级高。
因为未被路由的消息是经过被封交换机的报警队列获得的,并没有通过Mandatory回退的形式告诉生产者。

RabbitMQ其他 幂等性问题 幂等性概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

幂等性解决方法

MQ消费者的幂等性的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。

消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:

a.唯一ID+指纹码机制,利用数据库主键去重。
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。b.利用redis的原子性去实现
利用redis执行setnx命令,天然具有幂等性。从而实现不重复消费 优先级队列

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用redis来存放的定时轮询,大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,
所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

队列添加优先级的方式

在Web界面进行添加

点击Maximum priority属性,设置相应的优先级就行。

在代码的队列中进行优先级添加
Map params=new HashMap();
params.put("x-max-priotity",10);//表示可以设置的最大优先级
channel.queueDeclare("hello",true,false,false,params);
在发送消息的代码中进行添加
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意

消息如果为设置优先级,默认优先级最低。
要让队列实现优先级需要做的事情有如下事情:

队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序 惰性队列

RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

两种模式

队列具备两种模式:default和lazy。

默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map args = new HashMap();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
RabbitMQ集群

最开始我们介绍了如何安装及运行RabbitMQ服务,不过这些是单机版的,无法满足目前真实应用的要求。如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台RabbitMQ服务器可以满足每秒1000条消息的吞吐量,那么如果应用需要RabbitMQ服务满足每秒10万条消息的吞吐量呢?购买昂贵的服务器来增强单机RabbitMQ务的性能显得捉襟见肘,搭建一个RabbitMQ集群才是解决实际问题的关键.

搭建步骤

将三个主机进行相应的配置之后,三个Rabbit MQ就相当于是相通的。

1.首先需要三台服务器。2.修改3台机器的主机名称 ,在该文件中修改vim /etc/hostname3.一台主机相当于一个node节点,在每个主机上都配置各个节点的 hosts 文件,让各个节点都能互相识别对方,三台主机都能相互识别对方。
vim /etc/hosts

192.168.6.100 node1
192.168.6.100 node2
192.168.6.100 node3

4.确保各个节点的cookie文件使用的是同一个值
在node1上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie5.启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached6.在节点2执行
rabbitmqctl stop_app
(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)7.在节点3执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app8.集群状态
rabbitmqctl cluster_status9.需要重新设置用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”10.解除集群节点(node2和node3机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行) 镜像队列

如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisher/confirm/i机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

镜像队列的设置:

1.启动三台集群节点

2.随便找一个节点添加policy(相关策略)在web界面中进行添加就行

Name:随便取就行

pattern:设置为镜像模式

apply to:Exchanges and queues

ha-mode:exactly

ha-params:2,表示自动备份两份

ha-sync-mode:设置为自动同步

3.在node1节点上创建一个队列发送一条消息,可以看到该队列在其他的节点上存在镜像队列

4.停掉node1之后发现node2成为了镜像队列

5.就算整个集群只剩下一台机器了 依然能消费队列里面的消息,说明队列里面的消息被镜像队列传递到相应机器里面了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780564.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号