Avro是一种与编程语言无关的序列化格式
Avro 的特点：丰富的数据结构；紧凑、快速的二进制数据格式；提供容器文件，用来持久化数据；支持远程过程调用（RPC）；与动态语言充分集成——读写数据文件和实现 RPC 协议都不需要代码生成。Avro 依靠 schema 来描述数据。
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-resources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
定义schema文件
People.avsc
{
"namespace": "com.nq",
"type": "record",
"name": "People",
"fields" : [
{"name": "name", "type":"string"},
{"name": "age", "type":"int"},
{"name": "hasHouse", "type":"boolean"},
{"name": "children","type":"string" }
]
}
运行 mvn avro:schema，生成的 Java 类位于项目的 /target/generated-sources/avro/com/nq/People.java
复制到自己需要的地方
此外还可以下载avro-tools 生成实体类
生产者
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
public class AvroProducer {
public static void main(String[] args) throws IOException {
Properties props = new Properties();
String topic = "test-vip";
// 改成自己的
props.put("bootstrap.servers", "kafka-node01:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer producer = new KafkaProducer(props);
ByteArrayOutputStream out = new ByteArrayOutputStream();
SpecificDatumWriter datumWriter = new SpecificDatumWriter<>(People.class);
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
for (int i = 0; i < 1000; i++) {
out.reset();
People people = new People();
people.setName("达拉崩吧---" + i);
people.setAge(i);
people.setChildren("chilren===" + i);
people.setHasHouse(i % 2 == 0);
datumWriter.write(people, encoder);
encoder.flush();
ProducerRecord record = new ProducerRecord<>(topic, "vip-" + i, out.toByteArray());
producer.send(record);
}
out.close();
producer.close();
}
}
消费者
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumer {
public static void main(String[] args) {
Properties props = new Properties();
String topic = "test-vip";
// 改成自己的
props.put("bootstrap.servers", "kafka-node01:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("group.id", "avro-test");
props.put("auto.offset.reset","latest");
KafkaConsumer consumer = new KafkaConsumer(props);
SpecificDatumReader datumReader = new SpecificDatumReader<>(People.getClassSchema());
consumer.subscribe(Collections.singletonList(topic));
try {
while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord record : records) {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
People people = null;
try {
people = datumReader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("key: " + record.key()+"t" + people);
}
}
} finally {
consumer.close();
}
}
}



