栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

kafka Java客户端之 consumer API 消费消息

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka Java客户端之 consumer API 消费消息

背景:我使用docker-compose 搭建的kafka服务
kafka的简单介绍以及docker-compose部署单主机Kafka集群

使用consumer API消费指定Topic里面的消息

首先我们需要使用Admin API 来创建Topic或者使用kafka的脚本文件来创建Topic

脚本文件创建topic

进入kafka容器

docker exec -it ${ConTAINER ID} /bin/bash

cd 到脚本文件的文件夹

cd /opt/kafka/bin

使用脚本文件创建Topic
kafka-1是我使用docker-compose 搭建kafka集群的时候的容器名

./kafka-topics.sh --create --bootstrap-server kafka-1:9092 --replication-factor 2 --partitions 3 --topic xt

当使用localhost的时候不能解析

查看已经存在的topic

./kafka-topics.sh --list --bootstrap-server kafka-1:9092


我们可以使用脚本在topic中写入一些信息(Ctrl+C结束)

./kafka-console-producer.sh --bootstrap-server kafka-1:9092 --topic xt


使用consumer脚本读取在topic中写入的信息

./kafka-console-consumer.sh --bootstrap-server kafka-1:9092  --from-beginning --topic xt


Kafka的集群元信息都会由ZooKeeper维护

Admin API 创建topic

使用java客户端也可以创建topic ,这些操作都封装在了Admin API之中

详细请见kafka客户端操作之Admin API

 //创建Topic实例
public static void createTopic() {

    AdminClient adminClient = adminClient();
    // 副本因子
    Short replicationFactor = 1;
    //创建具有指定副本因子和分区数的新topic。
    NewTopic newTopic = new NewTopic(TOPIC_NAME, 3 , replicationFactor);
    //创建一批新主题。
    //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
    //CreateTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已创建。在此期间, listTopics()和describeTopics(Collection)可能不会返回有关新主题的信息。
    CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("创建topic成功 : "+ topics.toString());
    System.out.println("---------------------------------------------------------------");

}
Producer API 向指定topic发送消息

详细请见 kafka 客户端之producer API发送消息以及简单源码分析
如果不事先使用producer API发消息到kafka server, kafka consumer将拉取不到消息消费

