感谢奥列格张贴这一解决方案背后的想法。从本质上讲,我增强了他的建议,以便大致处理以下两者之间的桥梁:
- 功能齐全的网络控制器;可以接收网络请求。
- 流供应商;可以将任何消息转发到消息传递基础结构。
此解决方案将Oleg示例中描述的问题封装在的自定义实现中
Supplier。这种实现公开了一个API来触发,
Supplier以发出作为参数传递的消息。这样的类如下所示:
import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import java.util.function.Supplier;import reactor.core.publisher.EmitterProcessor;import reactor.core.publisher.Flux;public class StreamSupplier implements Supplier<Flux<?>> { private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION = "spring.cloud.stream.sendto.destination"; public static <T> Message<?> createMessage(T payload, String destination) { MessageBuilder<T> builder = MessageBuilder.withPayload(payload); if (destination != null && !destination.isEmpty()) builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination); return builder.build(); } private String defaultDestination; private EmitterProcessor<? super Object> processor = EmitterProcessor.create(); public StreamSupplier() { this(null); } public StreamSupplier(String defaultDestination) { this.defaultDestination = defaultDestination; } // SEND APIs public <T> Message<?> sendMessage(T payload) { return sendMessage(payload, defaultDestination); } public <T> Message<?> sendMessage(T payload, String destination) { return sendBody(createMessage(payload, destination)); } public <T> T sendBody(T body) { processor.onNext(body); return body; } @Override public Flux<?> get() { return processor; }}然后,开发人员只需:
- 在应用程序中将该特定
Supplier
实现的实例注册为;然后将其扫描到中。bean``Spring``spring-cloud-function``bean``FunctionCatalog
- 创建一个Web功能,以使用先前注册的功能将任何消息转发到流式基础
Supplier
结构-可以使用的所有功能进行配置spring-cloud-stream
。
下面的示例演示了这一点:
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Controller;import java.util.function.Function;import java.util.function.Supplier;import reactor.core.publisher.Flux;@SpringBootApplication@Controllerpublic class MyApp { public static void main(String[] args) { SpringApplication.run(MyApp.class, "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction"); } // Functional Web Controller @Bean public Function<String, String> webToStreamFunction() { return msg -> streamSupplier().sendBody(msg); } // Functional Stream Supplier @Bean public Supplier<Flux<?>> streamSupplierFunction() { return new StreamSupplier(); } // DOUBLE REGISTRATION TO AVOID POLLABLE ConFIGURATION // LIMITATION OF SPRING-CLOUD-FUNCTION @Bean public StreamSupplier streamSupplier() { return (StreamSupplier) streamSupplierFunction(); }}


