栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka 多线程消费单节点数据

kafka 多线程消费单节点数据

背景

最近工作中遇到个问题,kafka的消费跟不上,导致大量数据堆积

恰好我们的业务对数据实时性要求比较高,消费能力跟不上使数据延迟了3天才处理完,因此被嘲讽

然而这里又遇到个问题,kafka的发送方式另一个部门的,沟通协商后,无法用新增分区数的方式来解决,只能另想办法

解决思路

后来看代码发现了问题,之前做这个功能的同事在消费的onMessage 方法中,写了太多的业务处理逻辑,处理时长甚至达到10s,这样的话肯定影响kafka的消费,所以最好的处理方式就是快速保存kafka消息,然后多线程处理业务逻辑。

代码

1.保存kafka数据至数据库

    
@Override
    public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
        long start = System.currentTimeMillis();
        int partition = record.partition();
        long offset = record.offset();
        String topic = record.topic();
        System.out.println("kafka消费进入====onMessage=================n" + "partition:" + partition + "==offset:" + offset +"==topic:" + topic +"==groupId:" + groupId +"n========================");
        alarmInfoMapper.insertKafkaLog(partition, offset, topic, groupId);
        System.out.println("kafka消费进入=====================n" + record.value() + "n========================");

        //插入kafka日志消息表 消息体略,需配合kafka消息类
        KafkaInfo kInfo = new KafkaInfo();
        odo.insertKafkalnfo(kInfo);

        //手动提交
        acknowledgment.acknowledge();
        log.info("kafka消费====onMessage=================完成时间:" + (System.currentTimeMillis() - start));
    }

2.多线程处理业务逻辑

  //多线程池
  private static ExecutorService executor = new ThreadPoolExecutor(5, 7,
            0L, TimeUnit.MILLISECONDS,
            new linkedBlockingQueue<>(1024),
            new ThreadPoolExecutor.DiscardOldestPolicy());
@Scheduled(cron="0/2 * * * * ?")
public void syncGetKafkaInfoToMessage() {
        try {
            log.info("开始执行处理信息");
            Integer number = syncGetKafkaInfoToMessage();
            log.info("结束执行处理信息:{}", number);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public Integer syncGetKafkaInfoToMessage(){
        Integer num = 0;
        Integer whileNum = 0;
        //while循环处理kafka信息,直到数据处理结束,再等下次定时任务执行
        while (whileNum == 0){
            long start = System.currentTimeMillis();
            //redis锁开始...
            //获取所有的kafka信息
            List kafkaInfoList = selectNoToAlarmKafkaInfo();
            //没有数据时跳出循环
            if(CollectionUtils.isEmpty(kafkaInfoList)){
                whileNum = 1;
            }
            //修改状态为修改中,更新状态为1
            List ids = kafkaInfoList.stream().map(u-> u.getId()).collect(Collectors.toList());
            updateKafkaInfoByIds(ids,1);
            //redis锁结束...
            logger.info("多线程处理数据========n"+ Arrays.toString(kafkaInfoList.stream().map(u-> u.getId()).toArray()) +"n========");
            CopyOnWriteArrayList resultList = new CopyOnWriteArrayList<>();
            CompletableFuture[] completableFutures = kafkaInfoList.stream().map(kafkaInfo -> CompletableFuture.runAsync(() -> {
                try {
                    try {
                        //这里处理业务逻辑,根据各自情况就不贴代码了
                        getKafkaInfoToMessage(kafkaInfo);
                    } catch (Exception e) {
                        logger.error("多线程处理数据报错========n"+ item.getContext() +"n========" + e.getMessage());
                        e.printStackTrace();
                        //将kafka数据处理有问题的存到处理错误的表中 方便以后查问题
                        KafkaInfoError error = new KafkaInfoError(item,e.getMessage());
                        insertKafkaInfoError(error);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                resultList.add(item.getId());
            }, executor)).toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(completableFutures).join();
            //修改状态为已处理,更新状态为2
            num += updateKafkaInfoByIds(resultList.stream().collect(Collectors.toList()),2);
            log.info("多线程处理告警数据=================完成时间:" + (System.currentTimeMillis() - start));
        }
        return num;
    }



        update t_kafka_info
        set to_alarm = #{val}
        
            id in
            
                #{item}
            
        
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762041.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号