栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka的offset提交管理

Kafka的offset提交管理

一:自动提交
//开启offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

缺点:先提交offset后消费,提交完offset后没有消费就挂机后,可能造成丢失数据

二:手动提交         手动提交 offset 的方法有两种:分别是 commitSync (同步提交) 和 commitAsync (异步 提交) 。两者的相同点是,都会将 本次 poll 的一批数据最高的偏移量提交 ;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
//开启offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");


2.1:同步提交
        while (true){
            //获取数据
            ConsumerRecords poll = consumer.poll(100);

            //解析数据
            for (ConsumerRecord record : poll) {
                String key = record.key();
                String value = record.value();
                System.out.println("key==>"+key+"  value==>"+value);
            }
            //offset同步提交,当前线程会阻塞直到 offset 提交成功,才会再次拉取数据                       
            consumer.commitSync();

        }
 2.2:异步提交
     while (true){
            //获取数据
            ConsumerRecords poll = consumer.poll(100);

            //解析数据
            for (ConsumerRecord record : poll) {
                String key = record.key();
                String value = record.value();
                System.out.println("key==>"+key+"  value==>"+value);
            }
            //offset异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for" +
                                offsets);
                    }
                }
            });
        }
2.4:数据漏消费和重复消费分析         无论是同步提交还是异步提交 offset ,都有可能会造成数据的漏消费或者重复消费。先 提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset ,有可能会造成数据 的重复消费。 2.5:自定义存储offset                 offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发 生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
package com.yrl.comsumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class MyConsumerOffset {
    private static Map currentOffset = new
            HashMap<>();
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop112:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        //创建一个消费者
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        //消费者订阅主题
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

                    //该方法会在 Rebalance 之前调用
                    @Override
                    public void
                    onPartitionsRevoked(Collection partitions) {
                        commitOffset(currentOffset);
                    }
                    //该方法会在 Rebalance 之后调用
                    @Override
                    public void
                    onPartitionsAssigned(Collection partitions) {
                        currentOffset.clear();
                        for (TopicPartition partition : partitions) {
                            consumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费
                        }
                    }
                });
        while (true) {
            ConsumerRecords records = consumer.poll(100);//消费者拉取数据
            for (ConsumerRecord record : records) {

                currentOffset.put(new TopicPartition(record.topic(),
                        record.partition()), record.offset());
            }
            commitOffset(currentOffset);//异步提交
        }
    }
    //获取某分区的最新 offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
    //提交该消费者所有分区的 offset
    private static void commitOffset(Map currentOffset) {
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350331.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号