栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka(五).Kafka&JAVA 高级API

Kafka(五).Kafka&JAVA 高级API

Kafka(五).Kafka&JAVA 高级API 1.Offset自动控制

消费者在没有订阅topic 的offset 的时候,也就是kafka 没有记录消费者的信息的时候,消费者默认首次消费的消费策略;

auto.offset.reset = latest

  • latest 订阅开始最新的offset 默认
  • earliest 单前分区最早的
  • none 没找到消费者的先前偏移量 就向消费者报错
//服务器没有消费者信息时 指定  会拉取获得的分区最早的记录的数据
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

自动提交

kafka 消费者消费数据的时候默认定期提交消费的偏移量。保证所有消息最少被消费一次;相关参数

//自动提交
enable.commit.auto = true //默认
anto.commit.interval.ms = 5000 //默认

自动提交时间

对应java 代码配置文件

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, true);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 10000);

测试,如果消费数据后10000ms 内关闭消费者,数据消费但是没有被自动提交;那么重启消费者,数据还是会被收到,数据会被再次被消费,直到超过了自动提交的时间,数据被确定消费后,消费者才不会在收到此条信息;

关闭自动提交

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

消费者每次启动都是从开始最前面开始消费;因为没有设置为自动提交,kafka 为了保证最少被消费一次,对没有被提交的还会再次被消费者收到;

手动提交

while (true){
            ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
            Map offsetInfo = new HashMap<>();
            if(!records.isEmpty()){
                Iterator> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord next = iterator.next();
                    offsetInfo.put(new TopicPartition(next.topic(),next.partition()),new OffsetAndmetadata(next.offset()+1));//注:next.offset()+1 ;

                    kafkaConsumer.commitAsync(offsetInfo, new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map offsets, Exception exception) {
                            System.out.println("offsets:"+next.offset()+"|||||||||||"+"offsets:"+offsets+"key:"+next.key());
                            System.out.println("exception:"+exception);
                        }
                    });
                }
            }
        }
2.ACK &Retries

生产者在发送一个数据后,要求broker 在规定的时刻回复ack 应答,如果没有在规定的时间内应答,kafka生产者会尝试n次重新发送数据; 默认ack = 1

  • acks = 1 Leader会把record 写到其本地的日志中,但是不会等待所有的follower确认后做出响应,在这种情况下,如果leader确认后记录后立刻就失败了,并且实在follower 复制之前失败,记录就会丢失

  • acks = 0 生产者不会等待服务器的任何响应,该记录会立刻添加到套接字的缓冲区,这个时候以为这已经发送(就是把数据发送到本机的网卡后);这个时候不能以为这服务器已经收到数据

  • acks = all 意味着Leader节点等待全套同步副本确认记录。这保证了只奥至少一个同步副本仍处于活动状态,记录就不会丢失,是最有力的保证,等同于配置acks = -1

如果生产者在规定的时间段内,没有收到Kafka的Leader 的Ack 应答,Kafka可以开启reties 机制。

request.timeout.ms = 30000 默认

retires = 2147483674 默认

上述情况下 可能导致某一个消息会重复的写入分区文件 下面的幂等写就是可以处理 这个问题

//保证ack   ack 机制
properties.put(ProducerConfig.ACKS_CONFIG,"all");
// 重复发送次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);
// 接受Leader 的ack 超时时间
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,10);
3.幂等写

在HTTP /1.1 中对幂等性的定义是: 一次和多次请求某一个资源对于资源本身应该居勇同样的结果; 就是多次执行对资源本身所产生的影响与一次执行的影响相同。

kafka 在0.11.0.0 版本支持增加了对幂等性的支持。幂等性是针对生产者的角度的特性。可以保证生产者发送的数据不会丢失,而且不会重复。kafka实现幂等写的关键就是识别请求是不是重复的,过滤到重复的请求;

唯一标识: 区分请求是不是唯一的,请求中就应该有请求的唯一标识;还需要一个参数就是请求是否已经处理过,如果新来的请求与处理记录里面的比较,如果存在,就是表明是一个重复的请求,就拒绝掉;

幂等 exactly once ; 消息只持久化一次到kafka Topic 中。在初始化的时候,kafka会给生产者生成一个PID 、producer ID ; 一个PID 板顶一个0 开始的单调递增的序列号;当新消息来的时候对比序列号,如果是比最后持久化的消息的序列号大1 ,表明是新消息。如果不是这种情况,broker 就判断生产者是重新发的消息;

enable.idempotence = false 默认的, 注意: 使用这个的时候呀必须 retries= true 和 acks= all

//开幂等性  精准一次写入
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
4.事务控制

kafka的幂等写可以保证一条记录发送时的原子性,但是多条记录(多个分区)之间的完整性,就需要开始kafka的事务操作;

kafka在0.11.0.0 引入幂等的概念,也天机了事务的概念。 kafka的事务分类为

  • 生产者事务 only (生产者生产多个数据 其中一个失败的时候 ,会回滚,但是不会删除数据,需要设置 read_committed )
  • 消费者&生产者事务 (微服务 生产者和消费者 放在一个事务中)

