- 一、kafka消费者提交的内容
- 二、自动提交
- 三、自动提交代码示例
- 四、手动提交
- 五、手动同步提交代码示例
- 六、手动异步提交代码示例
- 消费组+消费的某个主题+消费的某个分区及消费的偏移量
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群_consumer_offsets主题里面。
二、自动提交//指定了消费者是否自动提交消费位移,默认为true。
// 如果需要减少重复消费或者数据丢失,你可以设置为false。
// 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms","1000");
自动提交:消息poll下来以后,直接提交offset:
- enable.auto.commit (bool) :如果为True,将自动定时提交消费者offset。默认为True。
- auto.commit.interval.ms(int) :自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
- 当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyConsumer {
private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username="debezium" password="NGFlM2I1NTJlNmFk";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//消费者组
props.put("group.id", "opticsgroup1");
//反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//指定了消费者是否自动提交消费位移,默认为true。
// 如果需要减少重复消费或者数据丢失,你可以设置为false。
// 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms","1000");
props.put("max.poll.records",500);
//可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
//认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
props.put("max.poll.interval.ms",30*1000);
//consumer给broker发送心跳的间隔时刻
props.put("heartbeat.interval.ms",1000);
//kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
props.put("session.timeout.ms",10*1000);
//新消费组从头消费
props.put("auto.offset.reset","earliest");
//创建消费者
KafkaConsumer consumer = new KafkaConsumer(props);
//订阅主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
四、手动提交
props.put("enable.auto.commit",false);
手动提交:在消费消息后再提交offset。
- 手动同步提交:阻塞到集群返回ack
- 手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyConsumer {
private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username="debezium" password="NGFlM2I1NTJlNmFk";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//消费者组
props.put("group.id", "opticsgroup1");
//反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//指定了消费者是否自动提交消费位移,默认为true。
// 如果需要减少重复消费或者数据丢失,你可以设置为false。
// 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms","1000");
props.put("max.poll.records",500);
//可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
//认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
props.put("max.poll.interval.ms",30*1000);
//consumer给broker发送心跳的间隔时刻
props.put("heartbeat.interval.ms",1000);
//kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
props.put("session.timeout.ms",10*1000);
//新消费组从头消费
props.put("auto.offset.reset","earliest");
//创建消费者
KafkaConsumer consumer = new KafkaConsumer(props);
//订阅主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
//所有消息已经消费完
if(records.count() > 0){ //有消息
consumer.commitSync(); //阻塞提交成功
}
}
}
}
六、手动异步提交代码示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyConsumer {
private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username="debezium" password="NGFlM2I1NTJlNmFk";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//消费者组
props.put("group.id", "opticsgroup1");
//反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//指定了消费者是否自动提交消费位移,默认为true。
// 如果需要减少重复消费或者数据丢失,你可以设置为false。
// 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms","1000");
props.put("max.poll.records",500);
//可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
//认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
props.put("max.poll.interval.ms",30*1000);
//consumer给broker发送心跳的间隔时刻
props.put("heartbeat.interval.ms",1000);
//kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
props.put("session.timeout.ms",10*1000);
//新消费组从头消费
props.put("auto.offset.reset","earliest");
//创建消费者
KafkaConsumer consumer = new KafkaConsumer(props);
//订阅主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
//所有消息已经消费完
if(records.count() > 0){ //有消息
//手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map map, Exception e) {
if (e != null){
System.err.println("Commit failed for " + map);
System.err.println("Commit failed exception: " + e.getStackTrace());
}
}
});
}
}
}
}



