一、使用Stream向RabbitMQ发送/接收消息:
1.导入stream和rabbitmq依赖包:
org.springframework.boot spring-boot-starter-actuatororg.springframework.cloud spring-cloud-starter-stream-rabbit
2.application.yml中配置stream和rabbitmq:
(1)配置rmq服务器连接信息:
spring: application: name: stream-service rabbitmq: #配置rabbitmq服务器连接信息 host: 192.168.233.147 port: 5672 username: root password: root ... server: port: 10004 management: #开启actuator-endpoint security: enabled: false #关闭security endpoints: web: exposure: include: "*" endpoint: health: show-details: always
(2)配置stream发送/接收普通消息/分组消息/延迟消息:
spring: ... cloud: stream: instance-count: 2 #(2)参与消息分区的接收者实例总数为2个(消息分区配置) instance-index: 0 #(2)接收者实例初始索引为0(消息分区配置) rabbit: #(3)延迟消息配置(RabbitMQ要安装延迟队列插件) bindings: delayed-sender: producer: delayedExchange: true #开启消息延迟 bindings: #将发送者与接收者共同绑定到mall_exchange(exchange名称) #############################(1)普通消息配置 start############################# receiver: #消息接收配置,此处名称为Topic.receiver destination: topic_exchange #exchange名称 sender: #消息发送配置,此处名称为Topic.sender destination: topic_exchange #exchange名称 #############################(1)普通消息配置 end############################# #############################(2)分组消息配置 start############################# group-receiver: #分区消息接收配置,此处名称为GroupTopic.group-receiver destination: group_exchange #exchange名称 group: group1 #设置分组名 consumer: partitioned: true #打开消息分区功能 group-sender: #分区消息发送配置,此处名称为GroupTopic.group-sender destination: group_exchange #exchange名称 producer: partitionCount: 2 #2个分区 partitionKeyexpression: headers['instanceIndex'] #规定指定实例索引的接收者才能收到消息,发送时通过MessageBuilder.setHeader(key名, 索引值)设置接收者索引 #############################(2)分组消息配置 end############################# #############################(3)延迟消息配置 start############################# delayed-receiver: #分区消息接收配置,此处名称为DelayedTopic.delayed-receiver destination: delayed_exchange #exchange名称 delayed-sender: #分区消息发送配置,此处名称为DelayedTopic.delayed-sender destination: delayed_exchange #exchange名称 #############################(3)延迟消息配置 end#############################
3.创建自定义普通/分组/延迟消息Topic:
(1)普通消息Topic:
public interface Topic { //自定义普通消息Topic
String INPUT = "receiver"; //自定义名
String OUTPUT = "sender"; //自定义名
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
(2)分组消息Topic:
public interface GroupTopic { //分组消息Topic
String INPUT = "group-receiver"; //自定义名
String OUTPUT = "group-sender"; //自定义名
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
(3)延迟消息Topic:
public interface DelayedTopic { //延迟消息Topic
String INPUT = "delayed-receiver"; //自定义名
String OUTPUT = "delayed-sender"; //自定义名
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
4.创建消息接收类接收普通/分组/延迟消息:
@EnableBinding(value = {Sink.class, Topic.class, GroupTopic.class, DelayedTopic.class}) //接收rmq消息
public class MsgReceive {
@StreamListener(Sink.INPUT) //接收普通消息,Sink.INPUT为系统自带
public void onMsg0(String msg) {
//此处接受到普通消息
}
@StreamListener(Topic.INPUT) //接收普通消息,Topic.INPUT为自定义
public void onMsg(String msg) {
//此处接受到普通消息
}
@StreamListener(GroupTopic.INPUT) //接收分组消息,GroupTopic.INPUT为自定义
public void onGroupMsg(String msg) {
// public void onGroupMsg(String msg, @Header("version") String version) { //接收带版本号的分组消息,可以根据版本号处理不同的逻辑
//此处接受到分组消息
}
@StreamListener(DelayedTopic.INPUT) //接收延迟消息,DelayedTopic.INPUT为自定义
public void onDelayedMsg(String msg) {
//此处接受到延迟消息
}
}
5.测试发送消息:
@Controller
public class MsgSendController {
@Autowired
private Topic topic; //消息发送
@Autowired
private GroupTopic groupTopic; //分组消息发送
@Autowired
private DelayedTopic delayedTopic; //延迟消息发送
@RequestMapping("/sendMsg")
@ResponseBody
public String sendMsg(String msg) {
topic.output().send(MessageBuilder.withPayload(msg).build()); //发送消息到rmq
return "发送普通消息";
}
@RequestMapping("/sendGroupMsg")
@ResponseBody
public String sendGroupMsg(String instanceIndex, String msg) {
groupTopic.output().send(MessageBuilder.withPayload(msg).setHeader("instanceIndex", instanceIndex).build());
//groupTopic.output().send(MessageBuilder.withPayload(msg).setHeader("version", "1.0.0").build()); //发送带版本号的消息
return "发送分组消息";
}
@RequestMapping("/sendDelayedMsg")
@ResponseBody
public String sendDelayedMsg(String msg) {
delayedTopic.output().send(MessageBuilder.withPayload(msg).setHeader("x-delay", 5000).build()); //通过x-delay头字段控制延迟,此处5毫秒
return "发送延迟消息";
}
}
二、其他配置:
1.配置异常重试(application.yml中):
(1)单主机上异常重试:
spring: cloud: stream: bindings: receiver: #receiver为Topic.receiver consumer: maxAttempts: 3 #单机上异常重试:发生异常时,消息重复发送的最大次数
(2)多主机上异常重试:
spring: rabbitmq: listener: direct: default-requeue-rejected: true #开启多主机上异常重试(方式1):全局异常重试 cloud: stream: bindings: receiver: #receiver为Topic.receiver consumer: maxAttempts: 1 #多主机上异常重试时,强制单机异常重试为1次(也就是不重试) rabbit: bindings: receiver: #receiver为Topic.receiver consumer: requeueRejected: true #开启多主机上异常重试(方式2):仅对receiver异常重试
2.配置死信队列(application.yml中):
spring: cloud: stream: bindings: receiver: #receiver为Topic.receiver consumer: maxAttempts: 3 #异常重试次数 rabbit: bindings: receiver: #receiver为Topic.receiver consumer: autoBindDlq: true #开启死信队列,默认队列名为xxx.dlq
3.异常降级处理:
@EnableBinding(value = {...})
public class MsgReceive {
...
@ServiceActivator(inputChannel = "group_exchange.group1.errors") //配置异常降级回调方法(异常重试超过最大次数后触发),inputChannel命名规则:destination.group.errors
public void callback(Message> message) { //message为org.springframework.messaging.Message
//处理异常降级逻辑
}
}



