1、POM文件导入
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>
2、yml配置文件这样写
kafka:
consumer:
enable-auto-commit: true
group-id: kafkaProducer
auto-commit-interval: 1000
auto-offset-reset: latest
bootstrap-servers: ip:port
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
concurrency: 3
3、kafka配置类
package com.harzone.kafka;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Properties;
@Component
public class KafkaConsumerConfig implements ApplicationRunner {

    @Value("${kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group-id}")
    private String groupId;
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Topic the consumer thread subscribes to. */
    private static final String TOPIC = "test_producer";

    /**
     * Builds the consumer configuration from the injected
     * {@code kafka.consumer.*} properties.
     *
     * <p>Previously {@code group.id} and {@code auto.commit.interval.ms} were
     * hard-coded, silently ignoring the values configured in the yml file;
     * {@code enable.auto.commit} and {@code auto.offset.reset} were injected
     * but never applied at all. All four now come from configuration.
     *
     * @return the populated consumer {@link Properties}
     */
    private Properties initProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        // Consumers sharing a group.id split the topic's partitions between them.
        props.put("group.id", groupId);
        props.put("enable.auto.commit", String.valueOf(enableAutoCommit));
        // How often consumed offsets are committed back to Kafka (ms).
        props.put("auto.commit.interval.ms", autoCommitInterval);
        // Where to start when the group has no committed offset (e.g. "latest").
        props.put("auto.offset.reset", autoOffsetReset);
        props.put("session.timeout.ms", "30000");
        // Both key and value arrive as plain strings.
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    /**
     * Starts the Kafka polling thread once the Spring context is fully up.
     * Worker-thread count now honours the configured {@code concurrency}
     * (it was previously derived from the CPU count, ignoring the setting).
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        KafkaConsumerThread consumerThread =
                new KafkaConsumerThread(initProperties(), TOPIC, concurrency);
        consumerThread.start();
    }
}
4、kafka多线程拉取实现
package com.harzone.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Dedicated polling thread: polls a single {@link KafkaConsumer} (which is not
 * thread-safe, so only this thread touches it) and hands each non-empty batch
 * to a bounded worker pool for processing.
 */
public class KafkaConsumerThread extends Thread {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    /** Poll timeout; long enough to avoid busy-spinning when the topic is idle. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private final KafkaConsumer<String, String> kafkaConsumer;
    // Processes polled batches off the polling thread.
    private final ExecutorService executorService;

    /**
     * @param properties          consumer configuration (bootstrap servers, group id, ...)
     * @param topic               topic to subscribe to
     * @param availableProcessors number of worker threads handling record batches
     */
    public KafkaConsumerThread(Properties properties, String topic, int availableProcessors) {
        kafkaConsumer = new KafkaConsumer<>(properties);
        // subscribe() uses consumer-group partition assignment;
        // singletonList returns an immutable one-element list.
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        // Fixed-size pool with a bounded queue; CallerRunsPolicy makes the polling
        // thread process the batch itself when the queue (1000) is full, which
        // naturally throttles polling instead of dropping work.
        executorService = new ThreadPoolExecutor(availableProcessors, availableProcessors,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT);
                if (!records.isEmpty()) {
                    // NOTE(review): with enable-auto-commit=true, offsets can be
                    // committed before the pool has processed these records —
                    // records may be lost on a crash. Switch to manual commits
                    // if at-least-once delivery is required.
                    executorService.submit(new RecordsHandler(records));
                }
            }
        } catch (Exception e) {
            // Was e.printStackTrace(); use the declared SLF4J logger instead.
            log.error("Kafka poll loop terminated unexpectedly", e);
        } finally {
            kafkaConsumer.close();
            executorService.shutdown();
        }
    }
}



