目录
一、概述
1、SpringCloud Stream是什么
2、设计思想
3、Stream有什么用
4、编码AIP和常用注解
二、消息驱动之生产者搭建
1、创建模块
2、修改pom文件
3、编写yml文件
4、编写主启动类
5、编写业务类
(1)service
(2)controller
6、运行测试
三、消息驱动之消费者搭建
1、新建模块
2、修改pom文件
3、编写yml文件
4、编写主启动类
5、编写业务逻辑代码
controller
6、运行测试
一、概述
1、SpringCloud Stream是什么
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
2、设计思想
生产者/消费者之间靠消息媒介Message传递信息内容 ;
消息必须走特定的通道MessageChannel;
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅。
Stream中的消息通信方式遵循了发布-订阅模式,使用Topic主题进行广播(在RabbitMQ就是Exchange,在Kakfa中就是Topic) 。
3、Stream有什么用
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰。我们如果用了两个消息队列的其中一种,假如后面的业务需求需要往另外一种消息队列进行迁移,这无疑是灾难性的:一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰。我们如果用了两个消息队列的其中一种,假如后面的业务需求需要往另外一种消息队列进行迁移,这无疑是灾难性的:一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
4、编码AIP和常用注解
二、消息驱动之生产者搭建
1、创建模块
新建普通maven模块cloud-stream-rabbitmq-provider8801
2、修改pom文件
cloud
com.shang.cloud
1.0-SNAPSHOT
4.0.0
cloud-stream-rabbitmq-provider8801
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-actuator
org.springframework.cloud
spring-cloud-starter-netflix-eureka-client
org.springframework.cloud
spring-cloud-starter-stream-rabbit
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
8
8
3、编写yml文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
4、编写主启动类
@SpringBootApplication
public class StreamMQMain8801
{
public static void main(String[] args)
{
SpringApplication.run(StreamMQMain8801.class,args);
}
}
5、编写业务类
(1)service
public interface IMessageProvider
{
public String send();
}
public interface IMessageProvider
{
public String send();
}
在service下再建impl包
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
(2)controller
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
6、运行测试
启动7001eureka、RabbitMQ、8801
访问 http://localhost:8801/sendMessage 多刷新几遍,模拟发送了多条消息
然后看看后台
再看看RabbitMQ: http://localhost:15672/#/
此处对应的是
三、消息驱动之消费者搭建
1、新建模块
新建普通maven模块 cloud-stream-rabbitmq-consumer8802
2、修改pom文件cloud com.shang.cloud 1.0-SNAPSHOT 4.0.0 cloud-stream-rabbitmq-consumer8802org.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.cloud spring-cloud-starter-stream-rabbitorg.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-devtoolsruntime true org.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest 8 8
3、编写yml文件
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
4、编写主启动类
5、编写业务逻辑代码
controller
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message)
{
System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"t port: "+serverPort);
}
}
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message)
{
System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"t port: "+serverPort);
}
}
6、运行测试
启动eureka7001、生产者8801、消费者8802
访问 http://localhost:8801/sendMessage 多刷新几次
在exchange中点击studyExchange:



