栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka多线程并发消费处理

kafka多线程并发消费处理

1、在kafka异步处理数据的时候,为了提高kafka的效率,通常是一口气拉取批量数据进行计算,但是kafka分区数的有限决定了消费者的数量限制,简单的增加消费者数量无法获取到性能的提升,此时需要将批量数据进行分批多线程处理,并在多个线程执行完毕之后再统一提交偏移量

以下是kafka的消费端的配置

kafka.consumer.servers: 192.168.0.1:9092
kafka.consumer.enable.auto.commit: false
kafka.consumer.session.timeout: 15000
kafka.consumer.auto.commit.interval: 100
kafka.consumer.auto.offset.reset: latest
kafka.consumer.concurrency: 10
kafka.consumer.maxPollRecordsConfig: 100
kafka.consumer.group.id: test01

为了并发处理,在此本人采用了countDownLaunch类来实现,原理: CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景

countDownLatch的常规方法说明:

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。
await();//阻塞当前线程,将当前线程加入阻塞队列。
await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,
countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

由于分批计算需要一定的代码量,如果每个业务模块都写一块代码,就显得重复代码太多,不满足设计原则,所以在此采用了模板方法模式,以下是定义的模板方法类

public abstract class CountDownTemplate {
    public void dealData(List data){};
}

以下是工具类的使用方法,需要传入需要同时进行处理的线程数dealThread以及每个线程需要处理的数据条目pageSize

public static void main(String[] args) {
    List records = new ArrayList<>();
    int size = 23;
    int pageSize = 5;
    int dealThread = (int) Math.ceil(size / 5.0);


    for (int i = 0; i < size; i++) {
        records.add(i + ".");
    }
    testCountDownLaunch(records, new CountDownTemplate() {
        @Override
        public void dealData(List data) {
            System.out.println(data);
        }
    }, dealThread, pageSize);
    System.out.println("执行完毕");
}

以下是工具类的实现代码

static void testCountDownLaunch(List records, CountDownTemplate countDownTemplate, int dealThread, int pageSize) {
    CountDownLatch countDownLatch = new CountDownLatch(dealThread);
    try {
        for (int i = 0; i < dealThread; i++) {
            int start, end;
            start = i * pageSize;
            if (start > records.size()) {
                start = records.size();
            }
            end = (i + 1) * pageSize;
            if (end > records.size()) {
                end = records.size();
            }


            List dealList = records.subList(start, end);
            new Thread(() -> {
                countDownTemplate.dealData(dealList);
                countDownLatch.countDown();
            }).start();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }


    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612462.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号