栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka 中的一些坑

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka 中的一些坑

引言

以此记录一下在使用 Kafka 的过程中遇到的一些古怪问题
因为是初学,所以遇到的问题还不算多,之后会慢慢更新(大概 )


生产者相关

记录一下有关 Producer 的一些问题

Topic 不存在

问题描述:org.apache.kafka.common.errors.TimeoutException: Topic xxx not present in metadata after xxx ms.

很显然,意思是 Topic 不存在于 metadata 导致了 Producer 发送超时
这个问题倒是很常见,但是它就怪在:导致这个问题出现的原因千奇百怪。比如:

  • Kafka 中没有这个 Topic,并且设置了不自动创建 Topic
  • Topic 存在,但是要发送的消息所指定的 Partition 信息有错误
  • 甚至于程序中某某依赖包发生了冲突也可能导致了这个错误(StackOverflow 上看到了一个贴子,但是还没验证过)

总之原因就是很奇葩,以上所说的这些问题根源都可以通过在网上搜索一些博客、贴子来解决

问题出现的原因

在这里要说的是另一个导致了这个问题的原因:未开放 Kafka 的相关端口
当时发现是这个原因时,我也是哭笑不得,因为这个原因和这个问题的描述(Topic xxx not presen…)没有半毛钱关系,最开始遇到这个问题的时候也没有往这方面想,导致卡了很久
这里稍微吐槽一下 Kafka 的 Java 客户端,异常描述太笼统了,导致 debug 的时候没有什么头绪

问题的解决方案

既然找到了是未开放端口引起的问题,那么解决起来也很简单:开放对应的端口呗。两种方案:

  • 暂时性开放:
# Kafka 的默认端口是 9092,可以选择暂时性开放
firewall-cmd --zone=public --add-port=9092/tcp
  • 永久性开放
# Kafka 的默认端口是 9092,可以添加 --permanent 选项来永久性开放
firewall-cmd --zone=public --add-port=9092/tcp --permanent
# 重启防火墙
systemctl restart firewalld.service
# 重载配置文件
firewall-cmd --reload

开放端口之后,问题就解决了

Batch 中的消息过期

问题描述:Expiring 1 record(s) for xxx-0:120000 ms has passed since batch creation

问题描述中的 xxx 指对应的 Topic。很明显,是说自批次创建以来,某某主题中的 1 条记录过期了
要理解这个问题,得先了解 Kafka 的生产者发送消息的原理:

“为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。”
													  ———— 来自《Kafka权威指南》

所以,Producer 调用了 send 方法后,消息并不会被立即发送,而是会待在某个批次中之后再一起发送。批次中的消息过期了,说明该批次并没有被发送给 Kafka,消息发送出现了问题

问题出现的原因

针对这个问题,我也在网上了搜了一些博看来看,意识到有可能是 Kafka 的一些配置出现了问题。之前在查看修改 Kafka 的配置文件(server.properties)的时候,留了一个心眼,有意的记住了 listeners 和 advertised.listeners 这两项配置。然后我设置修改了一下这两项配置,果然问题就解决了

问题的解决方案

一开始我是没有对 listeners 和 advertised.listeners 这两项配置做修改的,维持了默认的样子,如下:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://your.host.name:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

可以看到这两项配置默认是不启用的,被注释掉了
后来我开放了这两项配置:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xx.xx.xx.xx:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092

上面的 xx.xx.xx.xx 都是指你的 Kafka 服务器的 IP 地址,也就是说 listeners 和 advertised.listeners 的值是设置成一样的
然后再用 Producer 发送消息,就 ok 了


最后

文章有什么错误之处欢迎(麻烦)各位指出 ‍♂️

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/884978.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号