消费者在没有订阅topic 的offset 的时候,也就是kafka 没有记录消费者的信息的时候,消费者默认首次消费的消费策略;
auto.offset.reset = latest
- latest 订阅开始最新的offset 默认
- earliest 单前分区最早的
- none 没找到消费者的先前偏移量 就向消费者报错
//服务器没有消费者信息时 指定 会拉取获得的分区最早的记录的数据 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
自动提交
kafka 消费者消费数据的时候默认定期提交消费的偏移量。保证所有消息最少被消费一次;相关参数
//自动提交 enable.commit.auto = true //默认 anto.commit.interval.ms = 5000 //默认
自动提交时间
对应java 代码配置文件
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, true); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 10000);
测试,如果消费数据后10000ms 内关闭消费者,数据消费但是没有被自动提交;那么重启消费者,数据还是会被收到,数据会被再次被消费,直到超过了自动提交的时间,数据被确定消费后,消费者才不会在收到此条信息;
关闭自动提交
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
消费者每次启动都是从开始最前面开始消费;因为没有设置为自动提交,kafka 为了保证最少被消费一次,对没有被提交的还会再次被消费者收到;
手动提交
while (true){
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
Map offsetInfo = new HashMap<>();
if(!records.isEmpty()){
Iterator> iterator = records.iterator();
while (iterator.hasNext()){
ConsumerRecord next = iterator.next();
offsetInfo.put(new TopicPartition(next.topic(),next.partition()),new OffsetAndmetadata(next.offset()+1));//注:next.offset()+1 ;
kafkaConsumer.commitAsync(offsetInfo, new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
System.out.println("offsets:"+next.offset()+"|||||||||||"+"offsets:"+offsets+"key:"+next.key());
System.out.println("exception:"+exception);
}
});
}
}
}
2.ACK &Retries
生产者在发送一个数据后,要求broker 在规定的时刻回复ack 应答,如果没有在规定的时间内应答,kafka生产者会尝试n次重新发送数据; 默认ack = 1
-
acks = 1 Leader会把record 写到其本地的日志中,但是不会等待所有的follower确认后做出响应,在这种情况下,如果leader确认后记录后立刻就失败了,并且实在follower 复制之前失败,记录就会丢失
-
acks = 0 生产者不会等待服务器的任何响应,该记录会立刻添加到套接字的缓冲区,这个时候以为这已经发送(就是把数据发送到本机的网卡后);这个时候不能以为这服务器已经收到数据
-
acks = all 意味着Leader节点等待全套同步副本确认记录。这保证了只奥至少一个同步副本仍处于活动状态,记录就不会丢失,是最有力的保证,等同于配置acks = -1
如果生产者在规定的时间段内,没有收到Kafka的Leader 的Ack 应答,Kafka可以开启reties 机制。
request.timeout.ms = 30000 默认
retires = 2147483674 默认
上述情况下 可能导致某一个消息会重复的写入分区文件 下面的幂等写就是可以处理 这个问题
//保证ack ack 机制 properties.put(ProducerConfig.ACKS_CONFIG,"all"); // 重复发送次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); // 接受Leader 的ack 超时时间 properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,10);3.幂等写
在HTTP /1.1 中对幂等性的定义是: 一次和多次请求某一个资源对于资源本身应该居勇同样的结果; 就是多次执行对资源本身所产生的影响与一次执行的影响相同。
kafka 在0.11.0.0 版本支持增加了对幂等性的支持。幂等性是针对生产者的角度的特性。可以保证生产者发送的数据不会丢失,而且不会重复。kafka实现幂等写的关键就是识别请求是不是重复的,过滤到重复的请求;
唯一标识: 区分请求是不是唯一的,请求中就应该有请求的唯一标识;还需要一个参数就是请求是否已经处理过,如果新来的请求与处理记录里面的比较,如果存在,就是表明是一个重复的请求,就拒绝掉;
幂等 exactly once ; 消息只持久化一次到kafka Topic 中。在初始化的时候,kafka会给生产者生成一个PID 、producer ID ; 一个PID 板顶一个0 开始的单调递增的序列号;当新消息来的时候对比序列号,如果是比最后持久化的消息的序列号大1 ,表明是新消息。如果不是这种情况,broker 就判断生产者是重新发的消息;
enable.idempotence = false 默认的, 注意: 使用这个的时候呀必须 retries= true 和 acks= all
//开幂等性 精准一次写入 properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);4.事务控制
kafka的幂等写可以保证一条记录发送时的原子性,但是多条记录(多个分区)之间的完整性,就需要开始kafka的事务操作;
kafka在0.11.0.0 引入幂等的概念,也天机了事务的概念。 kafka的事务分类为
- 生产者事务 only (生产者生产多个数据 其中一个失败的时候 ,会回滚,但是不会删除数据,需要设置 read_committed )
- 消费者&生产者事务 (微服务 生产者和消费者 放在一个事务中)
一般来说,消费者的默认消费写级别是read_uncommited 数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务管理级别;
isolation.level = read_uncommitted
该配置有两个选项 上面是其中一个 还有一个是 read_committed ,如果开启事务控制,消费端必须将事物的隔离级别设置为 read_committed
生产的生产者事务的时候,只需要指定transaction.id 属性就可以,一旦开启事务。默认就开启了生产者的幂等写。但是要求 transaction.id 的取值必须时唯一的,同一时刻只能有一个transaction.id 存储 其他的被关闭
**示例 ** 生产者事务
public static void main(String[] args) {
//1.这个一般是标配 创建 KafkaProducer
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//
//开启事务 作为生产者必须配置
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id" + UUID.randomUUID().toString());
// 配置 kafka 批处理 大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
// 发送的时间 在规定时间内 BATCH_SIZE_ConFIG 不够也会发送 ms值
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// 幂等 retries acks
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10);
properties.put(ProducerConfig.RETRIES_CONFIG, 5);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
kafkaProducer.initTransactions();
try {
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
//发送
kafkaProducer.send(record);
//报错
if (i == 5) {
int b = 1 / 0; // 模拟报错 生产者事务回滚
// 读未提交 会读取到所有发送的数据
// 读已提交会拿到 5 之前的记录
}
}
//开启了缓存 需要刷新 也可以 间隔几条就刷新到kafka
kafkaProducer.flush();
kafkaProducer.commitTransaction();
} catch (Exception e) {
System.out.println(" 事务出错 ");
kafkaProducer.abortTransaction();
} finally {
kafkaProducer.close();
}
}
public static void main(String[] args) {
//1.这个一般是标配 创建 KafkaConsumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");
//设置消费隔离级别 这个是重点 默认时读未提交
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
KafkaConsumer kafkaConsumer = new KafkaConsumer (properties);
kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
while (true){
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
if(!records.isEmpty()){
Iterator> iterator = records.iterator();
while (iterator.hasNext()){
ConsumerRecord next = iterator.next();
System.out.println(next.key());
}
}
}
}
示例 消费者&生产者事务
topic1生产者 :生产数据
kafkaProducer.initTransactions();
try {
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
//发送
kafkaProducer.send(record);
//报错
if (i == 5) {
int b = 1 / 0; // 模拟报错 生产者事务回滚
// 读未提交 会读取到所有发送的数据
// 读已提交会拿到 5 之前的记录
}
}
//开启了缓存 需要刷新 也可以 间隔几条就刷新到kafka
kafkaProducer.flush();
kafkaProducer.commitTransaction();
} catch (Exception e) {
System.out.println(" 事务出错 ");
kafkaProducer.abortTransaction();
} finally {
kafkaProducer.close();
}
topic1消费者 & topic2 生产者 ===》 topic 01 出错失败回滚 ==》 topic2 就会收不到数据
public static void main(String[] args) {
KafkaConsumer kafkaTopic01Consumer = buildConsummer("g2");
kafkaTopic01Consumer.subscribe(Arrays.asList("topic01"));
KafkaProducer kafkaTopic02Producer = buildProducer();
//1.初始化
kafkaTopic02Producer.initTransactions();
while (true){
ConsumerRecords records = kafkaTopic01Consumer.poll(Duration.ofSeconds(1));
if(!records.isEmpty()){
Map offsets = new HashMap<>();
Iterator> iterator = records.iterator();
//开启 1 事务
kafkaTopic02Producer.beginTransaction();
try {
//业务代码处理
while (iterator.hasNext()){
ConsumerRecord record = iterator.next();
System.out.println(record.key());
offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndmetadata(record.offset()+1));
//构建到下一个业务/topic2 的数据
ProducerRecord nextRecord = new ProducerRecord<>("topic02,",record.key(),record.value()+"处理业务1");
kafkaTopic02Producer.send(nextRecord);
}
//提交事务
kafkaTopic02Producer.sendOffsetsToTransaction(offsets,"g2");
kafkaTopic02Producer.commitTransaction();
} catch (ProducerFencedException e) {
System.out.println("error");
//回滚 下一个topic02 / 业务处理topic 就收不到了 微服务事务关联
kafkaTopic02Producer.abortTransaction();
}
}
}
}
public static KafkaProducer buildProducer(){
//1.这个一般是标配 创建 KafkaProducer
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//
//开启事务 作为生产者必须配置
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id" + UUID.randomUUID().toString());
// 配置 kafka 批处理 大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
// 发送的时间 在规定时间内 BATCH_SIZE_ConFIG 不够也会发送 ms值
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// 幂等 retries acks
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10);
properties.put(ProducerConfig.RETRIES_CONFIG, 5);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
return kafkaProducer;
}
public static KafkaConsumer buildConsummer(String groupId){
//1.这个一般是标配 创建 KafkaConsumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
//设置消费隔离级别
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
//这里必须 关闭消费者的 offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer kafkaConsumer = new KafkaConsumer (properties);
return kafkaConsumer;
}



