文章目录
- 前言
- 一、说明
- 二、使用
- 1.引入库
- 2.解读
- 3.配置文件
- 总结
前言
屏蔽底层消息中间件的差异,降低切换成本,同一消息的变成模型
目前只支持Rabbitmq和kafka
中文文档
一、springcloud stream是什么
其实就类似于jdbc一样,统一了对消息处理的细节,让我们不在于注重某一个消息队列是怎样实现的,都有一样的操作api
二、使用
1.引入库
:
当前项目要依赖的坐标
org.springframework.cloud
spring-cloud-starter-stream-ribbon
它所依赖的版本库
父工程org.springframework.boot spring-boot-dependencies2.2.2.RELEASE pom import org.springframework.cloud spring-cloud-dependenciesHoxton.SR1 pom import ........ com.alibaba.cloud spring-cloud-alibaba-dependencies2.1.0.RELEASE pom import
2.解读
Binder: 是应用与消息中间剑的封装,目前实行了kafka和Rabbitmq的binder,
通过Binder可以很方便的连接中间剑,可以动态的改变消息类型(对应kafka的topic,
Rabbitmq的exchange),这些可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收的消息进入应用程序
@Output 注解标识输出通道。发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费的队列的消息接收
@EnableBinding 指通道channel和exchange绑定在一起。
3.配置文件
server:
port: 8004
spring:
application:
name: producer-8002
profiles:
active: dev
cloud:
stream:
binders:
defaultRabbit:
type: rabbit #消息组件类型
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #通道名称 消息发送需要配置
destination: exchange #交换机名称
content-type: application/json #消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
input: # 消息接收需要配置
destination: exchange #交换机名称
content-type: application/json #消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
配置文件中的 output和intput一个是发送和接收的配置,我们可以配置在一起,或者分开(因为有的服务是专门发送消息和接收消息的)
4.消息生产者我们不需要去关注我们的Rabbitmq的细节
定义接口:
public interface ProducerService {
String send();
}
接口实现类:
import com.sz.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
@EnableBinding(Source.class) //定义消息的推送管道
public class ProducerServiceImpl implements ProducerService{
@Resource
private MessageChannel output; //消息发送管道
@Override
public String send() {
boolean flag = output.send(MessageBuilder.withPayload("消息内容").build());
System.out.println(flag);
return "";
}
}
//注意,导包的时候不要导错了
5.消息消费者
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@EnableBinding(Sink.class)
public class ConsumerListener {
@StreamListener(Sink.INPUT)
public void input(Message message){
System.out.println("消费消息:"+message.getPayload());
}
}
//注意,导包的时候不要导错了
此时我们写一个接口去发送消息:
然后控制台就会有消费接收的消息(因为我们打印了)
6.问题 1.重复消费就比如我们上面的消费者代码,我们在开一个服务,写上一样的代码去监听这个交换机,
那么两个监听的方法都可以获取到消息。
这个时候我们就需要使用分组来解决。
注意:在stream中处于同一个分组中的消费者是竞争关系,就能够保证每个消息只能被同一个组中的其中一个消费者消费。但是不同的组是可以消费的。
说明白点就是,处在同一个组的消费者只有一个能够消费到消息,
消费者分布在不同的组上,则每个组都有一个消费者可以消费到消息。
自定义分组:
input:
destination: exchange #交换机名称
content-type: application/json #消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: groupA #分组名称
就是在消费者的服务配置文件中添加group: groupA
多个消费者我们就使用相同的分组名称就可以了
2.持久化
其实使用了分组就已经实现了持久化。 但是要注意的一点是:一点要配置分组, 因为配置了分组后,就算宕机了,重新启动后也能够消费到之前的消息, 但是如果把group分组信息删除了,则重启后,是接收不到之前的消息的。 注意注意:



