栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 云计算 > 云平台

Kafka生产与消费--简单示例

云平台 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka生产与消费--简单示例

下面用java给出一个kafka生产与消费的简单示例:

运行环境:

java:java version "1.8.0_291"

kafka:


    org.apache.kafka
    kafka_2.11
    2.1.0

与消费者相关的3个重要概念:

  • 分组:任何消费者Consumer都是处于一个分组内的,而分组是跟offset紧密关联的。
  • GroupCoordinator: 分组协调器,位置在broker中,每个分组对应一个GroupCoordinator,并且通过groupId的哈希取模算法,跟consumer offset topic的一个分区一一对应,该分组内所有consumer的offset都存储在这个分区。当然了GroupCoordinator还有别的作用,在这里不一一赘述。每个broker中仅有一个GroupCoordinator,可以管理多个分组。
  • ConsumerCoordinator :消费者协调器,位于消费者这一端,每个消费者对应一个ConsumerCoordinator,消费者消费完消息,持久化offset的时候,要用到它。当然了,它还有别的作用,在这里不再赘述。
  • ConsumerCoordinator发送出去的offset信息,被broker中的GroupCoordinator持久化到对应的consumer offset topic 分区。

在这里面,offset的维护是非常重要的一点: 

Consumer工作流程:

 

 

package com.mashibing.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;


public class Lesson01 {


    

