1、在kafka异步处理数据的时候,为了提高kafka的效率,通常是一口气拉取批量数据进行计算,但是kafka分区数的有限决定了消费者的数量限制,简单的增加消费者数量无法获取到性能的提升,此时需要将批量数据进行分批多线程处理,并在多个线程执行完毕之后再统一提交偏移量
以下是kafka的消费端的配置
kafka.consumer.servers: 192.168.0.1:9092 kafka.consumer.enable.auto.commit: false kafka.consumer.session.timeout: 15000 kafka.consumer.auto.commit.interval: 100 kafka.consumer.auto.offset.reset: latest kafka.consumer.concurrency: 10 kafka.consumer.maxPollRecordsConfig: 100 kafka.consumer.group.id: test01
为了并发处理,在此本人采用了countDownLaunch类来实现,原理: CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景
countDownLatch的常规方法说明:
CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。 await();//阻塞当前线程,将当前线程加入阻塞队列。 await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行, countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
由于分批计算需要一定的代码量,如果每个业务模块都写一块代码,就显得重复代码太多,不满足设计原则,所以在此采用了模板方法模式,以下是定义的模板方法类
public abstract class CountDownTemplate {
public void dealData(List data){};
}
以下是工具类的使用方法,需要传入需要同时进行处理的线程数dealThread以及每个线程需要处理的数据条目pageSize
public static void main(String[] args) {
List records = new ArrayList<>();
int size = 23;
int pageSize = 5;
int dealThread = (int) Math.ceil(size / 5.0);
for (int i = 0; i < size; i++) {
records.add(i + ".");
}
testCountDownLaunch(records, new CountDownTemplate() {
@Override
public void dealData(List data) {
System.out.println(data);
}
}, dealThread, pageSize);
System.out.println("执行完毕");
}
以下是工具类的实现代码
static void testCountDownLaunch(List records, CountDownTemplate countDownTemplate, int dealThread, int pageSize) {
CountDownLatch countDownLatch = new CountDownLatch(dealThread);
try {
for (int i = 0; i < dealThread; i++) {
int start, end;
start = i * pageSize;
if (start > records.size()) {
start = records.size();
}
end = (i + 1) * pageSize;
if (end > records.size()) {
end = records.size();
}
List dealList = records.subList(start, end);
new Thread(() -> {
countDownTemplate.dealData(dealList);
countDownLatch.countDown();
}).start();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}



