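The recently released Flink 1.14.0 builds on a refactored Source interface; for details, see FLIP-27: Refactor Source Interface.
The refactoring changed a lot at the API level. So how do you plug in a custom deserialization class when consuming Kafka data with the new API?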
Kafka Source

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("input-topic")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
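KafkaSourceBuilder provides two methods for deserializing data: setDeserializer and setValueOnlyDeserializer. The names already hint at the difference: the former deserializes the complete ConsumerRecord, while the latter deserializes only the value of the ConsumerRecord. Let's look at the underlying source code.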
KafkaSourceBuilder source

    public KafkaSourceBuilder<OUT> setDeserializer(
            KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
        this.deserializationSchema = recordDeserializer;
        return this;
    }

    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
            DeserializationSchema<OUT> deserializationSchema) {
        this.deserializationSchema =
                KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
        return this;
    }
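As you can see, the two methods do essentially the same thing, although their parameters differ: setDeserializer takes a KafkaRecordDeserializationSchema, while setValueOnlyDeserializer takes a DeserializationSchema. How these two parameter types differ and relate is explained further on. Both methods return the builder itself, and in both cases what the builder ends up storing is a KafkaRecordDeserializationSchema object, so the next step is to look at the source of these interfaces.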
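To make the "two setters, one field" point concrete, here is a minimal, JDK-only sketch of the same pattern. All types in it (SketchRecord, ValueSchema, RecordSchema, SketchBuilder, BuilderSketchDemo) are hypothetical stand-ins invented for illustration; they are not the Flink classes and only mimic the structure of the source quoted above.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-ins for illustration only; these are NOT the Flink types.
final class SketchRecord {
    final String topic;
    final byte[] value;

    SketchRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Plays the role of DeserializationSchema: sees only the value bytes.
interface ValueSchema<T> {
    T deserialize(byte[] value);
}

// Plays the role of KafkaRecordDeserializationSchema: sees the whole record.
interface RecordSchema<T> {
    T deserialize(SketchRecord record);

    // Adapter in the spirit of valueOnly(...): forward only the value bytes.
    static <V> RecordSchema<V> valueOnly(ValueSchema<V> valueSchema) {
        return record -> valueSchema.deserialize(record.value);
    }
}

// Plays the role of KafkaSourceBuilder: one field, written by two setters.
final class SketchBuilder<T> {
    private RecordSchema<T> deserializationSchema;

    SketchBuilder<T> setDeserializer(RecordSchema<T> schema) {
        this.deserializationSchema = schema;
        return this;
    }

    SketchBuilder<T> setValueOnlyDeserializer(ValueSchema<T> schema) {
        this.deserializationSchema = RecordSchema.valueOnly(schema);
        return this;
    }

    RecordSchema<T> schema() {
        return deserializationSchema;
    }
}

final class BuilderSketchDemo {
    static String run() {
        SketchRecord record =
                new SketchRecord("input-topic", "hello".getBytes(StandardCharsets.UTF_8));
        RecordSchema<String> schema =
                new SketchBuilder<String>()
                        .setDeserializer(
                                r -> r.topic + ":" + new String(r.value, StandardCharsets.UTF_8))
                        // This call overwrites the previous one: both setters write the same field.
                        .setValueOnlyDeserializer(v -> new String(v, StandardCharsets.UTF_8))
                        .schema();
        return schema.deserialize(record);
    }
}
```

In this sketch the value-only schema set last is the one that takes effect, so the topic prefix from the first schema never appears in the result.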
First, here is part of the source of DeserializationSchema.

DeserializationSchema source

    @Public
    public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

        @PublicEvolving
        default void open(InitializationContext context) throws Exception {}

        T deserialize(byte[] message) throws IOException;

        @PublicEvolving
        default void deserialize(byte[] message, Collector<T> out) throws IOException {
            T deserialize = deserialize(message);
            if (deserialize != null) {
                out.collect(deserialize);
            }
        }

        boolean isEndOfStream(T nextElement);
    }

KafkaRecordDeserializationSchema source

    public interface KafkaRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

        @PublicEvolving
        default void open(DeserializationSchema.InitializationContext context) throws Exception {}

        @PublicEvolving
        void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;

        static <V> KafkaRecordDeserializationSchema<V> of(
                KafkaDeserializationSchema<V> kafkaDeserializationSchema) {
            return new KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema);
        }

        static <V> KafkaRecordDeserializationSchema<V> valueOnly(
                DeserializationSchema<V> valueDeserializationSchema) {
            return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
        }

        static <V> KafkaRecordDeserializationSchema<V> valueOnly(
                Class<? extends Deserializer<V>> valueDeserializerClass) {
            return new KafkaValueOnlyDeserializerWrapper<>(
                    valueDeserializerClass, Collections.emptyMap());
        }

        static <V, D extends Configurable & Deserializer<V>>
                KafkaRecordDeserializationSchema<V> valueOnly(
                        Class<D> valueDeserializerClass, Map<String, String> config) {
            return new KafkaValueOnlyDeserializerWrapper<>(valueDeserializerClass, config);
        }
    }
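As the names suggest, both are deserialization interfaces, and both extend Serializable and ResultTypeQueryable. The difference lies in the parameters of the deserialize method: KafkaRecordDeserializationSchema receives a ConsumerRecord and is clearly built for deserializing Kafka data, while DeserializationSchema can deserialize arbitrary binary data and is therefore more general. The two interfaces sit at the same level; neither extends the other.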
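If you need Kafka metadata (for example the topic or the timestamp), implement KafkaRecordDeserializationSchema. The interface also has four static methods: of wraps a KafkaDeserializationSchema, which deserializes the complete ConsumerRecord, and the remaining three valueOnly overloads deserialize only the value of a Kafka message.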
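At this point things are clear: to write a custom deserialization class, implementing either DeserializationSchema or KafkaRecordDeserializationSchema works. The example below takes the record-level route with a simple deserializer. It implements the KafkaDeserializationSchema interface, which KafkaRecordDeserializationSchema.of(...) then adapts for the new source.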
MyKafkaDeserialization: custom deserialization class

    package flink.stream.deserialization;

    import bean.Jason;
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;

    import java.nio.charset.StandardCharsets;

    public class MyKafkaDeserialization implements KafkaDeserializationSchema<Jason> {

        private static final Logger log = Logger.getLogger(MyKafkaDeserialization.class);

        private final boolean includeTopic;
        private final boolean includeTimestamp;

        public MyKafkaDeserialization(boolean includeTopic, boolean includeTimestamp) {
            this.includeTopic = includeTopic;
            this.includeTimestamp = includeTimestamp;
        }

        @Override
        public TypeInformation<Jason> getProducedType() {
            return TypeInformation.of(Jason.class);
        }

        @Override
        public boolean isEndOfStream(Jason nextElement) {
            // The Kafka stream is unbounded, so no element ends it.
            return false;
        }

        @Override
        public Jason deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            if (consumerRecord != null) {
                try {
                    String value = new String(consumerRecord.value(), StandardCharsets.UTF_8);
                    Jason jason = JSON.parseObject(value, Jason.class);
                    // Optionally copy Kafka metadata into the result object.
                    if (includeTopic) jason.setTopic(consumerRecord.topic());
                    if (includeTimestamp) jason.setTimestamp(consumerRecord.timestamp());
                    return jason;
                } catch (Exception e) {
                    log.error("deserialize failed : " + e.getMessage(), e);
                }
            }
            // Null record or failed parse: nothing to emit.
            return null;
        }
    }
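The whole implementation is very simple, and it lets you deserialize the consumed data into whatever format you want. Although Flink 1.14.0 refactored the Source interface, the deserialization interfaces have barely changed; a few methods were added on top of what was already there.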
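Returning null on failure, as MyKafkaDeserialization does, only works if the caller skips null results. The default deserialize(byte[] message, Collector out) method of DeserializationSchema quoted earlier does exactly that, and the assumption here is that the legacy Kafka interface is handled the same way. The following JDK-only sketch illustrates that null-skipping contract. SkippingSchema and SkipNullDemo are hypothetical names, and java.util.function.Consumer stands in for Flink's Collector.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for DeserializationSchema; Consumer<T> plays the role of Collector<T>.
interface SkippingSchema<T> {
    // Return null to signal "nothing to emit for this message".
    T deserialize(byte[] message);

    // Mirrors the default method quoted earlier: emit only non-null results.
    default void deserialize(byte[] message, Consumer<T> out) {
        T value = deserialize(message);
        if (value != null) {
            out.accept(value);
        }
    }
}

final class SkipNullDemo {
    static List<Integer> run(List<String> rawMessages) {
        SkippingSchema<Integer> schema = message -> {
            try {
                return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
            } catch (NumberFormatException e) {
                return null; // malformed input is dropped instead of failing
            }
        };
        List<Integer> collected = new ArrayList<>();
        for (String raw : rawMessages) {
            schema.deserialize(raw.getBytes(StandardCharsets.UTF_8), collected::add);
        }
        return collected;
    }
}
```

Silently dropping bad records keeps the job running, but it also hides data problems, so logging the failure (as the class above does) or counting dropped records is worth considering.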
Usage

    KafkaSource<Jason> source = KafkaSource.<Jason>builder()
            .setProperty("security.protocol", "SASL_PLAINTEXT")
            .setProperty("sasl.mechanism", "PLAIN")
            .setProperty("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                            + username + "\" password=\"" + password + "\";")
            // discover new partitions every 10 seconds
            .setProperty("partition.discovery.interval.ms", "10000")
            .setBootstrapServers(broker)
            .setTopics(topic)
            .setGroupId(group_id)
            .setStartingOffsets(OffsetsInitializer.earliest())
            // Option 1: deserialize the whole record, including metadata
            .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserialization(true, true)))
            // Option 2: deserialize only the value (use instead of option 1)
            // .setValueOnlyDeserializer(new MyDeSerializer())
            .build();
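Only one of setDeserializer and setValueOnlyDeserializer needs to be set. Both write the same field in the builder, so if you call both, the later call overwrites the earlier one.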
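The sasl.jaas.config value in the usage snippet is easy to get wrong, because the quotes around the username and password have to be escaped inside the Java string. A small helper keeps that in one place. This is only a sketch: JaasConfigSketch and plainJaasConfig are hypothetical names, and the helper assumes the credentials contain no double quotes or backslashes.

```java
final class JaasConfigSketch {
    // Builds the value for the "sasl.jaas.config" property (PLAIN mechanism).
    // Assumption: username and password contain no '"' or '\' characters;
    // this sketch does not escape them.
    static String plainJaasConfig(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                username, password);
    }
}
```

The result can then be passed as .setProperty("sasl.jaas.config", JaasConfigSketch.plainJaasConfig(username, password)).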