Kafka
1 Kafka中的基本概念2 spring-kafka
2.1 集群消费(Clustering)2.2 @KafkaListener2.3 批量发送消息2.4 批量消费消息2.5 消费重试2.6 广播消费2.7 并发消费2.8 顺序消息2.9 事务消息2.10 消费进度的提交机制2.11 配置示例 3 消息重复消费与幂等性
3.1 重复消费的问题3.2 幂等性 4 消息的可靠性
4.1 消费端弄丢了数据4.2 Kafka 弄丢了数据4.3 生产者会不会弄丢数据? 5 参考文章
Kafka消息持久化到磁盘,因此可用于批量消费
支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。
同时支持离线数据处理和实时数据处理。
参考文章消息队列之 Kafka
1 Kafka中的基本概念Broker:Kafka 集群中的一台或多台服务器统称为 BrokerTopic:每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同 Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)Partition:Topic 物理上的分区,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)Producer:消息和数据的生产者,可以理解为往 Kafka 发消息的客户端Consumer:消息和数据的消费者,可以理解为从 Kafka 取消息的客户端Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。 这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。 2 spring-kafka 2.1 集群消费(Clustering)
// Demo01Producer.java
@Component
public class Demo01Producer {
@Resource
private KafkaTemplate
// Demo01Consumer.java
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
// Demo01AConsumer.java
@Component
public class Demo01AConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo01Message.TOPIC,
groupId = "demo01-A-consumer-group-" + Demo01Message.TOPIC)
public void onMessage(ConsumerRecord record) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
}
}
差异一,在方法上,添加了 @KafkaListener 注解,声明消费的 Topic 还是 "DEMO_01" ,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01" 。这样,我们就可以测试 Kafka 集群消费的特性。
集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
也就是说,如果我们发送一条 Topic 为 "DEMO_01" 的消息,可以分别被 "demo01-A-consumer-group-DEMO_01" 和 "demo01-consumer-group-DEMO_01" 都消费一次。
但是,如果我们启动两个该示例的实例,则消费者分组 "demo01-A-consumer-group-DEMO_01" 和 "demo01-consumer-group-DEMO_01" 都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为 "DEMO_01" 的消息,只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次,也同样只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次。
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
积分模块:判断如果是手机注册,给用户增加 20 积分。优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。… 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二,方法参数,设置消费的消息对应的类不是 Demo01Message 类,而是 Kafka 内置的 ConsumerRecord 类。通过 ConsumerRecord 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(value)就需要自己去反序列化。当然,一般情况下,我们不会使用 ConsumerRecord 类。
2.2 @KafkaListenerpublic @interface KafkaListener {
String id() default "";
String containerFactory() default "";
String[] topics() default {};
String topicPattern() default "";
TopicPartition[] topicPartitions() default {};
String containerGroup() default "";
String errorHandler() default "";
String groupId() default "";
boolean idIsGroup() default true;
String clientIdPrefix() default "";
String beanRef() default "__listener";
String concurrency() default "";
String autoStartup() default "";
String[] properties() default {};
}
2.3 批量发送消息
application.properties
spring.kafka.producer.batch-size=16384 # 每次批量发送消息的最大数量 spring.kafka.producer.buffer-memory=33554432 # 每次批量发送消息的最大内存 spring.kafka.producer.properties.linger.ms=30000 # 批处理延迟时间上限。不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。
批量发送消息的producer看起来没有什么特别的区别
2.4 批量消费消息application.properties
spring.kafka.listener.type=BATCH # 监听器类型,默认为 SINGLE ,只监听单条消息。配置 BATCH ,监听多条消息,批量消费 spring.kafka.consumer.max-poll-records=100 # poll 一次消息拉取的最大数量 spring.kafka.consumer.fetch-min-size= 10 # poll 一次消息拉取的最小数据量,单位:字节 spring.kafka.consumer.fetch-max-wait=10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
// Demo02Consumer.java
@Component
public class Demo02Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = Demo02Message.TOPIC,
groupId = "demo02-consumer-group-" + Demo02Message.TOPIC)
public void onMessage(List messages) {
logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());
}
}
2.5 消费重试
Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费
KafkaConfiguration配置类,增加消费异常的 ErrorHandler 处理器
// KafkaConfiguration.java
@Configuration
public class KafkaConfiguration {
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate, ?> template) {
// <1> 创建 DeadLetterPublishingRecoverer 对象
// 负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// <2> 创建 FixedBackOff 对象
// 我们配置了重试 3 次,每次固定间隔 30 秒
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// <3> 创建 SeekToCurrentErrorHandler 对象
// 处理异常,串联整个消费重试的整个过程
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
}
在消息消费失败时,SeekToCurrentErrorHandler 会将 调用 Kafka Consumer 的 #seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。 这里,胖友好好思考下,结合艿艿在上一点的描述。另外,在 FailedRecordTracker 中,会调用 BackOff 来进行计算,该消息的下一次重新消费的时间,通过 Thread#sleep(...) 方法,实现重新消费的时间间隔。有一点需要注意,FailedRecordTracker 提供的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下 FailedRecordTracker 类,通过 ZooKeeper 存储计数。 2.6 广播消费
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
不过 Kafka 并不直接提供内置的广播消费的功能!!!此时,我们只能退而求其次,每个 Consumer 独有一个 Consumer Group ,从而保证都能接收到全量的消息。
#广播订阅下,我们一般情况下,无需消费历史的消息,而是从订阅的 Topic 的队列的尾部开始消费即可,所以配置为 latest spring.kafka.consumer.auto-offset-reset=latest2.7 并发消费
Spring-Kafka @KafkaListener ,默认是串行消费的。显然,这在监听的 Topic 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?
@KafkaListener注解有 concurrency 属性,它可以指定并发消费的线程数。例如说,如果设置 concurrency=4 时,Spring-Kafka 就会为该 @KafkaListener 创建 4 个线程,进行并发消费。
首先,我们来创建一个 Topic 为 "DEMO_06" ,并且设置其 Partition 分区数为 10 。然后,我们创建一个 Demo06Consumer 类,并在其消费方法上,添加 @KafkaListener(concurrency=2) 注解。再然后,我们启动项目。Spring-Kafka 会根据 @KafkaListener(concurrency=2) 注解,创建 2 个 Kafka Consumer 。注意噢,是 2 个 Kafka Consumer 呢!!!后续,每个 Kafka Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。之后,Kafka Broker 会将 Topic 为 "DEMO_06" 分配给创建的 2 个 Kafka Consumer 各 5 个 Partition 。 如果不了解 Kafka Broker “分配区分”机制单独胖友,可以看看 《Kafka 消费者如何分配分区》 文章。这样,因为 @KafkaListener(concurrency=2) 注解,创建 2 个 Kafka Consumer ,就在各自的线程中,拉取各自的 Topic 为 "DEMO_06" 的 Partition 的消息,各自串行消费。从而,实现多线程的并发消费。
2.8 顺序消息注意,不要配置 concurrency 属性过大,则创建的 Kafka Consumer 分配不到消费 Topic 的 Partition 分区,导致不断的空轮询。
顺序消息的定义:
普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
Spring-Kafka 在 Consumer 消费消息时,天然就支持按照 Topic 下的 Partition 下的消息,顺序消费。即使在并发消费时,也能保证如此。
那么,只需要考虑将 Producer 将相关联的消息发送到 Topic 下的相同的 Partition 即可,如果胖友了解 Producer 发送消息的分区策略的话,只要我们发送消息时,指定了消息的 key ,Producer 则会根据 key 的哈希值取模来获取到其在 Topic 下对应的 Partition
// Demo06Producer.java
public SendResult syncSendOrderly(Integer id) throws ExecutionException, InterruptedException {
// 创建 Demo01Message 消息
Demo06Message message = new Demo06Message();
message.setId(id);
// 同步发送消息
// 因为我们使用 String 的方式序列化 key ,所以需要将 id 转换成 String
return kafkaTemplate.send(Demo06Message.TOPIC, String.valueOf(id), message).get();
}
2.9 事务消息
Kafka 内置提供事务消息的支持
不过 Kafka 提供的并不是完整的的事务消息的支持,缺少了回查机制
目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持
# Kafka 的事务消息需要基于幂等性来实现,所以必须保证所有节点都写入成功 spring.kafka.producer.acks=all # 事务编号的前缀。需要保证相同应用配置相同,不同应用配置不同 spring.kafka.producer.transaction-id-prefix=demo
使用 Kafka-Spring 封装提供的 KafkaTemplate ,实现发送事务消息。代码如下:
// Demo07Producer.java
@Component
public class Demo07Producer {
private Logger logger = LoggerFactory.getLogger(getClass());
public String syncSendInTransaction(Integer id, Runnable runner) throws ExecutionException, InterruptedException {
return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback
使用 kafkaTemplate 提交的#executeInTransaction(OperationsCallback 在 #executeInTransaction(...) 方法中,我们可以通过 KafkaOperations 来执行发送消息等 Kafka 相关的操作,也可以执行自己的业务逻辑。在 #executeInTransaction(...) 方法的开始,它会自动动创建 Kafka 的事务;然后执行我们定义的 KafkaOperations 的逻辑;如果成功,则提交 Kafka 事务;如果失败,则回滚 Kafka 事务。 另外,我们定义了一个 runner 参数,用于表示本地业务逻辑~
注意,如果 Kafka Producer 开启了事务的功能,则所有发送的消息,都必须处于 Kafka 事务之中,否则会抛出 " No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record" 异常。 所以,如果胖友的业务中,即存在需要事务的情况,也存在不需要事务的情况,需要分别定义两个 KafkaTemplate(Kafka Producer)。 集成到 Spring Transaction 体系: Spring-Kafka 提供了对 Spring Transaction 的集成,所以在实际开发中,我们只需要配合使用 @Transactional 注解,来声明事务即可,而无需使用 KafkaTemplate 提供的 #executeInTransaction(...) 模板方法。 原生 Kafka Consumer 消费端,有两种消费进度提交的提交机制: 【默认】自动提交,通过配置 enable.auto.commit=true ,每过 auto.commit.interval.ms 时间间隔,都会自动提交消费消费进度。而提交的时机,是在 Consumer 的 #poll(...) 方法的逻辑里完成,在每次从 Kafka Broker 拉取消息时,会检查是否到达自动提交的时间间隔,如果是,那么就会提交上一次轮询拉取的位置。手动提交,通过配置 enable.auto.commit=false ,后续通过 Consumer 的 #commitSync(...) 或 #commitAsync(...) 方法,同步或异步提交消费进度。
Spring-Kafka Consumer 消费端,提供了更丰富的消费者进度的提交机制,更加灵活。当然,也是分成自动提交和手动提交两个大类。在 AckMode 枚举类中,可以看到每一种具体的方式。代码如下: 既然现在存在原生 Kafka 和 Spring-Kafka 提供的两种消费进度的提交机制,我们应该怎么配置呢? 使用原生 Kafka 的方式,通过配置 spring.kafka.consumer.enable-auto-commit=true 。然后,通过 spring.kafka.consumer.auto-commit-interval 设置自动提交的频率。使用 Spring-Kafka 的方式,通过配置 spring.kafka.consumer.enable-auto-commit=false 。然后通过 spring.kafka.listener.ack-mode 设置具体模式。另外,还有 spring.kafka.listener.ack-time 和 spring.kafka.listener.ack-count 可以设置自动提交的时间间隔和消息条数。
默认什么都不配置的情况下,使用 Spring-Kafka 的 BATCH 模式:每一次消息被消费完成后,在下次拉取消息之前,自动提交。 首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。 Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。 但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。 注意:新版的 Kafka 已经将 offset 的存储从 Zookeeper 转移至 Kafka brokers,并使用内部位移主题 __consumer_offsets 进行存储。 幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错 几个思路: 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。 唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。 这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。 生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。 这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。 生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。 所以此时一般是要求起码设置如下 4 个参数: 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。 如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。 参考芋道 Spring Boot 消息队列 Kafka 入门 参考芋道 Kafka 极简入门 参考消息队列之 Kafka 参考Kafka 消费者如何分配分区 参考github仓库advanced-java
2.10 消费进度的提交机制
// ContainerProperties#AckMode.java
public enum AckMode {
// ========== 自动提交 ==========
RECORD,
BATCH,
TIME,
COUNT,
COUNT_TIME,
// ========== 手动提交 ==========
MANUAL,
MANUAL_IMMEDIATE,
}
spring:
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
# Kafka Consumer 配置项
consumer:
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: cn.iocoder.springboot.lab03.kafkademo.message
# Kafka Consumer Listener 监听器配置
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
apache:
kafka: ERROR # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
3 消息重复消费与幂等性
3.1 重复消费的问题
3.2 幂等性