一般来说,消费者的默认消费写级别是read_uncommited 数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务管理级别;

isolation.level = read_uncommitted

该配置有两个选项 上面是其中一个 还有一个是 read_committed ,如果开启事务控制,消费端必须将事物的隔离级别设置为 read_committed

生产的生产者事务的时候,只需要指定transaction.id 属性就可以,一旦开启事务。默认就开启了生产者的幂等写。但是要求 transaction.id 的取值必须时唯一的,同一时刻只能有一个transaction.id 存储 其他的被关闭

**示例 ** 生产者事务

    public static void main(String[] args) {
        //1.这个一般是标配 创建 KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //

        //开启事务 作为生产者必须配置
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id" + UUID.randomUUID().toString());
        // 配置 kafka 批处理 大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
        // 发送的时间 在规定时间内 BATCH_SIZE_ConFIG 不够也会发送  ms值
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //  幂等 retries   acks
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10);
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer kafkaProducer = new KafkaProducer(properties);


        kafkaProducer.initTransactions();
        try {
            kafkaProducer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                ProducerRecord record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
                //发送
                kafkaProducer.send(record);

                //报错
                if (i == 5) {
                    int b = 1 / 0; // 模拟报错  生产者事务回滚
                    //  读未提交  会读取到所有发送的数据
                    // 读已提交会拿到 5 之前的记录
                }
            }
            //开启了缓存 需要刷新  也可以 间隔几条就刷新到kafka
            kafkaProducer.flush();
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(" 事务出错 ");
            kafkaProducer.abortTransaction();
        } finally {
            kafkaProducer.close();
        }
    }
   public static void main(String[] args) {
        //1.这个一般是标配 创建  KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");

        //设置消费隔离级别     这个是重点  默认时读未提交
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

        KafkaConsumer kafkaConsumer = new KafkaConsumer (properties);
        kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
        while (true){
            ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                Iterator> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord next = iterator.next();
                    System.out.println(next.key());
                }
            }
        }
    }

示例 消费者&生产者事务

topic1生产者 :生产数据

kafkaProducer.initTransactions();
        try {
            kafkaProducer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                ProducerRecord record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
                //发送
                kafkaProducer.send(record);

                //报错
                if (i == 5) {
                    int b = 1 / 0; // 模拟报错  生产者事务回滚
                    //  读未提交  会读取到所有发送的数据
                    // 读已提交会拿到 5 之前的记录
                }
            }
            //开启了缓存 需要刷新  也可以 间隔几条就刷新到kafka
            kafkaProducer.flush();
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(" 事务出错 ");
            kafkaProducer.abortTransaction();
        } finally {
            kafkaProducer.close();
        }

topic1消费者 & topic2 生产者 ===》 topic 01 出错失败回滚 ==》 topic2 就会收不到数据

 public static void main(String[] args) {
        KafkaConsumer kafkaTopic01Consumer = buildConsummer("g2");
        kafkaTopic01Consumer.subscribe(Arrays.asList("topic01"));
        KafkaProducer kafkaTopic02Producer = buildProducer();

        //1.初始化
        kafkaTopic02Producer.initTransactions();
        while (true){
            ConsumerRecords records = kafkaTopic01Consumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                Map offsets = new HashMap<>();
                Iterator> iterator = records.iterator();
                //开启 1 事务
                kafkaTopic02Producer.beginTransaction();
                try {
                    //业务代码处理
                    while (iterator.hasNext()){
                        ConsumerRecord record = iterator.next();
                        System.out.println(record.key());
                        offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndmetadata(record.offset()+1));
                        //构建到下一个业务/topic2 的数据
                        ProducerRecord nextRecord = new ProducerRecord<>("topic02,",record.key(),record.value()+"处理业务1");
                        kafkaTopic02Producer.send(nextRecord);
                    }
                    //提交事务
                    kafkaTopic02Producer.sendOffsetsToTransaction(offsets,"g2");
                    kafkaTopic02Producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    System.out.println("error");
                    //回滚 下一个topic02 / 业务处理topic 就收不到了  微服务事务关联
                    kafkaTopic02Producer.abortTransaction();
                }
            }
        }
    }

    public static KafkaProducer buildProducer(){
        //1.这个一般是标配 创建 KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //

        //开启事务 作为生产者必须配置
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id" + UUID.randomUUID().toString());
        // 配置 kafka 批处理 大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
        // 发送的时间 在规定时间内 BATCH_SIZE_ConFIG 不够也会发送  ms值
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //  幂等 retries   acks
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10);
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        return kafkaProducer;
    }

    public static KafkaConsumer buildConsummer(String groupId){
        //1.这个一般是标配 创建  KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);


        //设置消费隔离级别
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
        //这里必须 关闭消费者的  offset自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer kafkaConsumer = new KafkaConsumer (properties);
        return kafkaConsumer;
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582919.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号