在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。
我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。
在生产者端序列化MyMessage。
您应该创建一个实现org.apache.kafka.common.serialization.Serializer的序列化器类。
serialize() 方法可以完成工作,接收对象并以字节数组形式返回序列化的版本。
public class MyValueSerializer implements Serializer<MyMessage>{ private boolean isKey; @Override public void configure(Map<String, ?> configs, boolean isKey) { this.isKey = isKey; } @Override public byte[] serialize(String topic, MyMessage message) { if (message == null) { return null; } try { (serialize your MyMessage object into bytes) return bytes; } catch (IOException | RuntimeException e) { throw new SerializationException("Error serializing value", e); } } @Override public void close() { }}final IntegerSerializer keySerializer = new IntegerSerializer();final MyValueSerializer myValueSerializer = new MyValueSerializer();final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);int messageNo = 1;int kafkaKey = messageNo;MyMessage kafkaValue = new MyMessage();ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));在用户端反序列化MyMessage。
您应该创建一个实现org.apache.kafka.common.serialization.Deserializer的反序列化器类。
deserialize() 方法可以完成工作,以字节数组形式接收序列化的值并返回您的对象。
public class MyValueDeserializer implements Deserializer<MyMessage>{ private boolean isKey; @Override public void configure(Map<String, ?> configs, boolean isKey) { this.isKey = isKey; } @Override public MyMessage deserialize(String s, byte[] value) { if (value == null) { return null; } try { (deserialize value into your MyMessage object) MyMessage message = new MyMessage(); return message; } catch (IOException | RuntimeException e) { throw new SerializationException("Error deserializing value", e); } } @Override public void close() { }}然后像这样使用它:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);for (ConsumerRecord<Integer, MyMessage> record : records) { int kafkaKey = record.key(); MyMessage kafkaValue = record.value(); ...}


