Maven必须配置
org.apache.kafka kafka-clients2.4.1
Java实现
package com.qf.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
// 1.设置参数
// 使用 Properties对象 存放键值对(参数配置信息)
Properties props = new Properties();
// 键值对中存入 bootstrap_server => kafka节点或集群(用逗号隔开)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.136:9092,192.168.232.136:9093,192.168.232.136:9094");
// 设置消费组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 把需要发送的key从字符串序列化为字节数组
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 把需要发送的value从字符串序列化为字节数组
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 2.创建消费者的客户端,传入参数(默认是最新+1开始消费)
KafkaConsumer consumer = new KafkaConsumer(props);
// 3.消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
// 4.poll消息
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
// 5.循环打印获取的消息
for(ConsumerRecord record: records){
System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
手动同步提交offset
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 设置手动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
// 代码逻辑...
//手动同步提交
if (records.count() > 0){
// 当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
}
}
}
}
手动异步提交offset
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 设置手动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
// 代码逻辑...
//手动异步提交
if (records.count() > 0){
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map map, Exception e) {
if (e != null){
System.err.println("Commit failed for " + map);
System.err.println("Commit failed exception: " + e.getStackTrace());
}
}
});
}
}
}
}
指定分区消费
// 替换第3步,指定该主题0号分区进行消费 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
消息回溯消费
// 替换第3步,消息回溯消费 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
指定offset消费
// 替换第3步,指定offset消费 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
从指定时间点消费
// 替换第3步,从指定时间点开始消费 // 1.获取主题下所有分区 ListtopicPartitions = consumer.partitionsFor(TOPIC_NAME); // 2.获取一小时起的时间点 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; // 3. 创建一个hashMap HashMap map = new HashMap<>(); // 4. 循环所有分区,将分区和时间点(分区=>1小时前时间点)存入hashMap中 for(PartitionInfo par: topicPartitions) map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); // 5.根据分区和时间hashMap,拿到分区和偏移量(分区对象->偏移量对象)存入map中 Map parMap = consumer.offsetsForTimes(map); // 6.遍历键值对(key分区对象 -> value偏移量对象) for(Map.Entry entry: parMap.entrySet()){ // 7. 获取键(分区对象) TopicPartition key = entry.getKey(); // 8. 获取值(偏移量对象) OffsetAndTimestamp value = entry.getValue(); // 9. 校验,为空则跳过 if (key==null || value==null) continue; // 10.获取实际偏移量 long offset = value.offset(); // 11.打印分区和偏移量 System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); // 12.偏移量不为Null则进入设置 if (value != null){ // 13.设置指定偏移量开始消费 consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }



