栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka基础【Producer和Consumer的API操作】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka基础【Producer和Consumer的API操作】

一 Producer的API 1 消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

kafkaProoducer消息发送流程:

主线程不负责消息传递的具体过程,以提高效率,main将消息封装成ProducerRecord,经过一系列处理分门别类地将消息放到RecordAccumulator(主线程和sender线程共享的资源池)中,sender根据RA中已经分好区的数据发送到topic。

2 实现步骤

导入依赖

    
        org.apache.kafka
        kafka-clients
        2.4.1
    

发送者发送数据程序代码

package com.hike.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        //1 实例化kafka集群(创建对象)
        Properties properties = new Properties();
        //通过配置文件配置
        //properties.load(Producer.class.getClassLoader().getResourceAsStream("kafka.properties"));
        //通过程序代码设置
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("acks","all");
        properties.setProperty("bootstrap.servers","hadoop101:9092");
        KafkaProducer producer = new KafkaProducer(properties);

        //2 用集群对象发送数据
        for (int i = 0; i < 10; i++) {
            Future future = producer.send(
                    //2.1 封装ProducerRecord
                    new ProducerRecord(
                            "hello",
                            Integer.toString(i),
                            "Value" + i
                    ),
                    //2.2 回调函数
                    new Callback() {
                        //当sender收到服务器的ack之后,sender线程会调用onCompletion方法
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            //RecordMetadata recordMetadata = future.get(); //添加此语句,发送方式变为同步操作
            System.out.println("第" + i + "条发送成功");
        }

        //3 关闭资源
        producer.close();
    }
}

二 Consumer的API 1 自动提交offset

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据必须考虑的问题。

消费者消费数据程序代码

package com.hike.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1 新建一个consumer对象
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("consumer1.properties"));
        KafkaConsumer consumer = new KafkaConsumer(properties);

        //2 用这个对象接收消息
        //发布订阅模式接收消息,先订阅
        consumer.subscribe(Collections.singleton("hello"));
        while(true){
            //从订阅的话题中拉取数据
            ConsumerRecords poll = consumer.poll(2000);
            if(poll.count() == 0){
                Thread.sleep(100);
            }
            //消费拉取到的数据
            for (ConsumerRecord record : poll) {
                System.out.println(record);
            }
        }
        
        //3 关闭资源
        //consumer.close();
    }
}

consumer1.properties文件

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers=hadoop101:9092
enable.auto.commit=true
group.id=test2
auto.offset.reset=earliest
2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

修改配置文件中的enable.auto.commit=false选项并在消费拉取到的数据之后添加consumer.commitSync();语句即可。由于同步提交offset有失败重试机制,故更加可靠。

			 //消费拉取到的数据
            for (ConsumerRecord record : poll) {
                System.out.println(record);
            }
            consumer.commitSync();

同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。在更多的情况下,会选用异步提交offset的方式。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/889086.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号