栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spring中kafka消息丢失问题(提供者、消费者)附解决方案

spring中kafka消息丢失问题(提供者、消费者)附解决方案

spring kafka 可能会有的消息丢失的问题 以及解决方案 1.Kafka发送消息的两种方式

Kafka发送消息分为同步(sync)、异步(async)在与spring集成中通过配置文件修改
配置文件放在文末

  

acks = 0;表示producer不需要等待broker确认收到消息

acks = 1;表示producer至少需要等待leader已经成功写入本地log,但是follower如果没有成功备份同时leader挂掉,就造成消息丢失

acks = all 或者-1;表示需要等待 min.insync.replicas(默认为1,推荐配置大于2,如果配置为2,此时就需要leader和一个follower同步完后,才会返回ack)

Kafka生产者发送消息防止丢失

java提供了一个注解:@PostConstruct用来修饰非静态的void()方法,作用就是:当我们的项目启动时候就会进行预热,首先执行这个方法

使用kafkaTemplate在这个预热方法中使用setProducerListener();方法来为消息生产者创建一个监听器监听消息发送是否成功

@PostConstruct
public void initKafka(){
    kafkaTemplate.setProducerListener(new ProducerListener(){
        public void onSuccess(String topic, Integer partition, String key, String value, Recordmetadata recordmetadata) {
            System.out.println("kafka发送消息成功:topic:" + topic + "key:" + key + "value:" + value);
        }
        public void onError(String topic, Integer partition, String key, String value, Exception exception) {
            // 消息发送失败 可以执行一系列操作....
            System.err.println("kafka发送消息失败:topic:" + topic + "key:" + key + "value:" + value);
        }
    });
}
2.Kafka consumer接收消息

2.1自动提交

假设我们生产者向服务器成功发送7条消息,消费者批量拿取,假设第一次拿取1,2两条消息,当消费者拿到消息之后会周期性的提交offset,也就是说,我拿到消息之后我就反馈给broker:我下次要从3开始,如果宕机,重启后还是从3开始就会造成消息丢失

解决方案: 在kafka producer发送之前,存入redis缓存中,当消费者正常消费时候在删除,同时,在编写一个定时任务,定时获取redis中消费失败的消息重新发送

// 防止消息丢失 (存入redis中)
redisTemplate.opsForHash().put("order",out_trade_no,order);
// kafka 发送消息
kafkaTemplate.send("test","order", JSON.toJSONString(order));

定时任务 cron表达式在线生成

@Scheduled(cron = "0 * * * * ?")
public void reSendMsg(){
    List orderList = redisTemplate.opsForHash().values("order");
    if(orderList == null){
        return;
    }
    // 重新发送给消费者完成消费
    for (Order order : orderList) {
        kafkaTemplate.send("hgShop","order", JSON.toJSONString(order));
    }
}
2.2手动提交

当我们使用的手动提交时候,我们拿走1,2数据,但是在没有消费成功的情况下不反馈给 broker,比如:我消息1 消费完成,这时候消费者挂掉,重启之后,消费者重新获取消息,还是从2开始,因为消息1消费完成之后反馈给broker,但是消息2丢失了,没有反馈,broker就认为没有消费,这样也就能较好的避免大部分消息丢失。

要注意的是

在消费者端一定是实现 AcknowledgingMessageListener 接口 实现 重写两个参数的方法 然后 手动提交!!!

配置文件 自动提交–消费者



	
		
			
			
			
			
			
			
			
			
			
			
				
			
		
	
    
    
    
        
    
    
    
        
    

自动提交–生产者



    
    
        
            
                
                
             
                

                
            
        
    
    
    
        
            
        
    
    
    
        
        
        
    

手动提交–消费者



	
	
		
		
	

	
	
		
		
		
		
		
		
	

	       
	


	
	
		
			
		
	

	
		
			
				                   
				
				
				
		
				
				
				
				

				

				
			
		
	

手动提交–生产者



    
    
        
            
                
                
                
                
                
                
                
                
                
                

                
                
                
                
            
        
    

    
    
        
            
        
    

    
    
        
        
        
    

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758389.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号