目录
一、Offset自动控制
二、ACK&RETRIES(应答和重试机制)
三、幂等写(性)
四、生产者事务
五、生产者&消费者
一、Offset自动控制
Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认首次消费策略:latest(最新)
auto.offset.reset=latest 自动偏移量的重置
-
erliest - 自动将偏移量重置为最早的偏移量
-
latest - 自动将偏移量重置为最新的偏移量
-
none - 如果未找到消费者组的先前偏移量,则向消费者抛出异常
latest:
package com.baron.offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaConsumerOffset_0 {
public static void main(String[] args) {
// 1、创建KafkaConsumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 以组管理消费者
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g3");
// offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 2、订阅相关的topics
// consumer.subscribe(Pattern.compile("^topic.*"));
consumer.subscribe(Arrays.asList("topic01"));
// 3、遍历消息队列
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
Iterator> it = consumerRecords.iterator();
while (it.hasNext()) {
// 获取一个消息
ConsumerRecord record = it.next();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
}
}
}
}
}
earliest:
package com.baron.offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
public class KafkaConsumerOffset_1 {
public static void main(String[] args) {
// 1、创建KafkaConsumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 以组管理消费者
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
// earliest - 如果系统没有消费者的偏移量,系统会读取该分区最早的偏移量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 2、订阅相关的topics
// consumer.subscribe(Pattern.compile("^topic.*"));
consumer.subscribe(Arrays.asList("topic01"));
// 3、遍历消息队列
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
Iterator> it = consumerRecords.iterator();
while (it.hasNext()) {
// 获取一个消息
ConsumerRecord record = it.next();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
}
}
}
}
}
Kafka消费者在消费数据的时候默认会定期的提交消费的偏移量,这样就可以保证所有的消息至少可以被消费者消费1次,用户可以通过一下两个参数配置:
-
enable.auto.commit = true 默认
-
auto.commit.interval.ms = 5000 默认
自动提交偏移量:
package com.baron.offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
public class KafkaConsumerOffset_2 {
public static void main(String[] args) {
// 1、创建KafkaConsumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 以组管理消费者
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");
// 配置offset自动提交的时间间隔,配置成10s自动提交 默认5s
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
// offset偏移量自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 2、订阅相关的topics
// consumer.subscribe(Pattern.compile("^topic.*"));
consumer.subscribe(Arrays.asList("topic01"));
// 3、遍历消息队列
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
Iterator> it = consumerRecords.iterator();
while (it.hasNext()) {
// 获取一个消息
ConsumerRecord record = it.next();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
}
}
}
}
}
如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是Kafka消费者下一次抓取数据的位置。
消费者手动提交偏移量:
package com.baron.offset;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class KafkaConsumerOffset_2 {
public static void main(String[] args) {
// 1、创建KafkaConsumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 以组管理消费者
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g5");
// 配置offset自动提交的时间间隔,配置成10s自动提交 默认5s
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
// offset偏移量自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 2、订阅相关的topics
// consumer.subscribe(Pattern.compile("^topic.*"));
consumer.subscribe(Arrays.asList("topic01"));
// 3、遍历消息队列
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
Iterator> it = consumerRecords.iterator();
// 自动提交偏移量,记录分区的消费元数据信息
Map offsets = new HashMap<>();
while (it.hasNext()) {
// 获取一个消息
ConsumerRecord record = it.next();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
// 记录消费分区的偏移量元数据,一定在提交的时候偏移量信息offset+1
offsets.put(new TopicPartition(topic, partition), new OffsetAndmetadata(offset+1));
// 提交消费者偏移量
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
System.out.println("offsets:" + offsets + "t exception:" + exception);
}
});
System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
}
}
}
}
}
二、ACK&RETRIES(应答和重试机制)
Kafka生产者在发送完一个消息之后,要求leader的broker在规定的时间内ACK应答生产者,如果没有在规定的时间内应答,Kafka生产者会尝试n次重新发送消息。
acks=1 默认
-
acks=1 - Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。
-
acks=0 - 生产者根本不会等待服务器任何确认。该记录将立即添加到套接字缓存区中并视为已发送。在这种情况下,不能保证服务器已经收到记录。
-
acks=all - 这意味着leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks=-1设置。
如果生产者在规定时间内,没有得到Kafka的Leader的ACK应答,Kafka可以开启retries机制。
request.timeout.ms = 30000 默认 30s
retries = 2147483647 默认 (默认无限次IInteger最大值)
由于网络问题写入磁盘上超时,上图可以能导致数据重复
超时重发代码:
package com.baron.acks;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerAcks {
public static void main(String[] args) {
// 1、创建KafkaProducer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置acks以及retries
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 不包含第一次,如果尝试发送3次都失败,则系统放弃发送
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 将检测超时的时间设置为1ms,由于1ms内未来不及应答到生产者,将会重复发送消息到服务器造成数据重复
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
KafkaProducer producer = new KafkaProducer(props);
// 2、生产消息
ProducerRecord record =
new ProducerRecord<>("topic01", "ack", "test ack");
// new ProducerRecord<>("topic01", "value" + i);
// 发送消息给服务器
producer.send(record);
producer.flush();
// 3、关闭producer
producer.close();
}
}
三、幂等写(性)
Kafka 在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证生产者发送的消息,不会丢失,而且不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:
唯一标识:想要区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识。
记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉。
幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。
PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID/TopicPartition对中最后提交的消息序列号正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。
enable.idempotence=false 默认
注意:在使用幂等性的时候,要求必须开启retries=true和acks=all
四、生产者事务
Kafka的幂等性,只能保证一条记录在分区发送的原子性,但是如果要保证多条记录之间的完整性,这个时候就需要开启kafka的事务操作。在Kafka-0.11除了引入的幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为生产者事务、消费者&生产者事务。一般来说默认消费者消费的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。
isolation.level = read_uncommited 默认
该选项有两个值read_uncommitted和read_committed,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed
开启的生产者事务的时候,只需要指定transcational.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求“transcational.id”的取值必须是唯一的,同一时刻只能有一个“transcational.id”存储在,其他的将会被关闭。
五、生产者&消费者
生产者:
package com.baron.tranactions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerTransactionsProducerOnly {
public static void main(String[] args) {
KafkaProducer producer = buildKafkaProducer();
// 初始化事务
producer.initTransactions();
try {
// 开启事务
producer.beginTransaction();
// 2、生产消息
for(int i=0; i<30; i++) {
if (i == 8) {
i = 1/0;
}
ProducerRecord record =
new ProducerRecord<>("topic01", "key" + i, "value" + i);
// 发送消息给服务器
producer.send(record);
producer.flush();
}
// 提交事务
producer.commitTransaction();
// 3、关闭producer
producer.close();
}catch (Exception e) {
e.printStackTrace();
System.err.println("出现了错误~" + e.getMessage());
// 终止事务
producer.abortTransaction();
}
}
public static KafkaProducer buildKafkaProducer() {
// 1、创建KafkaProducer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 必须配置事务ID,必须是唯一的
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id"+ UUID.randomUUID().toString());
// 配置Kafka批处理大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
// 等待20ms如果batch中数据不足1024大小,也将发送数据
props.put(ProducerConfig.LINGER_MS_CONFIG,20);
// 配置重试机制和幂等
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 请求超时
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);
return new KafkaProducer(props);
}
}
消费者:
package com.baron.tranactions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaConsumerTransactionsReadCommitted {
public static void main(String[] args) {
// 1、创建KafkaConsumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 以组管理消费者
props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
// 设置消费者的消费事务的隔离级别, 读取已提交的数据
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 2、订阅相关的topics
consumer.subscribe(Arrays.asList("topic01"));
// 3、遍历消息队列
while (true) {
ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
Iterator> it = consumerRecords.iterator();
while (it.hasNext()) {
// 获取一个消息
ConsumerRecord record = it.next();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
long timestamp = record.timestamp();
System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
}
}
}
}
}



