栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

第四章 Kafka-事务相关

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

第四章 Kafka-事务相关

目录

一、Offset自动控制

二、ACK&RETRIES(应答和重试机制)

三、幂等写(性)

四、生产者事务

五、生产者&消费者


一、Offset自动控制

Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认首次消费策略:latest(最新)

auto.offset.reset=latest 自动偏移量的重置

  • erliest - 自动将偏移量重置为最早的偏移量

  • latest - 自动将偏移量重置为最新的偏移量

  • none - 如果未找到消费者组的先前偏移量,则向消费者抛出异常

latest:

package com.baron.offset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;


public class KafkaConsumerOffset_0 {
    public static void main(String[] args) {
        // 1、创建KafkaConsumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 以组管理消费者
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g3");

        // offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 2、订阅相关的topics
//        consumer.subscribe(Pattern.compile("^topic.*"));
        consumer.subscribe(Arrays.asList("topic01"));
        // 3、遍历消息队列
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
                Iterator> it = consumerRecords.iterator();
                while (it.hasNext()) {
                    // 获取一个消息
                    ConsumerRecord record = it.next();
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();
                    System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
                }
            }
        }

    }
}

earliest:

package com.baron.offset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;


public class KafkaConsumerOffset_1 {
    public static void main(String[] args) {
        // 1、创建KafkaConsumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 以组管理消费者
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");

        // earliest - 如果系统没有消费者的偏移量,系统会读取该分区最早的偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 2、订阅相关的topics
//        consumer.subscribe(Pattern.compile("^topic.*"));
        consumer.subscribe(Arrays.asList("topic01"));
        // 3、遍历消息队列
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
                Iterator> it = consumerRecords.iterator();
                while (it.hasNext()) {
                    // 获取一个消息
                    ConsumerRecord record = it.next();
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();
                    System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
                }
            }
        }

    }
}

Kafka消费者在消费数据的时候默认会定期的提交消费的偏移量,这样就可以保证所有的消息至少可以被消费者消费1次,用户可以通过一下两个参数配置:

  • enable.auto.commit = true 默认

  • auto.commit.interval.ms = 5000 默认

自动提交偏移量:

package com.baron.offset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;


public class KafkaConsumerOffset_2 {
    public static void main(String[] args) {
        // 1、创建KafkaConsumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 以组管理消费者
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");

        // 配置offset自动提交的时间间隔,配置成10s自动提交 默认5s
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
        // offset偏移量自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 2、订阅相关的topics
//        consumer.subscribe(Pattern.compile("^topic.*"));
        consumer.subscribe(Arrays.asList("topic01"));
        // 3、遍历消息队列
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
                Iterator> it = consumerRecords.iterator();
                while (it.hasNext()) {
                    // 获取一个消息
                    ConsumerRecord record = it.next();
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();
                    System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
                }
            }
        }

    }
}

如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是Kafka消费者下一次抓取数据的位置。

消费者手动提交偏移量:

package com.baron.offset;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;


public class KafkaConsumerOffset_2 {
    public static void main(String[] args) {
        // 1、创建KafkaConsumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 以组管理消费者
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g5");

        // 配置offset自动提交的时间间隔,配置成10s自动提交 默认5s
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
        // offset偏移量自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // offset默认配置是latest - 如果系统没有消费者的偏移量,系统会读取该分区最新的偏移量作为该组的偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 2、订阅相关的topics
//        consumer.subscribe(Pattern.compile("^topic.*"));
        consumer.subscribe(Arrays.asList("topic01"));
        // 3、遍历消息队列
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
                Iterator> it = consumerRecords.iterator();
                // 自动提交偏移量,记录分区的消费元数据信息
                Map offsets = new HashMap<>();
                while (it.hasNext()) {
                    // 获取一个消息
                    ConsumerRecord record = it.next();
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();

                    // 记录消费分区的偏移量元数据,一定在提交的时候偏移量信息offset+1
                    offsets.put(new TopicPartition(topic, partition), new OffsetAndmetadata(offset+1));
                    // 提交消费者偏移量
                    consumer.commitAsync(offsets, new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map offsets, Exception exception) {
                            System.out.println("offsets:" + offsets + "t exception:" + exception);
                        }
                    });
                    System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);

                }
            }
        }

    }
}

二、ACK&RETRIES(应答和重试机制)

Kafka生产者在发送完一个消息之后,要求leader的broker在规定的时间内ACK应答生产者,如果没有在规定的时间内应答,Kafka生产者会尝试n次重新发送消息。

