栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【SpringCloud】Stream概述、消息驱动生产者、消费者、消费分组、持久化

【SpringCloud】Stream概述、消息驱动生产者、消费者、消费分组、持久化

 学习视频 

尚硅谷SpringCloud框架开发教程(SpringCloudAlibaba微服务分布式架构丨Spring Cloud)

集数:83—91


 学习格言 

不在能知,乃在能行。


 学习笔记 

【Java】学习笔记汇总


 项目地址 

https://gitee.com/zqcliudaliuda/cloud2021


文章目录
  • 一、概述
    • 1.1 引入原因
    • 1.2 概念
    • 1.3 设计思想
    • 1.4 标准流程
    • 1.5 编码API和常用注解
  • 二、消息驱动之生产者
  • 三、消息驱动之消费者
  • 四、消费分组与持久化
    • 4.1 实际场景
    • 4.2 解决重复消费问题
    • 4.3 自定义分组
    • 4.4 持久化

一、概述

官网:https://spring.io/projects/spring-cloud-stream

1.1 引入原因

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。


因为平台可能使用各种消息中间件,如ActiveMQ、RabbitMQ、RocketMQ、Kafka等,但是一个人不可能精通这么多消息中间件,所以通过stream可以屏蔽消息中间件的差异,进行统一消息的编程模型。

少学东西,多专注于业务。

1.2 概念

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka.

1.3 设计思想

 标准MQ

  • 生产者/消费者之间靠消息媒介传递信息内容: Message
  • 消息必须走特定的通道:消息通道 Message Channel
  • 消息通道里的消息如何被消费、收费处理:消息通道的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

 引入Stream

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。


这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的, 一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

 Binder

INPUT对应于消费者
OUTPUT对应于生产者

通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。

Stream中的消息通信方式遵循了发布-订阅漠式

1.4 标准流程

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Springcloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
1.5 编码API和常用注解

二、消息驱动之生产者

 步骤1:新建maven项目

新建maven项目cloud-stream-rabbitmq-provider8801

 步骤2:POM


    
        org.springframework.cloud
        spring-cloud-starter-stream-rabbit
    
    
        org.springframework.cloud
        spring-cloud-starter-netflix-eureka-client
    
    
        org.springframework.boot
        spring-boot-starter-web
    
    
        org.springframework.boot
        spring-boot-starter-actuator
    
    
        org.springframework.boot
        spring-boot-devtools
        runtime
        true
    
    
        org.projectlombok
        lombok
        true
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    

 步骤3:application.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          enviroment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        output:
          destination: studyExchange
          content-type: application/json  # 设置消息类型,本次为json,文本则设置 text/plain
          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 心跳时间间隔(默认30s)
    lease-expiration-duration-in-seconds: 5 # 如果超过5s时间间隔(默认90)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true  # 访问的路径变为ip地址

 步骤4:主启动类

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

 步骤5:业务类

定义接口src/main/java/com/zqc/springcloud/service/impl/MessageProvider.java

public interface IMessageProvider {
    String send();
}

接口实现类src/main/java/com/zqc/springcloud/service/impl/MessageProvider.java

import com.zqc.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class)  // 定义消息的推送管道
public class MessageProvider implements IMessageProvider {

    @Resource
    private MessageChannel output; // 消息发送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("serial: "+ serial);
        return serial;
    }
}

控制类src/main/java/com/zqc/springcloud/controller/SendMessageController.java

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

 步骤6:测试

启动rabbitmq

依次启动:7001eureka、8801

访问http://localhost:15672/#/exchanges,可查看到


多次访问:http://localhost:8801/sendMessage

可在rabbitmq management内查看到:

三、消息驱动之消费者

 步骤1:新建maven项目

新建maven项目cloud-stream-rabbitmq-consumer8802

 步骤2:POM


    
        org.springframework.cloud
        spring-cloud-starter-stream-rabbit
    
    
        org.springframework.cloud
        spring-cloud-starter-netflix-eureka-client
    
    
        org.springframework.boot
        spring-boot-starter-web
    
    
        org.springframework.boot
        spring-boot-starter-actuator
    
    
        org.springframework.boot
        spring-boot-devtools
        runtime
        true
    
    
        org.projectlombok
        lombok
        true
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    

 步骤3:application.yml

