import java.io.Serializable;
/**
 * Serializable bean holding a product's id and name.
 *
 * <p>No {@code serialVersionUID} is declared, so Java serialization derives one
 * from the class shape. NOTE(review): if instances are exchanged between
 * services via Java serialization, every copy of this class must keep the same
 * members and signatures — confirm before adding or removing any.
 */
public class Product implements Serializable {

    /** Product identifier; {@code null} until assigned. */
    private Integer id;

    /** Product name; {@code null} until assigned. */
    private String name;

    /**
     * Returns the product identifier.
     *
     * @return the id, or {@code null} if none has been set
     */
    public Integer getId() {
        return id;
    }

    /**
     * Sets the product identifier.
     *
     * @param id the new id; may be {@code null}
     */
    public void setId(Integer id) {
        this.id = id;
    }

    /**
     * Returns the product name.
     *
     * @return the name, or {@code null} if none has been set
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the product name.
     *
     * @param name the new name; may be {@code null}
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Creates a product with the given id and name.
     *
     * @param id   the product identifier
     * @param name the product name
     */
    public Product(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Creates an empty product with both fields left {@code null}. */
    public Product() {
    }

    /**
     * Renders the product as {@code Product [id=<id>, name=<name>]}.
     *
     * @return the textual representation of this product
     */
    @Override
    public String toString() {
        // Plain ASCII double quotes: the typographic quotes previously used here
        // are not valid Java string delimiters and broke compilation.
        return "Product [id=" + id + ", name=" + name + "]";
    }
}
2.创建stream-group-receiverA服务
2.1 创建项目 2.2 pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
	</parent>
	<groupId>com.bobo</groupId>
	<artifactId>stream-group-receiverA</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

配置文件中配置分组“groupProduct”

spring.application.name=stream-group-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rabbitmq 连接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
spring.cloud.stream.bindings.inputProduct.group=groupProduct

public interface IReceiverService {
	String INPUT = "inputProduct";

	@Input(INPUT)
	SubscribableChannel receiver();
}

@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
	@StreamListener(IReceiverService.INPUT)
	public void onReceiver(Product p){
		System.out.println("消费者A:"+p);
	}
}

注意同样需要添加Product类

package com.bobo.stream.pojo;
import java.io.Serializable;
public class Product implements Serializable{
	private Integer id;
	private String name;
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Product(Integer id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public Product() {
		super();
	}
	@Override
	public String toString() {
		return "Product [id=" + id + ", name=" + name + "]";
	}
}

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
	public static void main(String[] args) {
		SpringApplication.run(StreamReceiverStart.class, args);
	}
}

3.创建stream-group-receiverB服务
此服务和stream-group-receiverA一样,复制一份后只需修改application.properties中的服务名称和端口。我们先将group设置为不同的值,再通过测试来看看效果

spring.application.name=stream-group-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rabbitmq 连接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
spring.cloud.stream.bindings.inputProduct.group=groupProduct1

4.测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
	@Autowired
	private ISendeService sendService;

	@Test
	public void testStream(){