acks=1 默认

  • acks=1 - Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。

  • acks=0 - 生产者根本不会等待服务器任何确认。该记录将立即添加到套接字缓存区中并视为已发送。在这种情况下,不能保证服务器已经收到记录。

  • acks=all - 这意味着leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks=-1设置。

如果生产者在规定时间内,没有得到Kafka的Leader的ACK应答,Kafka可以开启retries机制。

request.timeout.ms = 30000 默认 30s

retries = 2147483647 默认 (默认无限次IInteger最大值)

由于网络问题写入磁盘上超时,上图可以能导致数据重复

超时重发代码:  

package com.baron.acks;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class KafkaProducerAcks {
    public static void main(String[] args) {
        // 1、创建KafkaProducer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置acks以及retries
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 不包含第一次,如果尝试发送3次都失败,则系统放弃发送
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 将检测超时的时间设置为1ms,由于1ms内未来不及应答到生产者,将会重复发送消息到服务器造成数据重复
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

        KafkaProducer producer = new KafkaProducer(props);
        // 2、生产消息
        ProducerRecord record =
                new ProducerRecord<>("topic01", "ack", "test ack");
//                    new ProducerRecord<>("topic01", "value" + i);
        // 发送消息给服务器
        producer.send(record);
        producer.flush();
        // 3、关闭producer
        producer.close();

    }
}

三、幂等写(性)

Kafka 在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证生产者发送的消息,不会丢失,而且不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识:想要区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识。

记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉。

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。

PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID/TopicPartition对中最后提交的消息序列号正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

enable.idempotence=false 默认

注意:在使用幂等性的时候,要求必须开启retries=true和acks=all

四、生产者事务

Kafka的幂等性,只能保证一条记录在分区发送的原子性,但是如果要保证多条记录之间的完整性,这个时候就需要开启kafka的事务操作。在Kafka-0.11除了引入的幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为生产者事务、消费者&生产者事务。一般来说默认消费者消费的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。

isolation.level = read_uncommited 默认

该选项有两个值read_uncommitted和read_committed,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed

开启的生产者事务的时候,只需要指定transcational.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求“transcational.id”的取值必须是唯一的,同一时刻只能有一个“transcational.id”存储在,其他的将会被关闭。

五、生产者&消费者

生产者: 

package com.baron.tranactions;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;


public class KafkaProducerTransactionsProducerOnly {
    public static void main(String[] args) {
        KafkaProducer producer = buildKafkaProducer();

        // 初始化事务
        producer.initTransactions();
        try {
            // 开启事务
            producer.beginTransaction();
            // 2、生产消息
            for(int i=0; i<30; i++) {
                if (i == 8) {
                    i = 1/0;
                }
                ProducerRecord record =
                        new ProducerRecord<>("topic01", "key" + i, "value" + i);
                // 发送消息给服务器
                producer.send(record);
                producer.flush();
            }
            // 提交事务
            producer.commitTransaction();
            // 3、关闭producer
            producer.close();
        }catch (Exception e) {
            e.printStackTrace();
            System.err.println("出现了错误~" + e.getMessage());
            // 终止事务
            producer.abortTransaction();
        }

    }

    public static KafkaProducer buildKafkaProducer() {
        // 1、创建KafkaProducer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 必须配置事务ID,必须是唯一的
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id"+ UUID.randomUUID().toString());
        // 配置Kafka批处理大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
        // 等待20ms如果batch中数据不足1024大小,也将发送数据
        props.put(ProducerConfig.LINGER_MS_CONFIG,20);

        // 配置重试机制和幂等
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 请求超时
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);
        return new KafkaProducer(props);
    }
}

消费者:

package com.baron.tranactions;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;


public class KafkaConsumerTransactionsReadCommitted {
    public static void main(String[] args) {
        // 1、创建KafkaConsumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 以组管理消费者
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");

        // 设置消费者的消费事务的隔离级别, 读取已提交的数据

        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 2、订阅相关的topics
        consumer.subscribe(Arrays.asList("topic01"));
        // 3、遍历消息队列
        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));
            if (!consumerRecords.isEmpty()) { // 从队列中取到了数据
                Iterator> it = consumerRecords.iterator();
                while (it.hasNext()) {
                    // 获取一个消息
                    ConsumerRecord record = it.next();
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();
                    System.out.println(topic+"t"+partition+","+offset+"t"+key+""+value+timestamp);
                }
            }
        }

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279535.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号