最近工作中遇到个问题,kafka的消费跟不上,导致大量数据堆积
恰好我们的业务对数据实时性要求比较高,消费能力跟不上使数据延迟了3天才处理完,因此被嘲讽
然而这里又遇到个问题,kafka的发送方是另一个部门负责的,沟通协商后,无法用新增分区数的方式来解决,只能另想办法
解决思路:后来看代码发现了问题,之前做这个功能的同事在消费的onMessage 方法中,写了太多的业务处理逻辑,单条消息的处理时长甚至达到10s,这样肯定影响kafka的消费速度。所以最好的处理方式就是在onMessage中快速保存kafka消息,然后由多线程异步处理业务逻辑。
代码1.保存kafka数据至数据库
@Override
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
    // Persist the Kafka message quickly and ack immediately; the heavy business
    // logic is handled asynchronously by the scheduled worker, so the consumer
    // never falls behind the topic.
    long start = System.currentTimeMillis();
    int partition = record.partition();
    long offset = record.offset();
    String topic = record.topic();
    // Parameterized logging instead of System.out.println; the original also had
    // a bare "n" where a newline "\n" was intended.
    log.info("kafka消费进入====onMessage=================\npartition:{}==offset:{}==topic:{}==groupId:{}\n========================",
            partition, offset, topic, groupId);
    alarmInfoMapper.insertKafkaLog(partition, offset, topic, groupId);
    log.info("kafka消费进入=====================\n{}\n========================", record.value());
    // Insert into the kafka message staging table (message body omitted in the
    // original post; must be filled from `record` with the matching message class).
    // NOTE(review): kInfo is never populated from `record` here — populate it
    // before the insert in real code.
    KafkaInfo kInfo = new KafkaInfo();
    // TODO(review): method name looks garbled ("insertKafkalnfo" with a lowercase
    // 'l' instead of 'I') — confirm the mapper method is insertKafkaInfo.
    odo.insertKafkalnfo(kInfo);
    // Manual commit — ack only AFTER the message has been persisted, so a crash
    // before this point lets Kafka redeliver the record.
    acknowledgment.acknowledge();
    log.info("kafka消费====onMessage=================完成时间:{}", System.currentTimeMillis() - start);
}
2.多线程处理业务逻辑
// Worker pool for async business processing (5 core / 7 max threads, bounded
// queue of 1024). Fixed the original `linkedBlockingQueue` typo (lowercase 'l'
// does not compile). CallerRunsPolicy replaces DiscardOldestPolicy: a discarded
// CompletableFuture task never completes its future, so allOf(...).join() in the
// worker would block forever; caller-runs applies back-pressure instead and
// never silently loses a record.
private static final ExecutorService executor = new ThreadPoolExecutor(5, 7,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1024),
        new ThreadPoolExecutor.CallerRunsPolicy());
// Runs every 2 seconds and drains pending Kafka records via the worker method.
// NOTE(review): this void method and the Integer-returning worker below share the
// same name AND an empty parameter list — Java forbids overloading on return type
// alone, so this does not compile as shown; one of the two must be renamed
// (e.g. the worker to processKafkaInfoBatch).
@Scheduled(cron="0/2 * * * * ?")
public void syncGetKafkaInfoToMessage() {
    try {
        log.info("开始执行处理信息");
        Integer number = syncGetKafkaInfoToMessage();
        log.info("结束执行处理信息:{}", number);
    } catch (Exception e) {
        // Log through the framework with the full stack trace instead of
        // printStackTrace(), so the failure is captured by the log pipeline.
        log.error("处理kafka信息失败", e);
    }
}
/**
 * Drains all pending Kafka records from the staging table and processes them on
 * the worker pool. Loops until no unprocessed rows remain, then returns; the
 * next scheduled run picks up anything that arrived in the meantime.
 *
 * @return total number of rows marked as processed (status 2)
 */
public Integer syncGetKafkaInfoToMessage(){
    int num = 0;
    while (true) {
        long start = System.currentTimeMillis();
        //redis锁开始...
        // Fetch all not-yet-processed kafka rows.
        // NOTE(review): assumes the DAO returns List<KafkaInfo> — raw List in the
        // original; confirm the element type.
        List<KafkaInfo> kafkaInfoList = selectNoToAlarmKafkaInfo();
        // Stop when nothing is left. The original only set a flag here and fell
        // through, running the stream and an "id in ()" update on an empty list.
        if (CollectionUtils.isEmpty(kafkaInfoList)) {
            break;
        }
        // Mark the batch as "processing" (status 1) so concurrent runs skip it.
        // NOTE(review): assumes getId() returns Long — confirm against KafkaInfo.
        List<Long> ids = kafkaInfoList.stream().map(KafkaInfo::getId).collect(Collectors.toList());
        updateKafkaInfoByIds(ids, 1);
        //redis锁结束...
        // Unified on `log` (original mixed `logger` and `log`) and fixed the bare
        // "n" that was meant to be a newline.
        log.info("多线程处理数据========\n{}\n========", ids);
        // Thread-safe collector for ids finished by the pool workers.
        CopyOnWriteArrayList<Long> resultList = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] completableFutures = kafkaInfoList.stream()
                .map(kafkaInfo -> CompletableFuture.runAsync(() -> {
                    try {
                        // Business logic per record (omitted in the original post).
                        getKafkaInfoToMessage(kafkaInfo);
                    } catch (Exception e) {
                        // The original referenced an undefined variable `item`
                        // here; it must be the lambda parameter `kafkaInfo`.
                        try {
                            log.error("多线程处理数据报错========\n{}\n========{}",
                                    kafkaInfo.getContext(), e.getMessage(), e);
                            // Store failed records in the error table for later
                            // troubleshooting.
                            KafkaInfoError error = new KafkaInfoError(kafkaInfo, e.getMessage());
                            insertKafkaInfoError(error);
                        } catch (Exception inner) {
                            // Never let error bookkeeping fail the future, or
                            // allOf(...).join() below would throw.
                            log.error("记录kafka错误信息失败", inner);
                        }
                    }
                    // Record the id even on failure, matching the original flow:
                    // the row is marked processed and the failure lives in the
                    // error table.
                    resultList.add(kafkaInfo.getId());
                }, executor)).toArray(CompletableFuture[]::new);
        // Wait for the whole batch before updating statuses.
        CompletableFuture.allOf(completableFutures).join();
        // Mark the batch done (status 2).
        num += updateKafkaInfoByIds(new ArrayList<>(resultList), 2);
        log.info("多线程处理告警数据=================完成时间:{}", System.currentTimeMillis() - start);
    }
    return num;
}
update t_kafka_info set to_alarm = #{val} where id in
<foreach collection="ids" item="item" open="(" separator="," close=")">#{item}</foreach>



