栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka生产者生产消息过慢导致的并发问题

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka生产者生产消息过慢导致的并发问题

1、问题背景

我负责的服务A中,有一个监听系统B发送的MQ消息,在打平消息体的结构后存储到本地数据库,并写入缓存的操作,其中消息体的结构是数据库以及缓存key结构的聚合状态。

例如:消息体为四个字段 1、[2,3,4]、[5,6]、7、8时,数据结构数据就根据[2,3,4]和[5,6],完全打平存储到数据中,而缓存为:1_2_5:[7],[8]、1_3_5:[7],[8]等等。

可能产生的问题点:服务A有两台机器,针对缓存key的维度,两条消息是可能存在同一维度的数据的,因此可能因为并发产生写入缓存时数据被覆盖的问题。

考虑到B系统发送消息的频率很低,低于1QPS,因此认为并发存在的可能性很低,只是增加了巡检数据库与缓存数据不一致的情况进行告警提醒功能。

2、问题描述

某日接到告警,检测到缓存数据与数据库不一致,查看MQ发生消息的记录,发现是有一条消息的数据被覆盖了。而且值得关注的是,这两条数据从Kafka发送到消费组的时间是完全一致的。通过查看B系统的发送记录,其实这两条消息并不是同一时刻被发送到kafka的。

3、问题原因

通过问题描述发现,应该是kafka将消息进行了聚合发送,导致了并发问题的产生,追根溯源,就要考虑到kafka消息发送的特性了。

在kafka的消息发送流程,是一个异步发送的过程。共涉及两个线程:main线程和Sender线程和一个线程共享变量RecordAccumulator(计数累加器)。而发送的过程是main线程发消息给RecordAccumulator,Sender从RecordAccumulator拉取消息发送到kafka的broker。

在main线程中的流程:拦截->序列化->分区

send(ProducerRecord方法)Interceptors(过滤器)Serializer(序列化器)Partitioner(分区器)

发送流程:

 而Serder线程发送消息并不是有消息就往broker的,为了提高效率,Serder会在RecordAccumulator中的数据到达一定数量后,才会发送数据,或者一直等待一段直接都没达到要求的数据量,才会发送消息。

相关参数:

batch.size:只有数据累积到batch.size之后,sender才会发送数据

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

因此,本次问题的原因就是B系统发送消息的频率过低,导致Sender线程会等数据累积后一起发送,就导致了A系统的并发现象。

4、解决方案

常规对于并发问题的解决方案,一般都是使用同步锁,但是由于场景的特殊性,加锁就必须加在消息体结构下,而这时候的锁粒度对于业务来说是非常巨大的,相当于对于字段1加锁,会导致A系统基本上处于一个单线程的状态,因此采取了相对代价较小,但是并不那么常规的解法,即在巡检到数据异常后,直接使用数据库数据,覆盖缓存数据。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/708509.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号