Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。引入了发布-订阅、消费组、分区这三个核心概念。
why可以做到各消息中间的无缝切换,例如一个系统为rabbit,另一个为kafka,可以对两个中间件进行消息topic整合,之后发布给消费者
howprovider:
依赖:
org.springframework.cloud spring-cloud-starter-stream-rabbit
yml:
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
#group: left #避免重复消费
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
MessageService.java
@EnableBinding(Source.class)
public class MessageServiceImpl implements IMessageService {
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
Message stringMessage = MessageBuilder.withPayload(serial).build();
output.send(stringMessage);
System.out.println("*****serial: " + serial);
return serial;
}
}
MessageController.java
@RestController
public class StreamRabbitController {
@Resource
private IMessageService iMessageService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return iMessageService.send();
}
}
consumer:
依赖,yml同provider
Listener.java
@Component
@EnableBinding(Sink.class)
public class ReceiveMSGListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message) {
System.out.println("port:" + serverPort + "t接受:" + message.getPayload());
}
}
当然可以建立多个消费者,设置不同的端口号。
8802:
8803:
如果不想信息被重复消费,可以将8802,8803配置在同一个分组



