栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spring cloud stream【消息分组】,java过滤器原理

Spring cloud stream【消息分组】,java过滤器原理

import java.io.Serializable;

public class Product implements Serializable{

private Integer id;

private String name;

public Integer getId() {

return id;

}

public void setId(Integer id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Product(Integer id, String name) {

super();

this.id = id;

this.name = name;

}

public Product() {

super();

}

@Override

public String toString() {

return “Product [id=” + id + “, name=” + name + “]”;

}

}

2.创建stream-group-receiverA服务


2.1 创建项目

2.2 pom文件

xsi:schemaLocation=“http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>

4.0.0

org.springframework.boot

spring-boot-starter-parent

1.5.13.RELEASE

com.bobo

stream-group-receiverA

0.0.1-SNAPSHOT

org.springframework.cloud

spring-cloud-dependencies

Dalston.SR5

pom

import

org.springframework.boot

spring-boot-starter-web

org.springframework.cloud

spring-cloud-starter-eureka

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-maven-plugin

2.3 配置文件

配置文件中配置分组“groupProduct”

spring.application.name=stream-group-receiverA

server.port=9070

#设置服务注册中心地址,指向另一个注册中心

eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息

spring.rabbitmq.host=192.168.88.150

spring.rabbitmq.port=5672

spring.rabbitmq.username=dpb

spring.rabbitmq.password=123

spring.rabbitmq.virtualHost=/

对应 MQ 是 exchange 和消息发送者的 交换器是同一个

spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct

具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义

spring.cloud.stream.bindings.inputProduct.group=groupProduct

2.4 接收消息的接口

public interface IReceiverService {

String INPUT = “inputProduct”;

@Input(INPUT)

SubscribableChannel receiver();

}

2.5 消息的具体处理类

@Service

@EnableBinding(IReceiverService.class)

public class ReceiverService {

@StreamListener(IReceiverService.INPUT)

public void onReceiver(Product p){

System.out.println(“消费者A:”+p);

}

}

注意同样需要添加Product类

package com.bobo.stream.pojo;

import java.io.Serializable;

public class Product implements Serializable{

private Integer id;

private String name;

public Integer getId() {

return id;

}

public void setId(Integer id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Product(Integer id, String name) {

super();

this.id = id;

this.name = name;

}

public Product() {

super();

}

@Override

public String toString() {

return “Product [id=” + id + “, name=” + name + “]”;

}

}

2.6 启动类

@SpringBootApplication

@EnableEurekaClient

@EnableBinding(value={IReceiverService.class})

public class StreamReceiverStart {

public static void main(String[] args) {

SpringApplication.run(StreamReceiverStart.class, args);

}

}

3.创建stream-group-receiverB服务


此服务和stream-group-receiverA一样,复制一份只需修改application.properties中的服务名称,端口。我们先将group设置不一样,我们测试来看看

spring.application.name=stream-group-receiverB

server.port=9071

#设置服务注册中心地址,指向另一个注册中心

eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 链接信息

spring.rabbitmq.host=192.168.88.150

spring.rabbitmq.port=5672

spring.rabbitmq.username=dpb

spring.rabbitmq.password=123

spring.rabbitmq.virtualHost=/

对应 MQ 是 exchange 和消息发送者的 交换器是同一个

spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct

具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义

spring.cloud.stream.bindings.inputProduct.group=groupProduct1

4.测试代码


@RunWith(SpringRunner.class)

@SpringBootTest(classes=StreamSenderStart.class)

public class StreamTest {

@Autowired

private ISendeService sendService;

@Test

public void testStream(){

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487091.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号