- 1. Custom serializer and deserializer
- 1.1 Define the Order entity class
- 1.2 Define the Order serializer class
- 1.3 Producer code
- 1.4 Define the Order deserializer
- 1.5 Consumer code
- 2. Serializing and deserializing with Avro
- 2.1 Introduction to Apache Avro
- 2.2 Create the Maven project
- 2.3 Create the schema file
- 2.4 Generate the entity with Avro
- 2.5 Producer code
- 2.6 Consumer code
1. Custom serializer and deserializer
1.1 Define the Order entity class
public class Order implements Serializable {
    private Integer orderId;
    private String title;

    public Order() {
    }

    public Order(Integer orderId, String title) {
        this.orderId = orderId;
        this.title = title;
    }
    // getters and setters omitted
}
1.2 Define the Order serializer class
All data in Kafka is stored as byte arrays, so a message must be serialized into bytes before it is sent to Kafka. That conversion is the serializer's job.
Kafka defines serializers through the org.apache.kafka.common.serialization.Serializer interface, which converts data of the generic type parameter into a byte array.
Kafka ships with serializers for the common types in the org.apache.kafka.common.serialization package, for example:
- StringSerializer, IntegerSerializer, LongSerializer, ShortSerializer
- DoubleSerializer, FloatSerializer
- ByteArraySerializer, ByteBufferSerializer, BytesSerializer
A custom serializer implements the org.apache.kafka.common.serialization.Serializer interface and overrides its serialize method.
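As a warm-up, the length-prefixed byte layout that the serializer below uses (4 bytes for orderId, 4 bytes for the title length, then the UTF-8 title bytes) can be exercised on its own with plain java.nio, independent of Kafka. OrderLayoutDemo is an illustrative stand-alone class, not part of the project:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OrderLayoutDemo {
    // Encode: 4 bytes orderId, 4 bytes title length, then the UTF-8 title bytes
    static byte[] encode(int orderId, String title) {
        byte[] titleBytes = title.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + titleBytes.length);
        buffer.putInt(orderId);
        buffer.putInt(titleBytes.length);
        buffer.put(titleBytes);
        return buffer.array();
    }

    // Decode: read the two ints back, then the title bytes, using the same charset
    static String decodeTitle(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getInt();              // skip orderId
        int length = buffer.getInt(); // title length
        byte[] titleBytes = new byte[length];
        buffer.get(titleBytes);
        return new String(titleBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = encode(1, "iphone13 pro 256G");
        System.out.println(decodeTitle(bytes)); // prints "iphone13 pro 256G"
    }
}
```

Note that both sides must agree on the charset: encoding the title with UTF-8 but decoding with the platform default charset would corrupt non-ASCII titles.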
package com.warybee.serializer;

import com.warybee.model.Order;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

public class OrderSerializer implements Serializer<Order> {
    @Override
    public byte[] serialize(String topic, Order data) {
        try {
            if (data == null) return null;
            Integer orderId = data.getOrderId();
            String title = data.getTitle();
            int length = 0;
            byte[] bytes = null;
            if (null != title) {
                bytes = title.getBytes("utf-8");
                length = bytes.length;
            }
            // first 4 bytes: orderId
            // next 4 bytes: length of the title field
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(orderId);
            buffer.putInt(length);
            if (bytes != null) {
                buffer.put(bytes);
            }
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Failed to serialize Order");
        }
    }
}

1.3 Producer code
package com.warybee.c1;
import com.warybee.model.Order;
import com.warybee.serializer.OrderSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // Bootstrap servers used for the initial connection to Kafka
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.132:9092");
        // Key serializer class
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // Custom value serializer class
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        KafkaProducer<Integer, Order> kafkaProducer = new KafkaProducer<>(configs);
        // Build an order
        Order order = new Order();
        order.setOrderId(1);
        order.setTitle("iphone13 pro 256G");
        ProducerRecord<Integer, Order> producerRecord = new ProducerRecord<>(
                "test_order_topic",
                0,
                order.getOrderId(),
                order);
        // Asynchronous send with acknowledgement callback
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Topic: " + recordMetadata.topic());
                    System.out.println("Partition: " + recordMetadata.partition());
                    System.out.println("Offset: " + recordMetadata.offset());
                } else {
                    System.out.println("Failed to send message");
                }
            }
        });
        // Close the producer
kafkaProducer.close();
}
}
1.4 Define the Order deserializer
A custom deserializer implements the org.apache.kafka.common.serialization.Deserializer interface and overrides its deserialize method.
package com.warybee.deserializer;

import com.warybee.model.Order;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OrderDeserializer implements Deserializer<Order> {
    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null) return null;
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // first 4 bytes: orderId, next 4 bytes: title length
        Integer orderId = buffer.getInt();
        int length = buffer.getInt();
        // title bytes start at offset 8; decode with the same charset the serializer used
        String title = new String(data, 8, length, StandardCharsets.UTF_8);
        return new Order(orderId, title);
    }
}

1.5 Consumer code
package com.warybee.c1;
import com.warybee.deserializer.OrderDeserializer;
import com.warybee.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // Bootstrap servers used for the initial connection to Kafka
        // For a cluster, the other brokers are discovered through this initial connection
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.132:9092");
        // Key deserializer class
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        // Custom value deserializer class
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Create the consumer
        KafkaConsumer<Integer, Order> consumer = new KafkaConsumer<>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_order_topic");
        // Subscribe to the topic
        consumer.subscribe(topics);
        while (true) {
            // Poll for a batch of records, waiting up to 3 seconds per poll
            ConsumerRecords<Integer, Order> records = consumer.poll(Duration.ofSeconds(3));
            // Iterate over the records
            for (ConsumerRecord<Integer, Order> record : records) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value().getTitle());
            }
        }
    }
}
The serializer and deserializer implemented above are tedious to write, are not reusable, and make it easy to introduce bugs, so they are not suitable for real projects; they are shown only to illustrate the principle. Next we use Apache Avro for serialization and deserialization.
2. Serializing and deserializing with Avro
2.1 Introduction to Apache Avro
Apache Avro is a language-neutral serialization format that provides a way to share data files. Avro data is defined by a language-independent schema, which is described in JSON. Data is serialized into either a binary or a JSON encoding; the binary encoding is the usual choice. Avro needs the schema when reading and writing files, and the schema is typically embedded in the data file itself.
Apache Avro documentation: https://avro.apache.org/docs/current/gettingstartedjava.html
2.2 Create the Maven project
Add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
Add the Avro and compiler plugins:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/java/com/warybee/avro/schema/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Avro plugin parameters:
- sourceDirectory: directory containing the schema files
- outputDirectory: directory into which the classes generated from the schema files are written
2.3 Create the schema file
Apache Avro schemas are defined in JSON. For details, see the documentation: https://avro.apache.org/docs/current/gettingstartedjava.html
Define an order.avsc file (its directory must match the sourceDirectory configured in the previous step) with the following content:
{"namespace": "com.warybee.avro",
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "int"},
{"name": "title", "type": "string"},
{"name": "num", "type": "int"}
]
}
If you use IntelliJ IDEA, you can install the Apache Avro IDL Schema Support plugin to get code completion when writing schemas.
2.4 Generate the entity with Avro
With the Avro plugin configured above, generate the classes by running Maven:
mvn install
or, in IDEA, right-click -> Run Maven -> install.
2.5 Producer code
package com.warybee.avro;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // Bootstrap servers used for the initial connection to Kafka
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.132:9092");
        // Key serializer class
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // Value serializer class (the Avro-encoded payload is sent as a byte array)
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(configs);
        // Send 100 messages
        for (int i = 0; i < 100; i++) {
            Order order = Order.newBuilder()
                    .setOrderId(i + 1)
                    .setTitle("Order: " + (i + 1) + " iphone 13 pro 256G")
                    .setNum(1)
                    .build();
            // Serialize the order with Avro's binary encoding
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            SpecificDatumWriter<Order> writer = new SpecificDatumWriter<>(order.getSchema());
try {
writer.write(order, encoder);
encoder.flush();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
            ProducerRecord<Integer, byte[]> record = new ProducerRecord<>(
                    "test_avro_topic",
                    0,
                    order.getOrderId(),
                    out.toByteArray());
            // Send the message with an acknowledgement callback
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Topic: " + metadata.topic());
                        System.out.println("Partition: " + metadata.partition());
                        System.out.println("Offset: " + metadata.offset());
                    } else {
                        System.out.println("Failed to send message");
                    }
                }
            });
}
        // Close the producer
producer.close();
}
}
2.6 Consumer code
package com.warybee.avro;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // Bootstrap servers used for the initial connection to Kafka
        // For a cluster, the other brokers are discovered through this initial connection
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.132:9092");
        // Key deserializer class
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        // Value deserializer class (the Avro payload arrives as a byte array)
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Consumer group ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Create the consumer
        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_avro_topic");
        // Subscribe to the topic
        consumer.subscribe(topics);
        SpecificDatumReader<Order> reader = new SpecificDatumReader<>(Order.getClassSchema());
        try {
            while (true) {
                // Poll for a batch of records, waiting up to 3 seconds per poll
                ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofSeconds(3));
                for (ConsumerRecord<Integer, byte[]> record : records) {
                    // Deserialize the Avro binary payload back into an Order
                    Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    try {
                        Order order = reader.read(null, decoder);
                        System.out.println("Order ID: " + order.getOrderId() + "\t"
                                + "Title: " + order.getTitle() + "\t"
                                + "Quantity: " + order.getNum());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }
}
}



