栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka问题 Aattempt to heart beat failed since the group is rebalancing

kafka问题 Aattempt to heart beat failed since the group is rebalancing

kafka 2.11 attempt to heart beat failed since the group is rebalancing
生产环境遇到kafka 2.11 重平衡问题,记录
为了解决问题,先还原此报错

window下搭建kafka单节点

https://kafka.apache.org/downloads 2.1.1版本
下载后解压
修改 config目录下 zookeeper.properties **dataDir **指定zk数据存放目录(默认是linux目录结构)
修改 config目录下 server.properties log **log.dirs **指定kafka日志存放目录(默认是linux目录结构)

启动zk
.binwindowszookeeper-server-start.bat configzookeeper.properties

zk端口默认2181

启动kafka
 .binwindowskafka-server-start.bat configserver.properties

kafka默认端口 9092

使用kafka_toole工具连接测试:

测试成功,单节点搭建完成

模拟程序生产者和消费者

开发程序,模拟生产者和消费者

# 引入jar包

  org.springframework.kafka
  spring-kafka

配置文件
spring.kafka.producer.bootstrap-servers=localhost:9092 # 配置kafka地址
spring.kafka.consumer.max-poll-records=1 #每次拉取一条数据 方便测试
spring.kafka.consumer.heartbeat-interval=100 # 心跳时间100ms
生产者

使用KafkaTemplate
controller和service代码省略

package com.example.kafka_test.service.impl;

import com.example.kafka_test.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import static com.example.kafka_test.KafkaConstant.TOPIC_NAME;


@Service
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Override
    public void sendMsg(String msg) {
        kafkaTemplate.send(TOPIC_NAME, msg);
    }
}

消费者
package com.example.kafka_test.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import static com.example.kafka_test.KafkaConstant.CONSUMER_GROUP;
import static com.example.kafka_test.KafkaConstant.TOPIC_NAME;


@Slf4j
@Component
public class ConsumerServiceImpl {
    @KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP)
    public void consumerMsg(String msg) {
        try {
            # 休眠 防止消费速度快 无法观察日志
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(msg);
    }
}

测试1-重平衡的触发 造数据

使用postman调用生产者造数据

启动第一个消费者


正常启动,且开始消费

启动第二个消费者

同样的分区,同样的消费者组
启动成功后,发现第一个消费者大量输出重平衡日志,且重平衡期间数据并没有消费

总结

消费者的数量变动会触发Rebalance 重平衡期间数据不消费

测试2-还原报错

Attempt to heart beat failed since the group is rebalancing

修改配置参数
spring.kafka.consumer.heartbeat-interval=100 # 每100ms发送一次心跳
spring.kafka.consumer.properties.max.poll.interval.ms=9000 #最大每9000ms拉取一次数据
spring.kafka.consumer.properties.session.timeout.ms=30000 #超过此时间没有发送心跳,则认为消费者死亡,剔除组,并触发rebalance

修改后像测试1一样重启,发现大量报错![image.png](https://img-blog.csdnimg.cn/img_convert/427ef7f0f4905a582a30bab7b68bb7c1.png#clientId=u2e6e65cc-8171-4&from=paste&height=296&id=u186d515a&margin=[object Object]&name=image.png&originHeight=296&originWidth=1834&originalType=binary&ratio=1&size=95316&status=done&style=none&taskId=u3e3f0995-2de6-4f0a-b3cb-374988c7bc8&width=1834)

还原报错成功,总结一下

总结 错误原因:

Attempt to heart beat failed since the group is rebalancing
先分析一下这句话,发送心跳请求失败,消费者组正在重平衡
也就是说触发这个问题的条件有两个:

  1. 发送心跳
  2. 消费者重平衡
发送心跳请求:

在kafka 0.11版本之前,心跳请求是跟poll()请求一起发送的,即拉取一次数据发送一次心跳
在kafka 0.11版本之后,心跳请求是单独的线程,由 spring.kafka.consumer.heartbeat-interval 参数控制心跳请求的间隔时间

重平衡:

触发重平衡的情况如下:

  1. 有新的consumer加入
  2. 旧的consumer挂了
  3. coordinator挂了,集群选举出新的coordinator
  4. topic的partition新加
  5. consumer调用unsubscrible(),取消topic的订阅

经过以上分析,可以得到结果;
由于拉取数据消费过慢,两次poll之间的时间超过了session.timeout.ms,被认为此消费者已死亡,触发了rebalance,而在rebalance的过程中,发送心跳请求导致的报错.

解决办法:
  1. 调高心跳请求的间隔时间 heartbeat-interval 此项可不调整,因为心跳时间偏大的情况下,也可能触发rebalance
  2. 调高超时时间 session.timeout.ms
  3. 减少每次poll的拉取数据量 max-poll-records 防止每次poll拉取的数据处理的时间过长,导致超时

其中最好的办法就是2 3
最重要的就是poll到的数据要在session.timeout.ms时间内处理完.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604653.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号