消息驱动生产者:
- 导入POM依赖
org.springframework.cloud spring-cloud-starter-stream-rabbit
YML
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
#配置绑定的mq信息
binders:
defaultRabbit:
# 消息组件类型
type: rabbit
#设置rabbitmq的相关环境配置
environment:
spring:
rabbitmq:
host: 192.168.0.6
port: 5672
username:
password:
# 服务的整合处理
bindings:
output:
# 要使用的exchange名称
destination: studyExchange
# 设置消息类型
content-type: application/json
# 设置要绑定的消息服务的具体设置
binder: defaultRabbit
eureka:
client:
fetch-registry: true
register-with-eureka: true
service-url:
defaultZone: http://eureka7001:7001/eureka
instance:
# 设置心跳的间隔时间
lease-renewal-interval-in-seconds: 2
#
lease-expiration-duration-in-seconds: 5
instance-id: send-8801
prefer-ip-address: true 主启动
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
service
public interface IMessageProvider {
public String send();
}
实现类
package com.ljw.springcloudstudy.service.impl;
import com.ljw.springcloudstudy.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {
//指的是发送者发给binder
@Resource
private MessageChannel output;
@Override
public String send() {
UUID serial = UUID.randomUUID();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("******serial:" + serial);
return null;
}
}
controller
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/message")
public String sendMessage(){
return iMessageProvider.send();
}
}
启动后,会根据配置文件注册一个交换机
消息驱动的消费者:
导入POM依赖
小 org.springframework.cloud spring-cloud-starter-stream-rabbit
写YML
server:
port: 8802
spring:
application:
name: cloud-stream-provider
cloud:
stream:
#配置绑定的mq信息
binders:
defaultRabbit:
# 消息组件类型
type: rabbit
#设置rabbitmq的相关环境配置
environment:
spring:
rabbitmq:
host: 192.168.0.6
port: 5672
username:
password:
# 服务的整合处理
bindings:
input:
# 要使用的exchange名称
destination: studyExchange
# 设置消息类型
content-type: application/json
# 设置要绑定的消息服务的具体设置
binder: defaultRabbit
eureka:
client:
fetch-registry: true
register-with-eureka: true
service-url:
defaultZone: http://eureka7001:7001/eureka
instance:
# 设置心跳的间隔时间
lease-renewal-interval-in-seconds: 2
#
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802
prefer-ip-address: true 主启动
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
controller
package com.ljw.springcloudstudy.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
@RestController
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message) {
System.out.println("消费者一号,-----收到的消息:" + message.getPayload() + serverPort);
}
}
那么,有多个消费者的时候,会产生重复消费的问题,假设现在还有一个消费者8803;
解决重复消费的问题:
- 消费者在同一组会产生竞争关系,只有一个可以消费,在不同组会发生重复消费通过分组group来解决这个问题,修改两个消费者的配置文件,设置为同一组:
bindings:
input:
# 要使用的exchange名称
destination: studyExchange
# 设置消息类型
content-type: application/json
# 设置要绑定的消息服务的具体设置
binder: defaultRabbit
group: ljw1
消息持久化:
- 设置了group消息就会持久化,避免了消息丢失的问题。



