栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Spring Kafka的Spring Boot Rest API

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Kafka的Spring Boot Rest API

您需要使用 ReplyingKafkaTemplate 将结果返回到 REST 控制器。

参见ReplyingKafkaTemplate。

2.1.3 版引入了 KafkaTemplate 的一个子类来提供请求/回复语义。该类名为 ReplyingKafkaTemplate,除了继承自超类的方法之外,它还新增了一个方法(即下文示例中使用的 sendAndReceive)。

该方法的返回值是一个 ListenableFuture,它会被异步地填入结果(或者因超时而以异常结束)。返回值还带有一个 sendFuture 属性,它是调用 KafkaTemplate.send() 的结果;您可以使用这个 future 来确定发送操作的结果。

该文档有一个示例。

编辑

/**
 * Demo application that serves a REST endpoint from a Kafka request/reply exchange.
 *
 * <p>{@code GET /get} publishes a request record to topic {@code so63058608-1}. The
 * {@link #returnList(String)} listener consumes that record and returns a list of strings,
 * and the controller returns the value of the reply record obtained through the
 * {@link ReplyingKafkaTemplate}.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    /**
     * Sends a request record (null key, null value, partition 0) to {@code so63058608-1}
     * and blocks until the reply arrives.
     *
     * @return the value of the reply record
     * @throws Exception if the send or the reply does not complete within 10 seconds each,
     *         or if either future fails
     */
    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        RequestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        // The send future completes with a SendResult; its accessor is getRecordMetadata()
        // (the published listing had it as getRecordmetadata(), which does not compile).
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    /**
     * Server side of the exchange: listens on {@code so63058608-1} and answers every
     * request with the same three-element list. The request payload is not used.
     *
     * <p>{@code @SendTo} carries no explicit destination here.
     * NOTE(review): presumably the reply topic is taken from the reply header that the
     * ReplyingKafkaTemplate adds to the request - confirm against the Spring Kafka docs.
     *
     * @param payload the request value; may be null because the client sends a null value
     * @return a new mutable list containing "foo", "bar" and "baz"
     */
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    /**
     * Builds the request/reply template used by the controller.
     *
     * <p>Also registers a plain {@link KafkaTemplate} as the reply template of the listener
     * container factory before the reply container is created from that factory.
     *
     * @param pf the producer factory used for both the plain and the replying template
     * @param containerFactory the listener container factory
     * @return the replying template bound to the reply container
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(
            ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    /**
     * Creates the container that receives replies: created for {@code so63058608-2},
     * with group id {@code so63058608-2} and a {@link BatchLoggingErrorHandler}.
     *
     * @param containerFactory the factory used to create the container
     * @return the configured reply container
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    /** Plain template; used above as the listener container factory's reply template. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    /** Declares topic {@code so63058608-1} with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    /** Declares topic {@code so63058608-2} with one partition and one replica. */
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

}

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

$ curl localhost:8080/get
["foo","bar","baz"]

编辑2

并返回一些对象的列表…

/**
 * Variant of the request/reply demo in which the reply is a list of {@link Foo} objects
 * instead of a list of strings.
 *
 * <p>{@code GET /get} publishes a request record to topic {@code so63058608-1}. The
 * {@link #returnList(String)} listener returns a list of {@code Foo}, and the controller
 * logs and returns the value of the reply record obtained through the
 * {@link ReplyingKafkaTemplate}.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    /**
     * Sends a request record (null key, null value, partition 0) to {@code so63058608-1},
     * blocks until the reply arrives, logs it and returns it.
     *
     * @return the value of the reply record
     * @throws Exception if the send or the reply does not complete within 10 seconds each,
     *         or if either future fails
     */
    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        RequestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        // The send future completes with a SendResult; its accessor is getRecordMetadata()
        // (the published listing had it as getRecordmetadata(), which does not compile).
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    /**
     * Server side of the exchange: listens on {@code so63058608-1} and answers every
     * request with the same three {@link Foo} instances. The request payload is not used.
     *
     * <p>{@code @SendTo} carries no explicit destination here.
     * NOTE(review): presumably the reply topic is taken from the reply header that the
     * ReplyingKafkaTemplate adds to the request - confirm against the Spring Kafka docs.
     *
     * @param payload the request value; may be null because the client sends a null value
     * @return a new mutable list of Foo("foo"), Foo("bar") and Foo("baz")
     */
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    /**
     * Builds the request/reply template used by the controller.
     *
     * <p>Also registers a plain {@link KafkaTemplate} as the reply template of the listener
     * container factory before the reply container is created from that factory.
     *
     * @param pf the producer factory used for both the plain and the replying template
     * @param containerFactory the listener container factory
     * @return the replying template bound to the reply container
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(
            ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    /**
     * Creates the container that receives replies: created for {@code so63058608-2},
     * with group id {@code so63058608-2} and a {@link BatchLoggingErrorHandler}.
     *
     * @param containerFactory the factory used to create the container
     * @return the configured reply container
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    /** Plain template; used above as the listener container factory's reply template. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    /** Declares topic {@code so63058608-1} with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    /** Declares topic {@code so63058608-2} with one partition and one replica. */
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    /**
     * Returns the Jackson type {@code List<Foo>}. This method is named by the
     * {@code spring.json.value.type.method} consumer property listed after the classes.
     *
     * @param data the raw record value (not used)
     * @param headers the record headers (not used)
     * @return a collection-like {@link JavaType} of {@code List} with element type {@code Foo}
     */
    public static JavaType returnType(byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }

}

/**
 * Simple mutable bean with a single {@code bar} property, used as the reply element type.
 * NOTE(review): the no-arg constructor and setter are presumably there for JSON
 * deserialization - confirm before removing them.
 */
class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType

[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/464307.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号