栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springcloud详解(springcloud最新版本)

springcloud详解(springcloud最新版本)

1、消息驱动Stream 1.1 基本概念

(1)引入目的

构建消息驱动的微服务框架,为多种消息中间件提供统一的消息编程模型,目前仅支持RabbitMQ与KafKa;

(2)标准MQ

生产者与消费者之间靠消息媒介传递消息内容;消息必须走特定通道;

(3)实现原理

应用程序通过inputs与outputs来与SpringCloudStream的Binder对象交互。Inputs对应于消费者,Ouputs对应于生产者;通过定义绑定器Binder作为中间对象负责与消息中间件的交互,实现了应用程序与消息中间件细节之间的解耦;Stream的消息通信方式遵循发布-订阅模式,Topic主题进行广播,在RabbitMQ为Exchange,在Kafka为Topic; 1.2 Stream编码常用注解

(1)基本流程

Binder:连接中间件,屏蔽差异;Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过频道(Channel)对队列进行配置;Source与Sink:消息的输入与输出;

(2)注解

1.3 Stream使用

(1)搭建消息驱动生产发送者

引入RabbitMQ中间件,也可以使用其他的,如Kafaka等

       
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        


    
        cloud2022
        org.example
        1.0-SNAPSHOT
    
    4.0.0

    cloud-stream
    cloud-stream

    
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        

        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        

        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.projectlombok
            lombok
            true
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.springframework.cloud
            spring-cloud-netflix-eureka-client
        

    


配置文件

server:
  port: 8801

eureka:
  client:
    register-with-eureka: true #是否要注册
    fetchRegistry: true #是否抓取注册信息
    service-url:
      defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
  instance: #修改主机名
    instance-id: stream-out
    prefer-ip-address: true #访问路径显示IP

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  datasource:
    url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
    username: root
    password: 123456
    driver-class-name=com: mysql.cj.jdbc.Driver

binder: defaultRabbit 报红,但是不影响。

主启动

package com.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanbasePackages = "com.stream")
public class StreamProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication.class, args);
    }

}

消息发送MessageService

package com.stream.Service;

public interface MessageService {
    public String send();
}

消息发送实现MessageServiceImpl

package com.stream.Service.Iml;

import com.stream.Service.MessageService;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.UUID;

@EnableBinding(Source.class) //定义消息推送管道
public class MessageServiceImpl implements MessageService {

    @Resource
    private MessageChannel output; //消息发送管道

    @Override
    public String send() {
        String msg = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(msg).build());
        return null;
    }
}

消息发送Controller

package com.stream.Controller;

import com.stream.Service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController {

    @Resource
    private MessageService messageService;

    @GetMapping("/send")
    public String send(){
        return messageService.send();
    }
}

启动并登录RabbitMQ,访问接口

(2)搭建消息驱动消费接收者

pom.xml



    
        cloud2022
        org.example
        1.0-SNAPSHOT
    
    4.0.0

    stream-consumer
    stream-consumer

    
        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        

        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        

        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.projectlombok
            lombok
            true
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.springframework.cloud
            spring-cloud-netflix-eureka-client
        

    


application.yml:input变为output

server:
  port: 8802


spring:
  application:
    name: cloud-stream-comsumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        #**************区别,消息发送者output,接受者input
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

  datasource:
    url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
    username: root
    password: 123456
    driver-class-name=com: mysql.cj.jdbc.Driver


eureka:
  client:
    register-with-eureka: true #是否要注册
    fetchRegistry: true #是否抓取注册信息
    service-url:
      defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
  instance: #修改主机名
    instance-id: stream-in
    prefer-ip-address: true #访问路径显示IP

主启动

package com.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanbasePackages = "com.consumer")
public class StreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

}

消息接收Controller

package com.consumer.Controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class StreamConsumerController {

    @StreamListener(Sink.INPUT)
    public void input(Message message){
        System.out.println("接收到消息为:"+message.getPayload()+ "-8802");
    }
}

访问:浏览器调用消息发送接口发送消息

1.4 消息重复消费Group

同一个消息被两个服务接收,就会出现重复消费问题,比如以上例子,再建一个相同的消息接收方,Stream就会默认为是两个组,就会出现重复消费;同一个group中的多个消费者是竞争关系,只要保证消息被其中一个应用消费,同组其他应用就无法消费;不同组之间可以实现重复消费。

分组前:

添加配置分组

分组后:

1.5 消息持久化

给其中一个消息接受方去掉group分组属性;然后停掉两个消息接受服务;发送4条消息;启动消息接受服务;去掉group的消息接受方不会接受消息,另一方会接受到4条消息;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771932.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号