也许问题出在Nifi如何写入(编码)Avro数据与消费者应用程序读取(解码)数据的方式之间不匹配。
简而言之,Avro的API提供了两种不同的序列化方法:
- 用于创建适当的Avro 文件 :编码数据记录,但也将Avro模式嵌入到一种前导中(通过
org.apache.avro.file.{DataFileWriter/DataFileReader})。将模式嵌入Avro文件非常有意义,因为(a)Avro文件的“有效载荷”通常比嵌入式Avro模式大几个数量级,并且(b)然后您可以根据自己的心脏内容来复制或移动这些文件并且仍然确保您可以再次阅读它们而不必咨询某人或某物。 - 仅对数据记录进行编码,即不嵌入模式(通过
org.apache.avro.io.{BinaryEnprer/BinaryDeprer};请注意程序包名称的区别:io
此处vs。file
以上)。例如,当对正在写入Kafka主题的消息进行Avro编码时,通常会首选这种方法,因为与上述变体1相比,您不会招致将Avro模式重新嵌入到每条消息中的开销,前提是您假设(非常合理)政策是,对于同一Kafka主题,将使用相同的Avro模式对消息进行格式化/编码。这是一个重要的优势,因为在流数据上下文中,运动中数据记录通常比上述静止数据Avro文件小得多(通常在100个字节和几百个KB之间)(通常为数百个或200个字节)。千MB);因此Avro模式的大小相对较大,因此在将2000条数据记录写入Kafka时,您不想将其嵌入2000x。缺点是您必须“以某种方式” 跟踪Avro模式如何映射到Kafka主题-或更准确地说,您必须以某种方式跟踪消息使用哪种Avro模式进行编码,而不必沿用直接嵌入模式的路径。好消息是Kafka生态系统中可用的工具(Avro模式注册表)可透明地执行此操作。因此,与变体1相比,变体2以牺牲便利为代价提高了效率。
效果是,取决于您使用上面的(1)还是(2),编码的Avro数据的“有线格式”看起来会有所不同。
我对Apache
Nifi不太熟悉,但是快速浏览一下源代码(例如ConvertAvroToJSON.java)对我来说,它使用的是变体1,即将Avro模式嵌入到Avro记录旁边。但是,您的使用者代码使用
DeprerFactory.get().binaryDeprer()并因此使用了变体2(未嵌入架构)。
也许这可以解释您遇到的错误?



