栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何在kafka中创建自定义序列化程序?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何在kafka中创建自定义序列化程序?

在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。

我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。

在生产者端序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Serializer的序列化器类。

serialize() 方法可以完成工作,接收对象并以字节数组形式返回序列化的版本。

public class MyValueSerializer implements Serializer<MyMessage>{    private boolean isKey;    @Override    public void configure(Map<String, ?> configs, boolean isKey)    {        this.isKey = isKey;    }    @Override    public byte[] serialize(String topic, MyMessage message)    {        if (message == null) { return null;        }        try { (serialize your MyMessage object into bytes) return bytes;        } catch (IOException | RuntimeException e) { throw new SerializationException("Error serializing value", e);        }    }    @Override    public void close()    {    }}final IntegerSerializer keySerializer = new IntegerSerializer();final MyValueSerializer myValueSerializer = new MyValueSerializer();final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);int messageNo = 1;int kafkaKey = messageNo;MyMessage kafkaValue = new MyMessage();ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在用户端反序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Deserializer的反序列化器类。

deserialize() 方法可以完成工作,以字节数组形式接收序列化的值并返回您的对象。

public class MyValueDeserializer implements Deserializer<MyMessage>{    private boolean isKey;    @Override    public void configure(Map<String, ?> configs, boolean isKey)    {        this.isKey = isKey;    }    @Override    public MyMessage deserialize(String s, byte[] value)    {        if (value == null) { return null;        }        try { (deserialize value into your MyMessage object) MyMessage message = new MyMessage(); return message;        } catch (IOException | RuntimeException e) { throw new SerializationException("Error deserializing value", e);        }    }    @Override    public void close()    {    }}

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);for (ConsumerRecord<Integer, MyMessage> record : records) {    int kafkaKey = record.key();    MyMessage kafkaValue = record.value();    ...}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/495685.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号