您需要使用a
ReplyingKafkaTemplate将结果返回到rest控制器。
参见ReplyingKafkaTemplate。
2.1.3版引入了KafkaTemplate的子类来提供请求/回复语义。该类名为ReplyingKafkaTemplate,并且具有一个方法(超类中的方法除外)。
结果是一个ListenableFuture,它用结果(或一个超时异常)进行异步填充。结果还具有sendFuture属性,该属性是调用KafkaTemplate.send()的结果。您可以使用此将来确定发送操作的结果。
该文档有一个示例。
编辑
@SpringBootApplication@RestControllerpublic class So63058608Application { private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class); public static void main(String[] args) { SpringApplication.run(So63058608Application.class, args); } @Autowired private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate; @GetMapping(path = "/get") public List<String> getThem() throws Exception { RequestReplyFuture<String, String, List<String>> future = this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null)); LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordmetadata().toString()); return future.get(10, TimeUnit.SECONDS).value(); } @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false) @SendTo public List<String> returnList(@Payload(required = false) String payload) { return new ArrayList<>(List.of("foo", "bar", "baz")); } @Bean public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf, ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) { containerFactory.setReplyTemplate(kafkaTemplate(pf)); ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory); ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container); return replyer; } @Bean public ConcurrentMessageListenerContainer<String, List<String>> replyContainer( ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) { ConcurrentMessageListenerContainer<String, List<String>> container = containerFactory.createContainer("so63058608-2"); container.getContainerProperties().setGroupId("so63058608-2"); container.setBatchErrorHandler(new BatchLoggingErrorHandler()); return container; } @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public NewTopic topic1() { return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build(); } @Bean public NewTopic topic3() { return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build(); }}spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.properties.spring.json.trusted.packages=*spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializerspring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer$ curl localhost:8080/get["foo","bar","baz"]编辑2
并返回一些对象的列表…
@SpringBootApplication@RestControllerpublic class So63058608Application { private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class); public static void main(String[] args) { SpringApplication.run(So63058608Application.class, args); } @Autowired private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate; @GetMapping(path = "/get") public List<Foo> getThem() throws Exception { RequestReplyFuture<String, String, List<Foo>> future = this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null)); LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordmetadata().toString()); List<Foo> result = future.get(10, TimeUnit.SECONDS).value(); LOG.info(result.toString()); return result; } @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false) @SendTo public List<Foo> returnList(@Payload(required = false) String payload) { return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz"))); } @Bean public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf, ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) { containerFactory.setReplyTemplate(kafkaTemplate(pf)); ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory); ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container); return replyer; } @Bean public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer( ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) { ConcurrentMessageListenerContainer<String, List<Foo>> container = containerFactory.createContainer("so63058608-2"); container.getContainerProperties().setGroupId("so63058608-2"); container.setBatchErrorHandler(new BatchLoggingErrorHandler()); return container; } @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public NewTopic topic1() { return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build(); } @Bean public NewTopic topic3() { return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build(); } public static JavaType returnType(byte[] data, Headers headers) { return TypeFactory.defaultInstance() .constructCollectionLikeType(List.class, Foo.class); }}class Foo { private String bar; public Foo() { } public Foo(String bar) { this.bar = bar; } public String getBar() { return this.bar; } public void setBar(String bar) { this.bar = bar; } @Override public String toString() { return "Foo [bar=" + this.bar + "]"; }}spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]


