测试&总结
- kafka_2.13-2.7.2
- JDK8
- Spring Boot 2.5.11
疑问
- 生产者在发送消息到Kafka之前,需要显示地使用代码或者在Kafka服务器上创建topic吗?
不需要:在使用kafkaTemplate发送消息的时候,如果topic不存在,那么就会创建topic,但是只能使用默认的分区和副本数。所以如果有需要,最好还是使用new Topic()来创建topic,指定分区和副本数。- 一个分区只能被一个消费组的一个消费者监听,否则报错
- 想往特定的分区发送消息的时候可以指定分区。
- 路由规则就是消息发送到哪个分区,由hash(key)或者指定的分区决定。如果key或者分区不指定,则随机分配消息。 指定key会一直由某个分区接受:
- 如果生产者健康,但是kafka服务端不可达,这时候生产者会把消息存起来,等kafka服务端正常了再推送消息给kafka
- 客户端3秒一次发送心跳给kafka服务端,确认kafka是否存活。
- 默认情况下,即使生产者出现异常也会发送消息给kafka:
- **消息的可靠:**发送消息的可靠性和消费消息的幂等性。
- 配置了事务,必须以事务方式调用,否则报错:
类似这样的需求:要求在生产者端有多个步骤:将消息发送到kafka,然后保存到数据库。如果保存到数据库失败,还想回滚发送到kafka的消息。
配置:
transaction-id-prefix: transaction
retries: 3
acks: all
应该这里实现:- 手动确认消息被消费,最好配合事务:
每日一言
- @KafkaListener(topics = "xywei_test2", concurrency = "2", id = "group_2"),并发数为什么没有起到作用?我已经配置了分区数不是1了。
- 如何显示手动删除kafka消息?
- 为什么同一个客户端里面监听不同组的不同分区,却只有一个组的收到消息?并且还是轮询。我预期的结果应该是3个组都能收到消息!即使把分组的放到不同的服务工程下,也是一样的结果,只是有时候变成了hash_ip的方式,一直被固定的某个服务监听到,只有该服务挂掉才轮到其他服务,是不是我没有配置kafka集群的问题?
- 发送消息的时候,不指定分区,如何得知消息会发送到哪个分区?
- kafkaTemplate.send()的key有何用?
- 在实际生产中是指定分区好还是不指定?
- 生产中哪种业务场景使用同步或者异步发送消息?如何配置?
- 如何实现调用kafkatemplate发送消息完毕整合,发生异常就回滚。还有一种是消费回滚,如何实现?–配置事务一起用
为什么工地上还有50-60岁的人在搬砖?那是因为他们的子女无能!