public static void producerSendWithCallback(Producer producer){

    // 消息对象 - ProducerRecoder
    for(int i=0;i<10;i++){
        ProducerRecord record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
        //就是多传入一个回调实例
        
        producer.send(record, new Callback() {
            
            @Override
            public void onCompletion(Recordmetadata recordmetadata, Exception e) {
                System.out.println(
                        "partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset());
            }
        });
    }

    // 所有的通道打开都需要关闭  close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去
    producer.close();
}
consumer API

kafka客户端consumer从kafka集群中获取消息,并透明地处理kafka集群中出现故障broker,透明地调节适应集群中变化的数据分区。也和broker交互,负载平衡消费者。

消费者维护着与broker的TCP连接来获取消息。如果在使用后没有关闭消费者,则会泄露这些连接。消费者不是线程安全的。

offset(偏移量)和消费者位置

kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示。也表示消费者在分区的位置。例如,一个位置是5的消费者(说明已经消费了0到4的消息),下一个将接收消息的偏移量为5的消息。实际上这有两个与消费者相关的 “位置” 概念:

消费者的位置给出了下一条消息的偏移量。它比消费者在该分区中看到的最大偏移量要大一个。它在每次消费者在调用poll(Duration)中接收消息时自动增长。

已提交的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制(如:同步提交commitSync 和 异步提交commitAsync)。

这个主要区别是消费者来控制一条消息什么时候才被认为是已被消费的,控制权在消费者。

在Kafka中无论是producer往topic中写数据,还是consumer从topic中读数据,都避免不了和offset打交道,关于offset主要有以下几个概念。

Last Committed Offset:consumer group最新一次 commit 的 offset,表示这个 group
已经把 Last Committed Offset 之前的数据都消费成功了。Current Position:consumer group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit。Log End Offset(LEO):记录底层日志(log)中的下一条消息的offset。对producer来说,就是即将插入下一条消息的offset。High Watermark(HW):已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未完全备份到其他的replicas 中,consumer是无法消费这部分消息(未提交消息)。

每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是leader副本。关于这两者更详细解释,建议参考这篇文章。

对于消费者而言,我们更多时候关注的是消费完成之后如何和服务器进行消费确认,告诉服务器这部分数据我已经消费过了。

这里就涉及到了2个offset,一个是current position,一个是处理完毕向服务器确认的committed offset。显然,异步模式下committed offset是落后于current position的。如果consumer挂掉了,那么下一次消费数据又只会从committed offset的位置拉取数据,就会导致数据被重复消费。

消费者组和主题订阅

Kafka的消费者组概念,通过 进程池 瓜分消息并处理消息。这些进程可以在同一台机器运行,也可分布到多台机器上,以增加可扩展性和容错性,相同group.id的消费者将视为同一个消费者组。

组中的每个消费者都通过subscribe API动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配1个消费者(一个消费者组中)。所有如果一个topic有4个分区,并且一个消费者分组有只有2个消费者。那么每个消费者将消费2个分区。

消费者组的成员是动态维护的:如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它。这被称为重新平衡分组,并在下面更详细地讨论。当新分区添加到订阅的topic时,或者当创建与订阅的正则表达式匹配的新topic时,也将重新平衡。将通过定时刷新自动发现新的分区,并将其分配给分组的成员。

从概念上讲,你可以将消费者分组看作是由多个进程组成的单一逻辑订阅者。作为一个多订阅系统,Kafka支持对于给定topic任何数量的消费者组,而不重复。

这是在消息系统中常见的功能的略微概括。所有进程都将是单个消费者分组的一部分(类似传统消息传递系统中的队列的语义),因此消息传递就像队列一样,在组中平衡。与传统的消息系统不同的是,虽然,你可以有多个这样的组。但每个进程都有自己的消费者组(类似于传统消息系统中pub-sub的语义),因此每个进程都会订阅到该主题的所有消息。

此外,当分组重新分配自动发生时,可以通过ConsumerRebalanceListener通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。

它也允许消费者通过使用assign(Collection)手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。

发现消费者故障

订阅一组topic后,当调用poll(long)时,消费者将自动加入到组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,并且其分区将被重新分配。

还有一种可能,消费可能遇到“活锁”的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用max.poll.interval.ms活跃检测机制。 在此基础上,如果你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。所以要留在组中,你必须持续调用poll。

消费者提供两个配置设置来控制poll循环:

    max.poll.interval.ms:增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。max.poll.records:此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔,减少重新平衡分组的

对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用poll。 但是必须注意确保已提交的offset不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要pause暂停分区,不会从poll接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

push 还是 pull

Kafka Consumer采用的是主动拉取broker数据进行消费的。一般消息中间件存在推送(server推送数据给consumer)和拉取(consumer主动取服务器取数据)两种方式,这两种方式各有优劣。

如果是选择推送的方式最大的阻碍就是服务器不清楚consumer的消费速度,如果consumer中执行的操作又是比较耗时的,那么consumer可能会不堪重负,甚至会导致系统挂掉。

而采用拉取的方式则可以解决这种情况,consumer根据自己的状态来拉取数据,可以对服务器的数据进行延迟处理。但是这种方式也有一个劣势就是服务器没有数据的时候可能会一直轮询,不过还好Kafka在poll()有参数允许消费者请求在“长轮询”中阻塞,等待数据到达(并且可选地等待直到给定数量的字节可用以确保传输大小)。

示例: 自动提交偏移量(Automatic Offset Committing)
private static void autoCommitedOffset(){

    Properties properties = new Properties();
    // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
    properties.setProperty("bootstrap.servers", "kafka服务器IP:9092");
    // 消费者群组
    // Consummer中有一个Consumer group(消费组),由它来决定同一个Consumer group中的消费者具体拉取哪个partition的数据,
    // 所以这里必须指定group.id属性
    properties.setProperty("group.id", "groupxt");
    // 自动提交offset,默认true
    // 每1000ms提交一次
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    // 指定序列化类,因为需要通信交互,所以需要序列化
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer consumer = new KafkaConsumer(properties);

    // 消费订阅哪一个Topic或者几个Topic
    //通过subscribe()方法订阅主题具有消费者自动再均衡(reblance)的功能,存在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者
    // 与分区的关系。当组内的消费者增加或者减少时,分区关系会自动调整。实现消费负载均衡以及故障自动转移。
    // 使用assign()方法订阅则不具有该功能。
    consumer.subscribe(Arrays.asList(TOPIC_NAME));

//        List partitionInfoList = consumer.partitionsFor("xt");
//        if(null != partitionInfoList) {
//            for(PartitionInfo partitionInfo : partitionInfoList) {
//                consumer.assign(Collections.singletonList(
//                        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
//            }
//        }

    //取消订阅  unsubscribe()方法即可以取消通过subscribe()方式实现的订阅,还可以取消通过assign()方式实现的订阅
    //
//        consumer.unsubscribe();

    //也可以通过订阅空列表来达到同样的功效
//        consumer.subscribe(new ArrayList<>());
//        consumer.assign(new ArrayList());


    while (true) {
        //拉取消息,每10000ms拉取一次这里是,等于是说批量的去拉取

        System.out.println("-----------------消费消息-------------------");
        ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));

        System.out.println("获取到"+records.count()+"条消息");

        for (ConsumerRecord record : records) {
            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(),record.offset(), record.key(), record.value());
        }

        System.out.println("------------------消费消息-------------------");
    }
}
订阅Topic

