栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka 自动与手动管理offset

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka 自动与手动管理offset

前言

offset顾名思义,即偏移量,我们知道消息从生产者发送到kafka的topic之后,是进入到不同的分区,在consumer未对消息进行消费之前,消息是有序存储在各个分区中;

offset内部原理

在之前我们了解了kafka的消费者原理之后,提出这样一个疑问,kafka怎么知道某个消费组中的消费者消费消息的进度呢?

1、从0.9版本开始,consumer默认将offset保存在Kafka ,一个内置的topic中,该topic为__consumer_offsets; 2、 Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中;
这也就是说,kafka是通过 offset这个值来管理消费组消费进度的,下面是一张关于kafka的offset的原理图;

关于offset做下面几点补充:

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据;key 是 group.id+topic+ 分区号,value 就是当前 offset 的值;每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据;

默认情况下,保存offset数据的系统主题是看不到的,为了查看该系统主题数据,要将下面这个参数修改为false

exclude.internal.topics=false【在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false】

自动提交 offset

为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset的功能,自动提交 offset 的相关参数: 1、enable.auto.commit  : 是否开启自动提交 offset 功能,默认是 true;
默认值为 true ,消费者会自动周期性地向服务器提交偏移量
2、auto.commit.interval.ms  : 自动提交 offset 的时间间隔,默认是 5s;
如果设置了 enable.auto.commit 的值为 true , 则该值定义了消 费者偏移量向 Kafka 提交的频率, 默认 5s

 

代码展示 producer 端代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class OffsetProducer1 {

    public static void main(String[] args) throws Exception {

        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建 kafka 生产者对象
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        System.out.println("开始发送数据");
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 15; i++) {
            kafkaProducer.send(new ProducerRecord<>("zcy234","congge " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }

}

consumer 端代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;

public class OffsetConsumer1 {

    public static void main(String[] args) {
        // 1. 创建 kafka 消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");

        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group2");
        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        //3. 创建 kafka 消费者
        KafkaConsumer consumer = new KafkaConsumer<>(properties);

        //4. 设置消费主题 形参是列表
        consumer.subscribe(Arrays.asList("zcy234"));

        System.out.println("准备开始消费数据");
        //5. 消费数据
        while (true){
            // 读取消息
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 输出消息
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }

    }

}

核心的代码即添加下面这两行配置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

运行上面的程序,效果上面和之前差不多,

 

 

手动提交 offset 虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset 的 API,关于手动提交offset,做如下几点说明:

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交);两者的相 同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故 有可能提交失败;

commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据;

commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了;

 

同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提 交的效率比较低。 下面看同步提交offset的consumer的完整代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SyncConsumer1 {

    public static void main(String[] args) {
        // 1. 创建 kafka 消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");

        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group3");
        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        //3. 创建 kafka 消费者
        KafkaConsumer consumer = new KafkaConsumer<>(properties);

        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //3. 创建 kafka 消费者
        //4. 设置消费主题 形参是列表
        consumer.subscribe(Arrays.asList("zcy234"));
        System.out.println("准备开始消费数据");

        //5. 消费数据
        while (true){
            // 读取消息
            ConsumerRecords consumerRecords =
                    consumer.poll(Duration.ofSeconds(1));
            // 输出消息
            for (ConsumerRecord consumerRecord :
                    consumerRecords) {
                System.out.println(consumerRecord.value());
            }

            // 同步提交 offset
            consumer.commitSync();
        }

    }

}

仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

 

异步提交 offset 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。 下面是完整的代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class AsyncConsumer1 {

    public static void main(String[] args) {
        // 1. 创建 kafka 消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");

        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group5");
        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //3. 创建 Kafka 消费者
        KafkaConsumer consumer = new
                KafkaConsumer<>(properties);
        //4. 设置消费主题 形参是列表
        consumer.subscribe(Arrays.asList("zcy234"));
        //5. 消费数据
        while (true) {
            // 读取消息
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 输出消息
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // 异步提交 offset
            consumer.commitAsync();
        }
    }
}

仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

 

指定 Offset 消费

kafka中消费者在消费数据时的offset的机制有3种,默认情况下为latest,即从最近的那一次的位置开始消费;

auto.offset.reset = earliest | latest | none 默认是 latest

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量 时(例如该数据已被删除),该怎么办? 1、 earliest :自动将偏移量重置为最早的偏移量, --from-beginning; 2、latest (默认值) :自动将偏移量重置为最新偏移量; 3、none :如果未找到消费者组的先前偏移量,则向消费者抛出异常;

 

于是在实际业务中可能会遇到这么一种场景,即新的消费者并不想消费最早的那一批消息,而是指定从某个offset位置开始消费;

下面看具体的consumer端代码:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class SpecialOffsetConsumer1 {

    public static void main(String[] args) {
        // 0 配置信息
        Properties properties = new Properties();
        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
        // key value 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group6");

        // 1 创建一个消费者
        KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅一个主题
        ArrayList topics = new ArrayList<>();
        topics.add("zcy234");
        kafkaConsumer.subscribe(topics);
        Set assignment = new HashSet<>();

        // 获取消费者分区分配信息(有了分区分配信息才能开始消费),避免开始消费的时候分区信息还未就绪
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }

        // 遍历所有分区,并指定 offset 从 5 的位置开始消费
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 5);
        }

        System.out.println("准备开始消费数据");
        // 3 消费该主题数据
        while (true) {
            ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }

}

运行这段代码,然后再次使用上面的producer发送消息,观察控制台输出效果,可以看到,数据消费的offset的位置从5开始

 

指定时间消费
需求:在生产环境中,比如说遇到最近消费的某一段时间的数据有异常,想重新按照时间消费?或者要求按照时间消费前一天的数据,怎么处理?

下面看具体的代码处理

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class SpecialTimeConsumer1 {

    public static void main(String[] args) {
        // 0 配置信息
        Properties properties = new Properties();
        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "101.34.23.80:9092");
        // key value 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group7");

        // 创建一个消费者
        KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅主题
        ArrayList topics = new ArrayList<>();
        topics.add("zcy234");
        kafkaConsumer.subscribe(topics);
        Set assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = kafkaConsumer.assignment();
        }
        HashMap timestampToSearch = new HashMap<>();
        // 封装集合存储,每个分区对应一天前的数据
        for (TopicPartition topicPartition : assignment) {
            //用当前时间减去业务上需要回退的时间,比如这里想重新消费24个小时之前的数据
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

        // 获取从 1 天前开始消费的每个分区的 offset
        Map offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

        // 遍历每个分区,对每个分区设置消费时间。
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            // 根据时间指定开始消费的位置
            if (offsetAndTimestamp != null) {
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762707.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号