    @Test
    public void producer() throws ExecutionException, InterruptedException {

        String topic = "msb-items";
        Properties p = new Properties();
        p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node02:9092,node03:9092,node01:9092");
        //kafka  持久化数据的MQ  数据-> byte[],不会对数据进行干预,双方要约定编解码
        //kafka是一个app::使用零拷贝  sendfile 系统调用实现快速数据消费
        p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //1,0,-1 三个值,设置为-1表示每次发送消息到broker,消息不光要到达leader partition,还要同步到其follower partition。
        p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");

        KafkaProducer producer = new KafkaProducer(p);

        //现在的producer就是一个提供者,面向的其实是broker,虽然在使用的时候我们期望把数据打入topic

        



        while(true){
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j <3; j++) {
                    ProducerRecord record = new ProducerRecord<>(topic, "item"+j,"val" + i);
                    Future send = producer
                            .send(record);
                    //虽然producer.send(...)是一个异步的方法,但是调用send.get()后,又会同步阻塞等待。
                    RecordMetadata rm = send.get();
                    int partition = rm.partition(); //消息被存到哪个分区了
                    long offset = rm.offset(); //消息在分区的offset
                    System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);

                }
            }
        }



    }


    public static void main(String[] args) {
        System.out.println(System.currentTimeMillis()-1*1000);
    }


    @Test
    public void consumer(){
        

        //基础配置
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node02:9092,node03:9092,node01:9092");
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        //需要配置消费者所在的group,如果不显式配置,则会自动为其分配一个
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"OOXX");
        //KAKFA IS MQ  IS STORAGE
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//第一次启动,米有offset,设置其从最开始的offset开始
        
        p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交,这种方式可能->丢数据&&重复数据
        //一个运行的consumer ,那么自己会维护自己消费进度
        //一旦你自动提交,但是是异步的,可能出现下面的问题
        //1,还没到时间,挂了,没提交(但是业务已经执行完毕),重起一个consuemr,参照offset的时候,会重复消费
        //2,一个批次的数据还没写数据库成功,但是这个批次的offset背异步提交了,挂了,重起一个consuemr,参照offset的时候,会丢失消费

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");//5秒
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // POLL 拉取数据,弹性,按需,拉取多少?


        KafkaConsumer consumer = new KafkaConsumer<>(p);


        //kafka 的consumer会动态负载均衡,如果只有一个consumer,其会处理topic下所有的分区的消息
        consumer.subscribe(Arrays.asList("msb-items"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection partitions) {
                System.out.println("---onPartitionsRevoked:");
                Iterator iter = partitions.iterator();
                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }

            }

            @Override
            public void onPartitionsAssigned(Collection partitions) {
                System.out.println("---onPartitionsAssigned:");
                Iterator iter = partitions.iterator();

                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }


            }
        });


        

        Map tts =new HashMap<>();
        //通过consumer取回自己分配的分区 as

        Set as = consumer.assignment();

        while(as.size()==0){
            consumer.poll(Duration.ofMillis(100));
            as = consumer.assignment();
        }

        //自己填充一个hashmap,为每个分区设置对应的时间戳
        for (TopicPartition partition : as) {
//            tts.put(partition,System.currentTimeMillis()-1*1000);
            tts.put(partition,1610629127300L);
        }
        //通过consumer的api,取回timeindex的数据
        Map offtime = consumer.offsetsForTimes(tts);


        for (TopicPartition partition : as) {
            //通过取回的offset数据,通过consumer的seek方法,修正自己的消费偏移
            OffsetAndTimestamp offsetAndTimestamp = offtime.get(partition);
            long offset = offsetAndTimestamp.offset();  //不是通过time 换 offset,如果是从mysql读取回来,其本质是一样的
            System.out.println(offset);
            consumer.seek(partition,offset);

        }

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }


        while(true){
            
            //微批的感觉
            ConsumerRecords records = consumer.poll(Duration.ofMillis(0));// 0~n

            if(!records.isEmpty()){
                //以下代码的优化很重要
                System.out.println("-----------"+records.count()+"-------------");
                Set partitions = records.partitions(); //每次poll的时候是取多个分区的数据
                //且每个分区内的数据是有序的

                
                for (TopicPartition partition : partitions) {
                    List> pRecords = records.records(partition);
//                    pRecords.stream().sorted()
                    //在一个微批里,按分区获取poll回来的数据
                    //线性按分区处理,还可以并行按分区处理用多线程的方式
                    Iterator> piter = pRecords.iterator();
                    while(piter.hasNext()){
                        ConsumerRecord next = piter.next();
                        int par = next.partition();
                        long offset = next.offset();
                        String key = next.key();
                        String value = next.value();
                        long timestamp = next.timestamp();


                        System.out.println("key: "+ key+" val: "+ value+ " partition: "+par + " offset: "+ offset+"time:: "+ timestamp);

                        TopicPartition sp = new TopicPartition("msb-items", par);
                        OffsetAndMetadata om = new OffsetAndMetadata(offset);
                        HashMap map = new HashMap<>();
                        map.put(sp,om);

                        //持久化offset级别1
                        //手动提交offset:用Sync方式。每消费一条消息提交一次,会产生大量IO,降低系统性能
                        consumer.commitSync(map);//这个是最安全的,每条记录级的更新,第一点
                        //单线程,多线程,都可以
                    }

                    //持久化offset级别2
                    //手动提交offset:用Sync方式。每消费完一个分区,提交这个分区的offset,大大减少IO次数,提高系统性能
                    long poff = pRecords.get(pRecords.size() - 1).offset();//获取分区内最后一条消息的offset
                    OffsetAndMetadata pom = new OffsetAndMetadata(poff);
                    HashMap map = new HashMap<>();
                    map.put(partition,pom);
                    consumer.commitSync( map );//这个是第二种,分区粒度提交offset
                    

                }
                //持久化offset级别3
                //手动提交offset:用Sync方式。全部完事儿,再提交offset(持久化)
                consumer.commitSync();//这个就是按poll的批次提交offset,第3点



//                Iterator> iter = records.iterator();
//                while(iter.hasNext()){
//                    //因为一个consuemr可以消费多个分区,但是一个分区只能给一个组里的一个consuemr消费
//                    ConsumerRecord record = iter.next();
//                    int partition = record.partition();
//                    long offset = record.offset();
//                    String key = record.key();
//                    String value = record.value();
//
//                    System.out.println("key: "+ record.key()+" val: "+ record.value()+ " partition: "+partition + " offset: "+ offset);
//                }
            }



        }





    }





}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/898834.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号