1 产生的原因
现在一个很项目可能分为三部分:
前端—>后端---->大数据
而后端开发使用消息中间件,可能会使用RabbitMq
而大数据开发,一般都是使用Kafka,
那么一个项目中有多个消息中间件,并且这两个消息中间件的架构上也有所不同,像RabbitMq有exchange ,Kafka有Topic和Partitions分区,对程序员很不友好,所以产生Spring Cloud Stream
而Spring Cloud Stream就类似jpa,屏蔽底层消息中间件的差异,程序员主要操作Spring Cloud Stream即可,降低切换成本,统一消息编程模型
2 什么是Spring Cloud Stream
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。
它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
消息中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka
但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
3 Spring Cloud Stream怎么屏蔽底层差异
应用程序通过inputs(生产者)和outputs(消费者)来与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
就是利用定义绑定器Binder作为中间层,实现了隔离
4 Spring Cloud Stream的 通信模式
Spring Cloud Stream 中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
5 Spring Cloud Stream的业务流程
Source和Sink:
简单理解操作对象就是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
source用于获取数据(要发送到mq的数据)
Channel :
通道,是队列的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。
channel类似SpringCloudStream中的中间件,用于存放source接收到的数据,或者是存放binder拉取的数据
6 Spring Cloud Stream 常用注解和api
7 使用Spring Cloud Stream
需要创建三个项目,一个生产者,两个消费者
7.1)创建生产者
7.1.1)pom
org.springframework.cloud spring-cloud-starter-stream-rabbit
7.1.2) 配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
7.1.3) 主启动
@SpringBootApplication
@EnableEurekaClient
public class CloudStreamRabbitmqProvider8801Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
System.out.println("启动成功");
}
}
7.1.4) 业务类
serice接口和实现类
public interface IMessageProviderService {
String send();
}
//@EnableBinding(Source.class) 定义消息的推送管道(生产消息) 将Channel和Exchanges绑定在一起
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
Message stringMessage = MessageBuilder.withPayload(serial).build();//build会构建一个message类
output.send(stringMessage);
System.out.println("*****serial: " + serial);
return serial;
}
}
controller类
@RestController
public class SendMessageController {
@Resource
private IMessageProviderService messageProviderService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}
这样调用send方法,将消息发送给channel,然后channel将消费发送给binder,然后发送到rabbitmq中,会在rabbitmq中创建一个Exchange,就是我们配置文件中配置的exchange
7.2)创建消费者1号
7.2.1) pom
org.springframework.cloud spring-cloud-starter-stream-rabbit
7.2.2) 配置文件
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
7.2.3) 主启动
@SpringBootApplication
@EnableEurekaClient
public class CloudStreamRabbitmqConsumer8802Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
System.out.println("启动成功");
}
}
7.2.4) 业务类
//@EnableBinding(Sink.class) 负责接收channel发送过来的数据进行消费
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT) //监听sink的input,而input在配置文件中配置并且绑定了exchange,获取数据
public void input(Message message) {
System.out.println("port:" + serverPort + "t接受:" + message.getPayload());
}
}
7.3)创建消费者2号
当生产者发送消息的时候,可以看到消费者1号和1号都同时消费了同一条消息,产生了重复消费
8 解决重复消费问题
利用Stream中的消息分组解决:
在Stream中同一个group中是竞争关系,保证消息只被一个应用消费一次
不同组可以全面消费(重复消费),Stream默认是不同组的
更改消费者1号和2号的配置文件
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: zuB
9 持久化消费
就是当服务挂了,怎么消费没有消费的数据
这里,先将消费者1号移除zuB组,
然后将消费者1号和2号服务关闭
此时生产者开启,发送3条消息
此时重启消费者1号和2号
可以看到,当消费者1号退出zuB组后,它就获取不到在它宕机的时间段内的数据
但是消费者2号重启后,直接获取到了宕机期间它没有消费的数据,并且消费了
总结:
也就是,当我们没有配置分组时,会出现消息漏消费的问题
而配置分组后,我们可以自动获取未消费的数据



