(1)引入目的
构建消息驱动的微服务框架,为多种消息中间件提供统一的消息编程模型,目前仅支持RabbitMQ与KafKa;
(2)标准MQ
生产者与消费者之间靠消息媒介传递消息内容;消息必须走特定通道;
(3)实现原理
应用程序通过inputs与outputs来与SpringCloudStream的Binder对象交互。Inputs对应于消费者,Ouputs对应于生产者;通过定义绑定器Binder作为中间对象负责与消息中间件的交互,实现了应用程序与消息中间件细节之间的解耦;Stream的消息通信方式遵循发布-订阅模式,Topic主题进行广播,在RabbitMQ为Exchange,在Kafka为Topic; 1.2 Stream编码常用注解
(1)基本流程
Binder:连接中间件,屏蔽差异;Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过频道(Channel)对队列进行配置;Source与Sink:消息的输入与输出;
(2)注解
(1)搭建消息驱动生产发送者
引入RabbitMQ中间件,也可以使用其他的,如Kafaka等
org.springframework.cloud spring-cloud-starter-stream-rabbit
cloud2022 org.example 1.0-SNAPSHOT 4.0.0 cloud-stream cloud-stream org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-devtools runtime true org.springframework.cloud spring-cloud-netflix-eureka-client
配置文件
server:
port: 8801
eureka:
client:
register-with-eureka: true #是否要注册
fetchRegistry: true #是否抓取注册信息
service-url:
defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
instance: #修改主机名
instance-id: stream-out
prefer-ip-address: true #访问路径显示IP
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
datasource:
url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
username: root
password: 123456
driver-class-name=com: mysql.cj.jdbc.Driver
binder: defaultRabbit 报红,但是不影响。
主启动
package com.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanbasePackages = "com.stream")
public class StreamProviderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamProviderApplication.class, args);
}
}
消息发送MessageService
package com.stream.Service;
public interface MessageService {
public String send();
}
消息发送实现MessageServiceImpl
package com.stream.Service.Iml;
import com.stream.Service.MessageService;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
@EnableBinding(Source.class) //定义消息推送管道
public class MessageServiceImpl implements MessageService {
@Resource
private MessageChannel output; //消息发送管道
@Override
public String send() {
String msg = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(msg).build());
return null;
}
}
消息发送Controller
package com.stream.Controller;
import com.stream.Service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController {
@Resource
private MessageService messageService;
@GetMapping("/send")
public String send(){
return messageService.send();
}
}
启动并登录RabbitMQ,访问接口
(2)搭建消息驱动消费接收者
pom.xml
cloud2022 org.example 1.0-SNAPSHOT 4.0.0 stream-consumer stream-consumer org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-devtools runtime true org.springframework.cloud spring-cloud-netflix-eureka-client
application.yml:input变为output
server:
port: 8802
spring:
application:
name: cloud-stream-comsumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
#**************区别,消息发送者output,接受者input
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
datasource:
url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
username: root
password: 123456
driver-class-name=com: mysql.cj.jdbc.Driver
eureka:
client:
register-with-eureka: true #是否要注册
fetchRegistry: true #是否抓取注册信息
service-url:
defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
instance: #修改主机名
instance-id: stream-in
prefer-ip-address: true #访问路径显示IP
主启动
package com.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanbasePackages = "com.consumer")
public class StreamConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication.class, args);
}
}
消息接收Controller
package com.consumer.Controller;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class StreamConsumerController {
@StreamListener(Sink.INPUT)
public void input(Message message){
System.out.println("接收到消息为:"+message.getPayload()+ "-8802");
}
}
访问:浏览器调用消息发送接口发送消息
1.4 消息重复消费Group同一个消息被两个服务接收,就会出现重复消费问题,比如以上例子,再建一个相同的消息接收方,Stream就会默认为是两个组,就会出现重复消费;同一个group中的多个消费者是竞争关系,只要保证消息被其中一个应用消费,同组其他应用就无法消费;不同组之间可以实现重复消费。
分组前:
添加配置分组
分组后:
给其中一个消息接受方去掉group分组属性;然后停掉两个消息接受服务;发送4条消息;启动消息接受服务;去掉group的消息接受方不会接受消息,另一方会接受到4条消息;



