最近同时发现 Kafka Broker 的 TCP 连接比较多,单台 broker 已经达到了10K+,认为太多的 TCP 连接会给 Broker 带来压力,要求平台开发进行解决。在进行简单的排查后发现有一类任务会启动很多 worker, 每个worker会启动一个 Kafka Prouder ,向某个topic 上报数据,基本情况如下:
-
worker 有 3000+
-
producer 发送总的 qps 300+
-
单条消息大小 100K
-
topic 有 16 个 partition,3 replica,每个 leader partition 分布在不同的 broker 上
综合上面的参数,直观上认为这种情况确实会对 broker 产生压力,原因如下:
-
produer 会向 broker 更新元数据,由于 producer 的实例比较多,发送的请求量会比较大
-
Kafka producer 会和每个 partition 的 leader,建立 TCP 连接,通过简单的计算,可知会产生 16*3000 = 48000 个 TCP 连接, 与单台 broker 建立的连接在 3000+
但仔细思考一下,事情其实并不简单,会涉及到很多知识。
关于元数据更新
Kafka prouder 更新元数据的默认间隔是 5 分钟,对应的配置项如下,折算下来更新元数据的 qps 也就3000/(5*60) = 10,并不会产生太大影响。
.define(metaDATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), importance.LOW, metaDATA_MAX_AGE_DOC)
关于 TCP 连接
Kafka broker 的网络模型采用的是 NIO, 理论上过多的 TCP 连接并不会对 broker 带来过大压力,如何验证这个问题呢?
由于producer 发送消息总的 qps 不高,实际上并不需要那么多的partition,只要把 partition 个数减少,就能快速验证这个问题。比如把 partition 个数设置为2, 连接数就会减少 3000*(16-2)= 42000。由于 Kafak 的 topic 不能缩容,可以新建一个 topic,但 producer 的个数比较多,修改topic 名称后,全部重启一遍是一件很麻烦的事。这个场景下的数据相对不是那么重要,为了快速验证,最终选择了对 topic 进行强制删除,然后重建。
topic 的强制删除是一个比较暴力的操作,必须要考虑周全,不然很容易触发意想不到的问题。
1. 如果 producer 发送失败,确保不会对业务产生影响,对应的业务代码如下
private static final class ReporterTask extends TimerTask {
private final KafkaProducer producer;
private ReporterTask(KafkaProducer producer) {
this.producer = producer;
}
@Override
public void run() {
try {
producer.send(xxx);
} catch (Throwable t) {
LOG.warn("Error while reporting metrics", t);
}
}
}
//启动代码
executor.scheduleWithFixedDelay(new ReporterTask(producer), period, period, timeunit);
对 prouder 的代码进行分析后,发现是个定时任务,并进行了try catch ,所以发送失败后并不会产生太大影响,重试即可。
2. 对 topic 的删除流程做到心中有数,确认 broker 端的配置。
delete.topic.enable=true # 允许删除topic auto.create.topics.enable=false #禁止自动创建topic
3. 如果允许的话最好把 consumer 停掉,不停掉也是可以的,同样的也要确保消费报错,不会对业务逻辑产生影响。
接下来就是一波操作猛如虎,具体效果如下
调整前后established tcp 变化图
调整前后cpu变化图
调整前后内存变化图
从图中可以看到,有两台 broker 机器的 TCP 连接,瞬间增加了3000+,其它 broker 机器在9分钟之后,TCP 连接下降了3000+,但机器的 cpu,内存并没有明显变化。
由此可以得出,过多的 TCP 连接不会对 broker产生压力,主要原因是 broker的网络模型为NIO,事件驱动的,并不会消耗太多 cpu,根据这篇文章可以知道https://zhuanlan.zhihu.com/p/25241630,一个 TCP 连接大概占用 3.15KB的内存,所以也不会对内存造成压力。
拓展一下,9分钟之后 TCP 连接才关闭,是由 producer 中的配置项决定的, 如果9分钟内 tcp连接没有活跃过,produer 将会把 TCP 连接关闭。由前面可知,produer 每隔5分钟更新一次元数据,所以可以认为 produer 和 broker 之间是长连接。
//producer 端配置
define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
//broker 端配置,默认10分钟
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
最后进行简单的总结
1. 过多的 TCP 连接并不会对 Kafka Broker 带来压力。
2. 尽管在这种情况下,我们还是要有意识的控制 partition 的个数,避免在某些情况下 TCP 的连接多出好几万。