使用subscribe()方法订阅主题使用assign()方法订阅确定主题和分区

通过subscribe()方法订阅主题具有消费者自动再均衡(reblance)的功能,存在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当组内的消费者增加或者减少时,分区关系会自动调整。实现消费负载均衡以及故障自动转移。使用assign()方法订阅则不具有该功能。

取消订阅
consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList());

上面的三行代码作用相同,都是取消订阅,其中unsubscribe()方法即可以取消通过subscribe()方式实现的订阅,还可以取消通过assign()方式实现的订阅。

broker通过心跳机器自动检测consumer组中失败的进程,消费者会自动ping集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。

手动提交偏移量(Manual Offset Control)

需要自己提交offset,kafka 客户端也提供了两种提交offset的方式

同步提交commitSync()异步提交commitAsync()

 
 private static boolean commitedOffset() {

     Properties properties = new Properties();
     properties.setProperty("bootstrap.servers", "kafka服务器IP:9092");
     properties.setProperty("group.id", "groupxt");
     
     //关闭自动提交offset
     properties.setProperty("enable.auto.commit", "false");
     properties.setProperty("auto.commit.interval.ms", "1000");
     
     properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer consumer = new KafkaConsumer(properties);
     
     // 消费订阅哪一个Topic或者几个Topic
     consumer.subscribe(Arrays.asList(TOPIC_NAME));
     while (true) {
         ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
         for (ConsumerRecord record : records) {
             // 想把数据保存到数据库,成功就成功,不成功...
             try{
                 //模拟业务操作
                 Thread.sleep(500);
                 System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                         record.partition(), record.offset(), record.key(), record.value());

             }catch (Exception e) {
                 // 如果失败,则不要提交offset返回false,让调用方处理
                 // 所以下次消费还是从之前的offset开始消费
                 e.printStackTrace();
                 return false;
             }

         }

         // 如果成功,手动异步提交offset
         // 异步提交offset可能会导致重复消费
         consumer.commitAsync();
         //手动同步提交offset
         //但是同步提交一样也会导致重复消费问题,因为消息消费和offset提交并没有保证是一个原子操作,所以还是会导致重复消费问题
//            consumer.commitSync();
     }
 }
针对Partition提交offset

可以获取多个partition,每个partition单独提交offset

private static void commitedOffsetWithPartition() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kafka服务器IP:9092");
    properties.setProperty("group.id", "groupxt");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer consumer = new KafkaConsumer(properties);
    // 消费订阅哪一个Topic或者几个Topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
       ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
       // 每个partition单独处理
       for(TopicPartition partition : records.partitions()){
           List> pRecord = records.records(partition);
           for (ConsumerRecord record : pRecord) {
               System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                       record.partition(), record.offset(), record.key(), record.value());

           }
           long lastOffset = pRecord.get(pRecord.size() -1).offset();
           // 针对单个partition中的offset单独进行提交
           Map offset = new HashMap<>();
           //从下一个offset开始消费,不然会出现重复消费
           offset.put(partition,new OffsetAndmetadata(lastOffset+1));
           // 提交offset
           consumer.commitSync(offset);
           System.out.println("=============partition - "+ partition +" end================");
       }
    }
}

