栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何在Spring Cloud Stream中配置函数的绑定以将其输入绑定到Web终结点并将其输出绑定到Kafka主题

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何在Spring Cloud Stream中配置函数的绑定以将其输入绑定到Web终结点并将其输出绑定到Kafka主题

感谢奥列格张贴这一解决方案背后的想法。从本质上讲,我增强了他的建议,以便大致处理以下两者之间的桥梁:

  1. 功能齐全的网络控制器;可以接收网络请求。
  2. 流供应商;可以将任何消息转发到消息传递基础结构。

此解决方案将Oleg示例中描述的问题封装在的自定义实现中

Supplier
。这种实现公开了一个API来触发,
Supplier
以发出作为参数传递的消息。这样的类如下所示:

import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import java.util.function.Supplier;import reactor.core.publisher.EmitterProcessor;import reactor.core.publisher.Flux;public class StreamSupplier implements Supplier<Flux<?>> {    private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION = "spring.cloud.stream.sendto.destination";    public static <T> Message<?> createMessage(T payload, String destination) {        MessageBuilder<T> builder = MessageBuilder.withPayload(payload);        if (destination != null && !destination.isEmpty()) builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);        return builder.build();    }    private String defaultDestination;    private EmitterProcessor<? super Object> processor = EmitterProcessor.create();    public StreamSupplier() {        this(null);    }    public StreamSupplier(String defaultDestination) {        this.defaultDestination = defaultDestination;    }    // SEND APIs    public <T> Message<?> sendMessage(T payload) {        return sendMessage(payload, defaultDestination);    }    public <T> Message<?> sendMessage(T payload, String destination) {        return sendBody(createMessage(payload, destination));    }    public <T> T sendBody(T body) {        processor.onNext(body);        return body;    }        @Override    public Flux<?> get() {        return processor;    }}

然后,开发人员只需:

  1. 在应用程序中将该特定
    Supplier
    实现的实例注册为;然后将其扫描到中。
    bean``Spring``spring-cloud-function``bean``FunctionCatalog
  2. 创建一个Web功能,以使用先前注册的功能将任何消息转发到流式基础
    Supplier
    结构-可以使用的所有功能进行配置
    spring-cloud-stream

下面的示例演示了这一点:

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Controller;import java.util.function.Function;import java.util.function.Supplier;import reactor.core.publisher.Flux;@SpringBootApplication@Controllerpublic class MyApp {    public static void main(String[] args) {        SpringApplication.run(MyApp.class,     "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");    }    // Functional Web Controller    @Bean    public Function<String, String> webToStreamFunction() {        return msg -> streamSupplier().sendBody(msg);    }    // Functional Stream Supplier    @Bean    public Supplier<Flux<?>> streamSupplierFunction() {        return new StreamSupplier();    }    // DOUBLE REGISTRATION TO AVOID POLLABLE ConFIGURATION    // LIMITATION OF SPRING-CLOUD-FUNCTION    @Bean    public StreamSupplier streamSupplier() {        return (StreamSupplier) streamSupplierFunction();    }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/635652.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号