栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka使用Avro序列化和反序列化

Kafka使用Avro序列化和反序列化

本文目录
  • 1.自定义序列化器与反序列化器
    • 1.1 定义Order实体类
    • 1.2 定义Order序列化类
    • 1.3 生产者代码
    • 1.4. 定义Order反序列化器
    • 1.5 消费者代码
  • 2. 使用Avro序列化和反序列化
    • 2.1 Apache Avro介绍
    • 2.2 创建Maven项目
    • 2.3 创建schema 文件
    • 2.4.Avro生成entity
    • 2.5 生产者代码
    • 2.6 消费者代码

1.自定义序列化器与反序列化器 1.1 定义Order实体类
public class Order implements Serializable {

      private Integer OrderId;

      private String title;

    public Order() {
    }

    public Order(Integer orderId, String title) {
        OrderId = orderId;
        this.title = title;
    }
   // 省略必要的get与set方法
}
1.2 定义Order序列化类

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。 序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将 泛型指定类型的数据转换为字节数组。

Kafka提供了如下常用类型的序列化类:

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中 的 serialize 方法。

public class OrderSerializer implements Serializer {

    @Override
    public byte[] serialize(String topic, Order data) {
        try {
            if (data == null) 
                return null;
            Integer orderId = data.getOrderId();
            String title = data.getTitle();
            int length = 0;
            byte[] bytes = null;
            if (null != title) {
                bytes = title.getBytes("utf-8");
                length = bytes.length;
            }
            //前4个字节保存orderId,
            //第二个4个字节保存title字段的长度
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(orderId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("序列化数据异常");
        }
    }
}
1.3 生产者代码
package com.warybee.c1;

import com.warybee.model.Order;
import com.warybee.serializer.OrderSerializer;
import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置自定义的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer kafkaProducer=new KafkaProducer(configs);
        //定义order
        Order order=new Order();
        order.setOrderId(1);
        order.setTitle("iphone13 pro 256G");
        ProducerRecord producerRecord=new ProducerRecord
        (       "test_order_topic",
                0,
                order.getOrderId(),
                order);
        //消息的异步确认
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(Recordmetadata recordmetadata, Exception exception) {
                if (exception==null){
                    System.out.println("消息的主题:"+recordmetadata.topic());
                    System.out.println("消息的分区:"+recordmetadata.partition());
                    System.out.println("消息的偏移量:"+recordmetadata.offset());
                }else {
                    System.out.println("发送消息异常");
                }
            }
        });


        // 关闭生产者
        kafkaProducer.close();
    }
}

1.4. 定义Order反序列化器

自定义反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接 口,并且实现其中的deserialize方法。

package com.warybee.deserializer;

import com.warybee.model.Order;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;


public class OrderDeserializer implements Deserializer {

    @Override
    public Order deserialize(String topic, byte[] data) {
        ByteBuffer allocate = ByteBuffer.allocate(data.length);
        allocate.put(data);
        allocate.flip();
        Integer orderId = allocate.getInt();
        int length = allocate.getInt();
        String title = new String(data, 8, length);
        return new Order(orderId, title);
    }
}
1.5 消费者代码
package com.warybee.c1;

import com.warybee.deserializer.OrderDeserializer;
import com.warybee.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //自定义value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer consumer = new KafkaConsumer(configs);

        List topics = new ArrayList<>();
        topics.add("test_order_topic");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord record : records) {
                System.out.println(record.topic() + "t"
                        + record.partition() + "t"
                        + record.offset() + "t"
                        + record.key() + "t"
                        + record.value().getTitle());

            }
        }
    }
}

上面实现的序列化与反序列化类,定义繁琐,不具有通用性,一不小心就会BUG满天飞,不适合在实际项目中使用,只做了解原理即可。接下来使用Apache Avro来实现序列化和反序列化

2. 使用Avro序列化和反序列化 2.1 Apache Avro介绍

Apache Avro是一种与编程语言无关的序列化格式。提供了一种共享数据文件的方式。Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema,schema 一般会被内嵌在数据文件里。

Apache Avro官网文档https://avro.apache.org/docs/current/gettingstartedjava.html

2.2 创建Maven项目
  • 引入依赖

        
                org.apache.kafka
                kafka-clients
                3.0.0
            
            
            
                ch.qos.logback
                logback-classic
                1.2.7
            
           
            
                org.apache.avro
                avro
                1.11.0
            
    
  • Avro 插件

    
            
                
                    org.apache.avro
                    avro-maven-plugin
                    1.11.0
                    
                        
                            generate-sources
                            
                                schema
                            
                            
                                ${project.basedir}/src/main/java/com/warybee/avro/schema/
                                ${project.basedir}/src/main/java/
                            
                        
                    
                
                
                    org.apache.maven.plugins
                    maven-compiler-plugin
                    
                        1.8
                        1.8
                    
                
            
        
    

Avro插件参数说明:

  • sourceDirectory:schema 文件所在目录
  • outputDirectory:根据schema 文件生成的类文件到哪个目录
2.3 创建schema 文件

Apache Avro schema 是使用 JSON 定义的。详细的介绍,可以参考官网文档:https://avro.apache.org/docs/current/gettingstartedjava.html

定义一个order.avsc文件(文件所在目录要与上一步配置的sourceDirectory保持一致),内容如下:

{"namespace": "com.warybee.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "int"},
    {"name": "title",  "type": "string"},
    {"name": "num", "type": "int"}
  ]
}

IDEA话,可以安装一个Apache Avro IDL Schema Support插件,安装插件后编写schema 有智能提示。

2.4.Avro生成entity

上面配置了Avro 插件,通过maven命令,生成即可。

mvn install

或者IDEA右键->RUN Maven->install

2.5 生产者代码
package com.warybee.avro;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.*;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;


public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer producer=new KafkaProducer(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
             Order order=Order.newBuilder()
                     .setOrderId(i+1)
                     .setTitle("订单: "+(i+1)+" iphone 13 pro 256G")
                     .setNum(1)
                     .build();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null);
            SpecificDatumWriter writer = new SpecificDatumWriter(order.getSchema());
            try {
                writer.write(order, encoder);
                encoder.flush();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
           ProducerRecord record=new ProducerRecord<>(
                   "test_avro_topic",
                   0,
                   order.getOrderId(),
                   out.toByteArray());
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(Recordmetadata metadata, Exception exception) {
                      if (exception==null){
                          System.out.println("消息的主题:"+metadata.topic());
                          System.out.println("消息的分区:"+metadata.partition());
                          System.out.println("消息的偏移量:"+metadata.offset());
                      }else {
                          System.out.println("发送消息异常");
                      }
                }
            });
        }
        // 关闭生产者
        producer.close();
    }
}
2.6 消费者代码
package com.warybee.avro;

import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //消费者所在的组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer consumer = new KafkaConsumer(configs);

        List topics = new ArrayList<>();
        topics.add("test_avro_topic");
        //消费者订阅主题
        consumer.subscribe(topics);

        SpecificDatumReader reader = new SpecificDatumReader<>(Order.getClassSchema());
        try {
            while (true){
                //批量拉取主题消息,每3秒拉取一次
                ConsumerRecords records = consumer.poll(3000);
                for (ConsumerRecord record : records) {
                    Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    Order order=null;
                    try {
                        order=reader.read(null,decoder);
                        System.out.println("订单ID:"+order.getOrderId()+"t"
                                +"订单标题:"+order.getTitle()+"t"
                                +"数量:"+order.getNum());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }finally {
            consumer.close();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673604.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号