也可以给消费者指定partition进行消费,一个消费者可以消费多个partition里面的数据

 
 private static void commitedOffsetWithPartition2() {
     Properties properties = new Properties();
     properties.setProperty("bootstrap.servers", "kafka服务器IP:9092");
     properties.setProperty("group.id", "test");
     properties.setProperty("enable.auto.commit", "false");
     properties.setProperty("auto.commit.interval.ms", "1000");
     properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer consumer = new KafkaConsumer(properties);

     // xt的0,1,2 三个partition
     TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
     TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
     TopicPartition p2  = new TopicPartition(TOPIC_NAME,2);

     // 消费订阅哪一个Topic或者几个Topic
//        consumer.subscribe(Arrays.asList(TOPIC_NAME));

     // 消费订阅某个Topic的某个分区
     consumer.assign(Arrays.asList(p0));

     while (true) {
         ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
         // 每个partition单独处理  因为他手动了,实际每次都是消费的p0分区里面的消息
         for(TopicPartition partition : records.partitions()){
             List> pRecord = records.records(partition);
             for (ConsumerRecord record : pRecord) {
                 System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                         record.partition(), record.offset(), record.key(), record.value());

             }
             long lastOffset = pRecord.get(pRecord.size() -1).offset();
             // 单个partition中的offset,并且进行提交
             Map offset = new HashMap<>();
             offset.put(partition,new OffsetAndmetadata(lastOffset+1));
             // 提交offset
             consumer.commitSync(offset);
             System.out.println("=============partition - "+ partition +" end================");
         }
     }
 }
consumer结构图

在以上代码中,可以看到设置了group.id这个配置项,这是一个Consumer的必要配置项,因为在Kafka中,Consumer需要位于一个Consumer Group里。具体如下图所示:

在上图中是一个Consumer消费一个Partition,是一对一的关系。但一个Consumer可以消费多个Partition,是一对多的关系。

Consumer的注意事项:

单个Partition的消息只能由Consumer Group中的某个Consumer来消费Consumer从Partition中消费消息是顺序的,默认从头开始消费如果Consumer Group中只有一个Consumer,那么这个Consumer会消费所有Partition中的消息 kafka 怎样做到不重复消费

只要保证处理消息和提交offset得操作是原子操作,就可以做到不重复消费。我们可以自己管理committed offset,而不让kafka来进行管理。

比如如下使用方式:

如果消费的数据刚好需要存储在数据库,那么可以把offset也存在数据库,就可以就可以在一个事物中提交这两个结果,保证原子操作。借助搜索引擎,把offset和数据一起放到索引里面,比如Elasticsearch

每条记录都有自己的offset,所以如果要管理自己的offset还得要做下面事情

设置enable.auto.commit=false使用每个ConsumerRecord提供的offset来保存消费的位置。在重新启动时使用seek(TopicPartition, long)恢复上次消费的位置。

通过上面的方式就可以在消费端实现"Exactly Once"的语义,即保证只消费一次。但是是否真的需要保证不重复消费呢?这个得看具体业务,重复消费数据对整体有什么影响在来决定是否需要做到不重复消费。

几个重要的消费者参数

fetch.min.bytes

配置poll()拉取请求过程种能从Kafka拉取的最小数据量,如果可用数据量小于它指定的大小会等到有足够可用数据时才会返回给消费者,其默认值时1B

fetch.max.wait.ms

和fetch.min.bytes有关,用于指定Kafka的等待时间,默认时间500ms。如果fetch.min.bytes设置为1MB,fetch.max.wait.ms设置为100ms,Kafka收到消费者请求后,要么返回1MB数据,要么在100ms后返回所有可用数据,就看哪个提交得到满足。

max.poll.records

用于控制单次调用poll()能返回的最大记录数量,默认为500条数据

partition.assignment.stragety

分区会被分配给群组的消费者,这个参数用于指定分区分配策略。默认是RangeAssignore,可选的还有RoundRobinAssignor。同样它还支持自定义

无法消费的数据怎么办?

可能由于你的业务逻辑有些数据没法消费这个时候怎么办?同样的还是的看你认为这个数据有多重要或者多不重要,如果重要可以记录日志,把它存入文件或者数据库,以便于稍候进行重试或者定向分析。

References:

https://www.orchome.com/451#item-9https://www.jianshu.com/p/abbc09ed6703https://blog.51cto.com/zero01/2498017

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729521.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号