学习视频
尚硅谷SpringCloud框架开发教程(SpringCloudAlibaba微服务分布式架构丨Spring Cloud)
集数:83—91
学习格言
不在能知,乃在能行。
学习笔记
【Java】学习笔记汇总
项目地址
https://gitee.com/zqcliudaliuda/cloud2021
文章目录
- 一、概述
- 1.1 引入原因
- 1.2 概念
- 1.3 设计思想
- 1.4 标准流程
- 1.5 编码API和常用注解
- 二、消息驱动之生产者
- 三、消息驱动之消费者
- 四、消费分组与持久化
- 4.1 实际场景
- 4.2 解决重复消费问题
- 4.3 自定义分组
- 4.4 持久化
官网:https://spring.io/projects/spring-cloud-stream
1.1 引入原因屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
因为平台可能使用各种消息中间件,如ActiveMQ、RabbitMQ、RocketMQ、Kafka等,但是一个人不可能精通这么多消息中间件,所以通过stream可以屏蔽消息中间件的差异,进行统一消息的编程模型。
少学东西,多专注于业务。
1.2 概念官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka.
标准MQ
- 生产者/消费者之间靠消息媒介传递信息内容: Message
- 消息必须走特定的通道:消息通道 Message Channel
- 消息通道里的消息如何被消费、收费处理:消息通道的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
引入Stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的, 一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder
INPUT对应于消费者
OUTPUT对应于生产者
通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。
Stream中的消息通信方式遵循了发布-订阅漠式
1.4 标准流程- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Springcloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
步骤1:新建maven项目
新建maven项目cloud-stream-rabbitmq-provider8801
步骤2:POM
org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test
步骤3:application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
enviroment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
output:
destination: studyExchange
content-type: application/json # 设置消息类型,本次为json,文本则设置 text/plain
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 心跳时间间隔(默认30s)
lease-expiration-duration-in-seconds: 5 # 如果超过5s时间间隔(默认90)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为ip地址
步骤4:主启动类
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
步骤5:业务类
定义接口src/main/java/com/zqc/springcloud/service/impl/MessageProvider.java
public interface IMessageProvider {
String send();
}
接口实现类src/main/java/com/zqc/springcloud/service/impl/MessageProvider.java
import com.zqc.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) // 定义消息的推送管道
public class MessageProvider implements IMessageProvider {
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("serial: "+ serial);
return serial;
}
}
控制类src/main/java/com/zqc/springcloud/controller/SendMessageController.java
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}
步骤6:测试
启动rabbitmq
依次启动:7001eureka、8801
访问http://localhost:15672/#/exchanges,可查看到
多次访问:http://localhost:8801/sendMessage
可在rabbitmq management内查看到:
步骤1:新建maven项目
新建maven项目cloud-stream-rabbitmq-consumer8802
步骤2:POM
org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test
步骤3:application.yml
与消费者相比,port不同,name不同;
cloud.stream.bindings.output换成cloud.stream.bindings.input;
instance-id: send-8802.com
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: studyExchange
content-type: application/json # 设置消息类型,本次为json,文本则设置 text/plain
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 心跳时间间隔(默认30s)
lease-expiration-duration-in-seconds: 5 # 如果超过5s时间间隔(默认90)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为ip地址
步骤4:主启动类
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
步骤5:业务类
src/main/java/com/zqc/springcloud/controller/ReceiveMessageListenerController.java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message) {
System.out.println("消费者1号接收到的消息:" + message.getPayload() + "t port:" + serverPort);
}
}
步骤6:测试
访问5次:http://localhost:8801/sendMessage
8801控制台输出:
serial: 68c2cb4f-c1b9-4b8e-8005-e7bfa89e3d1e serial: 5a2e7fbb-6e2d-4c17-bbd2-32d6f0c2b41e serial: 16456ab8-246c-4f60-a4dc-213dcec73546 serial: ddb67b0b-c00b-456d-9ef9-4284f05a2102 serial: f68b6b58-ca67-4572-a7dd-0f9aca0717d0
8802控制台输出:
消费者1号接收到的消息:68c2cb4f-c1b9-4b8e-8005-e7bfa89e3d1e port:8802 消费者1号接收到的消息:5a2e7fbb-6e2d-4c17-bbd2-32d6f0c2b41e port:8802 消费者1号接收到的消息:16456ab8-246c-4f60-a4dc-213dcec73546 port:8802 消费者1号接收到的消息:ddb67b0b-c00b-456d-9ef9-4284f05a2102 port:8802 消费者1号接收到的消息:f68b6b58-ca67-4572-a7dd-0f9aca0717d0 port:8802
rabbitmq控制台可看到:
依照8802克隆出8803。
依次启动:7001、8801、8802、8803
访问eureka7001:http://localhost:7001/
8801发送消息时,8802、8803都会接收到,存在重复消费的问题。
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费)
8802和8803时不同的分组,会重复消费。
故障现象:重复消费
导致原因:默认分组group不同,组流水号不一样,所以被认为成不同组。
解决原理:微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
4.3 自定义分组修改8802和8803的application.xml,group都修改为group1
访问5次:http://localhost:8801/sendMessage
8801控制台输出:
serial: 65a0bcf4-a7b6-4e60-8587-5dac8b2f74db serial: b16d1fc4-7ab0-428d-b479-1f672b6118d7 serial: 50414987-7621-469b-8a38-e6127bb3b8f7 serial: 7086e411-3e57-4821-8bd4-d612fd9cfecf serial: fb907bbf-6886-4808-a2bc-be073d28a5f0
8802控制台输出:
消费者1号接收到的消息:65a0bcf4-a7b6-4e60-8587-5dac8b2f74db port:8802 消费者1号接收到的消息:50414987-7621-469b-8a38-e6127bb3b8f7 port:8802 消费者1号接收到的消息:fb907bbf-6886-4808-a2bc-be073d28a5f0 port:8802
8803控制台输出:
消费者1号接收到的消息:b16d1fc4-7ab0-428d-b479-1f672b6118d7 port:8803 消费者1号接收到的消息:7086e411-3e57-4821-8bd4-d612fd9cfecf port:8803
此时8802和8803将竞争8801发送的消息,轮询消费。
4.4 持久化group设置后,当消费者关闭时生产者发送消息,消费端启动后仍会收到之前没有处理的消息。



