
2022-08-09

Kafka API

I. Message Publishing Flow
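In outline: a call to send() passes the record through the configured interceptors, serializes the key and value, assigns a partition via the partitioner, and appends the record to the producer's internal accumulator; a background sender thread then ships the batches to the brokers, which acknowledge according to the acks setting. The sections below walk through each of these pieces.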

II. Producer

1. Preparation

(1) Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
    </dependencies>

(2) Create the configuration file log4j2.xml under resources, e.g. a minimal console-logging setup (a sketch; any working log4j2 configuration will do):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <!-- log to stdout -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>

    <Loggers>
        <!-- quiet down the Kafka client's own logging -->
        <Logger name="org.apache.kafka" level="warn" additivity="false">
            <AppenderRef ref="Console"/>
        </Logger>

        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
2. Create the Producer

The configuration:

        Properties prop = new Properties();
        // Kafka cluster addresses to connect to
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        // Ack level: 0 -> don't wait for the broker; 1 -> respond once the leader has written
        // the record; -1/all -> respond only after all in-sync replicas have written it
        prop.put(ProducerConfig.ACKS_CONFIG, "-1");
        // Number of retries when a send fails
        prop.put(ProducerConfig.RETRIES_CONFIG, "5");
        // Serializer for the record key; strings here
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serializer for the record value; strings here
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
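As a minimal end-to-end sketch (assuming the prop settings above and an existing topic named test1; the topic name is illustrative):

        // KafkaProducer implements AutoCloseable, so try-with-resources works;
        // close() flushes any records still buffered before returning
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            producer.send(new ProducerRecord<>("test1", "hello kafka"));
        }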

3. Interceptors

The interceptor code:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Instant;
import java.util.Map;


public class TimeStampInterceptor implements ProducerInterceptor<String, String> {

    // Called before the record is serialized; prepend a timestamp line to the value.
    // Note the "key=value" format, which is what the partitioner below parses.
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        long now = Instant.now().getEpochSecond();
        String timestampProp = "timestamp=" + now + "\n";
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), timestampProp + producerRecord.value());
    }

    // Called when the broker acknowledges the record or the send fails
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            // Send succeeded, i.e. the ack was received
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("topic: ").append(recordMetadata.topic()).append("\n");
            stringBuilder.append("partition: ").append(recordMetadata.partition()).append("\n");
            stringBuilder.append("offset: ").append(recordMetadata.offset()).append("\n");
            System.out.print(stringBuilder);
        } else {
            // Send failed with an exception
            System.out.println("send failed: " + e.getMessage());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

Register the interceptor chain in the producer's configuration; interceptors run in list order, each onSend receiving the previous one's output:

        // All interceptors go into a single list
        List<String> interceptors = new ArrayList<>();
        interceptors.add(TimeStampInterceptor.class.getName());
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

4. Partitioner

The partitioner code:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimeStampPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of currently available partitions for the topic
        int partitions = cluster.availablePartitionsForTopic(topic).size();
        // Match the "timestamp=<seconds>" line the interceptor prepended
        Pattern pattern = Pattern.compile(".+=[0-9]+[^\n]");
        Matcher matcher = pattern.matcher((CharSequence) value);
        if (matcher.find()) {
            long timestamp = Long.parseLong(matcher.group().split("=")[1]);
            return (int) (timestamp % partitions);
        }
        // No timestamp found: fall back to partition 0
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

Register the partitioner in the producer's configuration. Note that when a ProducerRecord specifies a partition explicitly, the partitioner is bypassed:

        // Specify the custom partitioner; it replaces the default (sticky) partitioning behaviour
        prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TimeStampPartitioner.class.getName());

5. Sending Messages

(1) Asynchronous send, no callback

        String baseInfo = "producer produce message-";
        for (int i = 0; i < 10; i++) {
            // Async send without callback: the next record goes out without waiting
            // for the previous one to be written and acknowledged.
            // No key or partition is given, so the sticky partitioner decides the partition
            // (or the custom partitioner, if one was configured as in section 4)
            producer.send(new ProducerRecord<>("test1", baseInfo + i));
        }

(2) Asynchronous send with callback

        String baseInfo = "producer produce message-";
        for (int i = 0; i < 10; i++) {
            // Async send with callback: records go out without waiting for acks.
            // A key is given, so (with the default partitioner) the partition is
            // the key's hash modulo the partition count
            producer.send(new ProducerRecord<>("test2", UUID.randomUUID().toString(), baseInfo + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        // Record was written and acknowledged
                        String pattern = "topic: {0}; partition: {1}; offset: {2};";
                        String message = MessageFormat.format(pattern, metadata.topic(), metadata.partition(), metadata.offset());
                        System.out.println(message);

                    } else {
                        // Write failed with an exception
                        System.out.println("send failed: " + exception.getMessage());
                    }
                }
            });
        }

(3) Synchronous send, no callback

        String baseInfo = "producer produce message-";
        for (int i = 0; i < 10; i++) {
            // Sync send without callback: the next record is sent only after the previous one
            // has been written and acknowledged (the calling thread blocks until the response).
            // A key is given, so the partition is the key's hash modulo the partition count
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test2", UUID.randomUUID().toString(), baseInfo + i));

            // get() blocks the current thread until the result returns; it throws
            // InterruptedException/ExecutionException, which must be declared or handled
            future.get();
        }

(4) Synchronous send with callback

        String baseInfo = "producer produce message-";
        for (int i = 0; i < 10; i++) {
            // Sync send with callback: the next record is sent only after the previous one
            // has been written and acknowledged (the calling thread blocks until the response).
            // The partition is given explicitly (i % 3), so key hashing and the partitioner are bypassed
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test2", i % 3, UUID.randomUUID().toString(), baseInfo + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        // Record was written and acknowledged
                        String pattern = "topic: {0}; partition: {1}; offset: {2};";
                        String message = MessageFormat.format(pattern, metadata.topic(), metadata.partition(), metadata.offset());
                        System.out.println(message);

                    } else {
                        // Write failed with an exception
                        System.out.println("send failed: " + exception.getMessage());
                    }
                }
            });

            // get() blocks the current thread until the broker responds
            future.get();
        }

6. Close the Producer

        
        producer.close();
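close() blocks until all buffered records have been sent. A bounded variant also exists (part of kafka-clients since 2.0, so available in the 2.4.1 client used here):

        // Wait at most 10 seconds for in-flight records, then give up
        producer.close(Duration.ofSeconds(10));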
III. Consumer

An offset reset happens when (see the snippet after this list):

        a new consumer group starts consuming a topic (it has no committed offsets yet);

        the offsets a group last committed point to messages that have since been deleted.
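For example (a sketch; the group name is made up), a consumer that joins with a brand-new group.id has no committed offsets anywhere, so auto.offset.reset decides where it starts:

        // No offsets have ever been committed for this group, so auto.offset.reset applies
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "brand-new-group");
        // "earliest" -> start from the oldest retained record; "latest" -> only new records
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");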

1. Preparation

The dependencies and the log4j2.xml configuration are the same as in the Producer section above.
2. Create the Consumer

The configuration:

        Properties prop = new Properties();
        // Kafka cluster addresses to connect to (brokers, not ZooKeeper)
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        // Consumer group this consumer belongs to
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "tp2");
        // Enable automatic offset commits
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Where to reset offsets: latest (the end of each partition) or earliest (the beginning of
        // each partition -- not necessarily 0, since older messages may already have been deleted)
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Deserializers for key and value; strings here
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
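One caveat: KafkaConsumer is not thread-safe. If the poll loop in the next section needs to be stopped from another thread, the supported mechanism is wakeup(), which makes a blocked poll() throw WakeupException (a sketch):

        // Abort the current/next poll() from another thread, e.g. on JVM shutdown;
        // the poll loop should catch WakeupException and close the consumer
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));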

3. Subscribe and Consume

        Pattern pattern = Pattern.compile("^test[0-9]+$");
        consumer.subscribe(pattern);
        boolean continued = true;
        String messagePattern = "topic: {0}; partition: {1}; offset: {2}";
        while (continued) {
            // poll blocks for up to 2 seconds waiting for a batch of records,
            // which avoids hammering the brokers with constant fetch requests
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // test(...) is user-defined and decides whether to keep consuming;
                // a sketch of it follows this block
                continued = test(record);
                if (!continued) {
                    break;
                }
                System.out.println(MessageFormat.format(messagePattern, record.topic(), record.partition(), record.offset()));
            }
        }
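The test method is not part of the Kafka API; a minimal sketch (the "quit" sentinel value is made up for illustration):

        // Keep consuming until a message whose value is "quit" arrives
        public static boolean test(ConsumerRecord<String, String> record) {
            return !"quit".equals(record.value());
        }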

4. Offset Commit Modes

(1) Automatic commit

Simply enable auto commit in the consumer configuration (everything else is as in section 2):

        // Commit offsets automatically at a fixed interval
        // (auto.commit.interval.ms, 5 seconds by default)
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

(2) Asynchronous manual commit

First disable automatic commits (the rest of the configuration is unchanged):

        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

After processing a batch, commit manually and asynchronously:

        Pattern pattern = Pattern.compile("^test[0-9]+$");
        consumer.subscribe(pattern);
        boolean continued = true;
        String messagePattern = "topic: {0}; partition: {1}; offset: {2}";
        while (continued) {
            // poll blocks for up to 2 seconds waiting for a batch of records
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                continued = test(record);
                if (!continued) {
                    break;
                }
                System.out.println(MessageFormat.format(messagePattern, record.topic(), record.partition(), record.offset()));
            }
            // Asynchronous manual commit: does not block the poll loop
            consumer.commitAsync();
        }
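commitAsync does not retry a failed commit on its own; an overload takes an OffsetCommitCallback if you at least want to log failures (a sketch):

        // offsets maps each TopicPartition to the OffsetAndMetadata that was committed
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.out.println("offset commit failed: " + exception.getMessage());
            }
        });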

(3) Synchronous manual commit

As above, automatic commits must be disabled first:

        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

The consume loop is the same as in (2), except that the commit at the end of each batch is synchronous, i.e. it blocks until the brokers confirm it:

            // Synchronous manual commit: blocks until the commit succeeds
            // or fails with an unrecoverable error
            consumer.commitSync();
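A common pattern combines the two modes: fast asynchronous commits while processing, plus one final synchronous commit before shutdown so the offsets of the last batch are not lost (a sketch):

        try {
            while (continued) {
                // ... poll and process as above ...
                consumer.commitAsync();
            }
        } finally {
            // one last blocking commit before closing
            consumer.commitSync();
            consumer.close();
        }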

5. Close the Consumer

        
        consumer.close();
IV. Case Study

1. Scenario

There are three topics, baidu, iqiyi, and other, each with two partitions. Start one producer and one consumer: the producer generates URL messages and must attach a timestamp to each one, sending messages whose timestamp falls in hours 0-11 to partition 0 and those in hours 12-23 to partition 1; the consumer consumes the data from all three topics.

2. Preparation

(1)(2) The dependencies and the log4j2.xml configuration are the same as in the Producer section above.
(3) Create the topics

kafka-ops.sh topics --create --topic baidu --partitions 2 --replication-factor 3
kafka-ops.sh topics --create --topic iqiyi --partitions 2 --replication-factor 3
kafka-ops.sh topics --create --topic other --partitions 2 --replication-factor 3
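kafka-ops.sh looks like a wrapper script specific to this series; with the stock Kafka CLI the equivalent for the first topic would be something like (broker address taken from the earlier configs):

kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic baidu --partitions 2 --replication-factor 3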

3. Code

The TimestampInterceptor class adds a timestamp to every message:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Instant;
import java.util.Map;

public class TimestampInterceptor implements ProducerInterceptor<String, String> {

    // Prepend a "timestamp=<seconds>" line to every record's value
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        long now = Instant.now().getEpochSecond();
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(), "timestamp=" + now + "\n" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

The TimestampPartitioner class routes messages to partitions by hour:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimestampPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Extract the "timestamp=<seconds>" line the interceptor prepended
        Pattern pattern = Pattern.compile(".+=[0-9]+[^\n]");
        Matcher matcher = pattern.matcher((CharSequence) value);
        if (matcher.find()) {
            long timestamp = Long.parseLong(matcher.group().split("=")[1]);
            Instant instant = Instant.ofEpochSecond(timestamp);
            LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            // Hours 0-11 go to partition 0, hours 12-23 to partition 1
            if (dateTime.getHour() <= 11) {
                return 0;
            }
            return 1;
        }
        // No timestamp found: fall back to partition 0
        // (a negative partition number would make the producer throw)
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

Producer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class WebsiteProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        properties.put(ProducerConfig.RETRIES_CONFIG, "5");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TimestampInterceptor.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, TimestampPartitioner.class.getName());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "";
        String website = "";
        for (int i = 0; i < 1000; i++) {
            if (i % 3 == 0){
                topic = "baidu";
                website = "https://www.baidu.com/" + UUID.randomUUID().toString();
            }else if (i % 3 == 1){
                topic = "iqiyi";
                website = "https://www.iqiyi.com/" + UUID.randomUUID().toString();
            }else {
                topic = "other";
                website = "https://www." + UUID.randomUUID().toString() + ".com/" + UUID.randomUUID().toString();
            }
            producer.send(new ProducerRecord<>(topic, website));
        }
        producer.close();
    }
}

Consumer code:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

public class WebsiteConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tp");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Pattern.compile("(baidu|iqiyi|other)"));
        String pattern = "topic: {0}; partition: {1}; offset: {2}; value: {3}";
        boolean continued = true;
        while (continued){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                continued = test(record);
                if (!continued){
                    break;
                }
                System.out.println(MessageFormat.format(pattern, record.topic(), record.partition(), record.offset(), record.value()));
            }
        }

        consumer.close();
    }

    public static boolean test(ConsumerRecord<String, String> record){
        // placeholder predicate: always keep consuming
        return true;
    }
}
