栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息驱动--Spring Cloud Stream

消息驱动--Spring Cloud Stream

1 产生的原因

现在一个很项目可能分为三部分:
前端—>后端---->大数据
而后端开发使用消息中间件,可能会使用RabbitMq
而大数据开发,一般都是使用Kafka,
那么一个项目中有多个消息中间件,并且这两个消息中间件的架构上也有所不同,像RabbitMq有exchange ,Kafka有Topic和Partitions分区,对程序员很不友好,所以产生Spring Cloud Stream

而Spring Cloud Stream就类似jpa,屏蔽底层消息中间件的差异,程序员主要操作Spring Cloud Stream即可,降低切换成本,统一消息编程模型

2 什么是Spring Cloud Stream
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。
它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

消息中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka
但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

3 Spring Cloud Stream怎么屏蔽底层差异


应用程序通过inputs(生产者)和outputs(消费者)来与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
就是利用定义绑定器Binder作为中间层,实现了隔离

4 Spring Cloud Stream的 通信模式

Spring Cloud Stream 中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。

5 Spring Cloud Stream的业务流程

Source和Sink:
简单理解操作对象就是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
source用于获取数据(要发送到mq的数据)

Channel :
通道,是队列的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。
channel类似SpringCloudStream中的中间件,用于存放source接收到的数据,或者是存放binder拉取的数据

6 Spring Cloud Stream 常用注解和api

7 使用Spring Cloud Stream
需要创建三个项目,一个生产者,两个消费者

7.1)创建生产者
7.1.1)pom


            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        

7.1.2) 配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址


7.1.3) 主启动

@SpringBootApplication
@EnableEurekaClient
public class CloudStreamRabbitmqProvider8801Application {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("启动成功");
    }

}

7.1.4) 业务类
serice接口和实现类

public interface IMessageProviderService {
    
    String send();
}


//@EnableBinding(Source.class) 定义消息的推送管道(生产消息) 将Channel和Exchanges绑定在一起
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        Message stringMessage = MessageBuilder.withPayload(serial).build();//build会构建一个message类
        output.send(stringMessage);

        System.out.println("*****serial: " + serial);
        return serial;
    }
}

controller类

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

这样调用send方法,将消息发送给channel,然后channel将消费发送给binder,然后发送到rabbitmq中,会在rabbitmq中创建一个Exchange,就是我们配置文件中配置的exchange

7.2)创建消费者1号
7.2.1) pom

   
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        

7.2.2) 配置文件

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置


eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址


7.2.3) 主启动

@SpringBootApplication
@EnableEurekaClient
public class CloudStreamRabbitmqConsumer8802Application {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("启动成功");
    }
}

7.2.4) 业务类

//@EnableBinding(Sink.class) 负责接收channel发送过来的数据进行消费
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) //监听sink的input,而input在配置文件中配置并且绑定了exchange,获取数据
    public void input(Message message) {
        System.out.println("port:" + serverPort + "t接受:" + message.getPayload());
    }

}

7.3)创建消费者2号

当生产者发送消息的时候,可以看到消费者1号和1号都同时消费了同一条消息,产生了重复消费

8 解决重复消费问题

利用Stream中的消息分组解决
在Stream中同一个group中是竞争关系,保证消息只被一个应用消费一次
不同组可以全面消费(重复消费),Stream默认是不同组的

更改消费者1号和2号的配置文件

bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
          group: zuB

9 持久化消费

就是当服务挂了,怎么消费没有消费的数据

这里,先将消费者1号移除zuB组,
然后将消费者1号和2号服务关闭
此时生产者开启,发送3条消息
此时重启消费者1号和2号
可以看到,当消费者1号退出zuB组后,它就获取不到在它宕机的时间段内的数据
但是消费者2号重启后,直接获取到了宕机期间它没有消费的数据,并且消费了
总结:
也就是,当我们没有配置分组时,会出现消息漏消费的问题
而配置分组后,我们可以自动获取未消费的数据

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745654.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号