与消费者相比,port不同,name不同;
cloud.stream.bindings.output换成cloud.stream.bindings.input;
instance-id: send-8802.com

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        input:
          destination: studyExchange
          content-type: application/json  # 设置消息类型,本次为json,文本则设置 text/plain
          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 心跳时间间隔(默认30s)
    lease-expiration-duration-in-seconds: 5 # 如果超过5s时间间隔(默认90)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true  # 访问的路径变为ip地址

 步骤4:主启动类

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

 步骤5:业务类

src/main/java/com/zqc/springcloud/controller/ReceiveMessageListenerController.java

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message message) {
        System.out.println("消费者1号接收到的消息:" + message.getPayload() + "t port:" + serverPort);
    }
}

 步骤6:测试

访问5次:http://localhost:8801/sendMessage

8801控制台输出:

serial: 68c2cb4f-c1b9-4b8e-8005-e7bfa89e3d1e
serial: 5a2e7fbb-6e2d-4c17-bbd2-32d6f0c2b41e
serial: 16456ab8-246c-4f60-a4dc-213dcec73546
serial: ddb67b0b-c00b-456d-9ef9-4284f05a2102
serial: f68b6b58-ca67-4572-a7dd-0f9aca0717d0

8802控制台输出:

消费者1号接收到的消息:68c2cb4f-c1b9-4b8e-8005-e7bfa89e3d1e	 port:8802
消费者1号接收到的消息:5a2e7fbb-6e2d-4c17-bbd2-32d6f0c2b41e	 port:8802
消费者1号接收到的消息:16456ab8-246c-4f60-a4dc-213dcec73546	 port:8802
消费者1号接收到的消息:ddb67b0b-c00b-456d-9ef9-4284f05a2102	 port:8802
消费者1号接收到的消息:f68b6b58-ca67-4572-a7dd-0f9aca0717d0	 port:8802

rabbitmq控制台可看到:

四、消费分组与持久化

依照8802克隆出8803。

依次启动:7001、8801、8802、8803

访问eureka7001:http://localhost:7001/

8801发送消息时,8802、8803都会接收到,存在重复消费的问题。

4.1 实际场景

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费)


8802和8803时不同的分组,会重复消费。

4.2 解决重复消费问题

故障现象:重复消费
导致原因:默认分组group不同,组流水号不一样,所以被认为成不同组。

解决原理:微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

4.3 自定义分组

修改8802和8803的application.xml,group都修改为group1

访问5次:http://localhost:8801/sendMessage

8801控制台输出:

serial: 65a0bcf4-a7b6-4e60-8587-5dac8b2f74db
serial: b16d1fc4-7ab0-428d-b479-1f672b6118d7
serial: 50414987-7621-469b-8a38-e6127bb3b8f7
serial: 7086e411-3e57-4821-8bd4-d612fd9cfecf
serial: fb907bbf-6886-4808-a2bc-be073d28a5f0

8802控制台输出:

消费者1号接收到的消息:65a0bcf4-a7b6-4e60-8587-5dac8b2f74db	 port:8802
消费者1号接收到的消息:50414987-7621-469b-8a38-e6127bb3b8f7	 port:8802
消费者1号接收到的消息:fb907bbf-6886-4808-a2bc-be073d28a5f0	 port:8802

8803控制台输出:

消费者1号接收到的消息:b16d1fc4-7ab0-428d-b479-1f672b6118d7	 port:8803
消费者1号接收到的消息:7086e411-3e57-4821-8bd4-d612fd9cfecf	 port:8803

此时8802和8803将竞争8801发送的消息,轮询消费。

4.4 持久化

group设置后,当消费者关闭时生产者发送消息,消费端启动后仍会收到之前没有处理的消息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584489.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号