更新 (5.5及更高版本)
Confluent版本5.5通过PrimitiveAvroSerde(cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams添加了对原始Avro类型的本地支持/serdes/avro/PrimitiveAvroSerde.java)
原始答案 (版本5.4及更高版本):
这是一个已知问题。原始Avro类型无法与Confluent的AvroSerdes配合使用,因为Serdes只能与GenericAvroRecord和一起SpecificAvroRecord使用。
比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro。
因此,基于KafkaAvroSerializer和构建您自己的SerdeKafkaAvroDeserializer是正确的方法。为了能够将其作为默认Serde传递到配置中,您不能使用,Serdes.serdeFrom因为由于泛型类型擦除而导致类型信息丢失。
但是,您可以实现自己的扩展Serde接口的类,并将自定义类传递给config:
public class MySerde extends Serde<Integer> { // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`}config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);


