栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

多线程运行项目时,统计数据量----AtomicLong

多线程运行项目时,统计数据量----AtomicLong

多线程运行,记录流动数据量;

本项目属于kafka消费者,每天需要消费的数据多则达到上亿级别,所以开设的多线程去消费数据,目前的需求是需要记录kafka每个主题每天消费的数据量;
因为kafka每个主题(topic)下面会有多个分区(pratition),多个消费者(一个消费者相当于一个线程)同时去消费这些分区,所以在统计数据量的时候,需要整合分区统计出每个主题总消费数据量。
又因为是多个主题(topic)每个主题下多个(分区),消费者也是多个,所以:
代码如下:

//定义多主题多线程poll数据Map(全局变量)
public static Map atomicLongMap = new HashMap<>();

在kafka消费者监听方法中:

//每个主题定义一个多线程数据量统计Map(如果消费的数据所属的topic已经创建了AtomicLong,则记录++,如果所属的topic还没有AtomicLong,则创建)
if (atomicLongMap.isEmpty() || atomicLongMap.get(topic) == null){
    atomicLongMap.put(topic,new AtomicLong(0));
}
//多线程统计数据量(根据topic统计每个topic的数据量)
Long data_amt = atomicLongMap.get(record.topic()).getAndIncrement();

data_amt 即为多线程每个主题消费到的数据总量

AtomicLong atomicLong = new AtomicLong();
Long count = atomicLong.getAndIncrement();
(.getAndIncrement() 该方法即多线程统计数据量)

----END----

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/355026.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号