幂等性是指,对于Producer生产的同一条消息,至多会被Kafka持久化一次,避免因网络重试等情况导致消息重复,例如用户下单,幂等性可以保证用户不会重复下单。
但是Kafka的幂等性只针对单会话,当一个Producer异常退出并重启后,两个会话重叠的消息是不保证幂等性的,例如,当一条消息发送完后客户端异常退出了,此时客户端并不知道这条消息已经被持久化了,那么在重启后,客户端可能仍然会重发这条消息,而Server将会认为这是一个新的生产者,此消息会被认为是一条全新的消息,此时将会出现重复。
幂等性要求的是消息不重复,那么可以很容易的联想到对于每一条消息带一个唯一ID,且需要是连续自增ID(自增可确认是否存在丢消息的情况,但不能保证不丢,不丢需要Producer失败重试来实现),由Server来判断发送来的消息的ID是否重复。除了唯一ID,Server还需要对Producer进行区分,即需要知道发送过来的消息来自哪个Producer。也就是说幂等需要的两个重要机制为:
- Producer的唯一ID(PID)
- 每个会话中每条消息的唯一ID(sequenceId)
在Producer启动时,需要先获取PID,而PID是需要保证全局唯一的,因此PID是从Server端产生的,产生规则为:
- 每一个Kafka broker都会在本地维护一个PID段,当本地的PID段用完后,会再去ZK上申请(一般每次申请1000个)。
- 当Kafka broker向ZK申请PID段时,会先获取ZK中保存的当前PID段最大值,然后尝试将要申请的PID段写回ZK,这样其他Broker在尝试申请时,不会出现PID段重复的情况。
- Producer重启后,PID也会重新生成,这也是幂等不能跨回话的主要原因。
sequenceId是某条/批消息的属性,对于同一个会话/PID来说,sequenceId是单调递增,且没有断点的。当Server收到produce请求后,会做两步检查:
- 首先Server中会缓存最近五次produce请求的sequenceId,可以通过此缓存信息检查sequenceId是否重复,重复则会返回错误。
- 如果没有重复的情况,那么会再去检查此次请求的sequenceId是否等于上次请求sequenceId+1,不是则认为消息出现丢失等情况,返回错误。
最容易想到的当然就是事务消息了,但其实现成本较高,我这里设想的是:
- 用户自行提供PID和消息的唯一ID,由Server缓存最近几次消息的唯一ID,用于去重。缺点是PID和SequenceId都需要用户来生成。



