栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Day8-SpringCloud消息驱动Stream与链路追踪Sleuth

Day8-SpringCloud消息驱动Stream与链路追踪Sleuth

SpringCloud Stream 消息驱动

屏蔽底层消息中间件的差异 统一消息的编程模型(没有什么是套一层接口解决不了的~)

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,

像RabbitMQ有exchange,kafka有Topic和Partitions分区,

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

Stream总体架构图

发送和接收消息流程 生产者
  • pom.xml 引入依赖

         
                org.springframework.cloud
                spring-cloud-starter-stream-rabbit
            
    
  • yaml配置

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
    
      rabbitmq:
        host: 106.14.154.114
        port: 5672
        username: admin
        password: 123456
      cloud:
          stream:
             binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
             bindings: # 服务的整合处理
               output: # 这个名字是一个通道的名称
                destination: studyExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
        lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    #    instance-id: send-8801.com  # 在信息列表时显示主机名称
    #    prefer-ip-address: true     # 访问的路径变为IP地址
    
  • 发送消息

    Controller

        @GetMapping(value = "/sendMessage")
        public String sendMessage()
        {
            return messageProvider.send();
        }
    

    messageProvider.send()

    package com.atguigu.springcloud.service.impl;
    
    import com.atguigu.springcloud.service.IMessageProvider;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.integration.support.MessageBuilderFactory;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.integration.support.MessageBuilder;
    import javax.annotation.Resource;
    import org.springframework.cloud.stream.messaging.Source;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    @EnableBinding(Source.class) //定义消息的推送管道 Source 是推送方 Sink 是接收方
    public class MessageProviderImpl implements IMessageProvider
    {
        @Resource
        private MessageChannel output; // 消息发送管道
    
        @Override
        public String send()
        {
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("*****serial: "+serial);
            return null;
        }
    }
    
    
  • 调用接口 向RabbitMQ中发送消息

http://localhost:8801/sendMessage

消费者

pom.xml

        
            org.springframework.cloud
            spring-cloud-starter-stream-rabbit
        

yaml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer


  rabbitmq:
    host: 106.14.154.114
    port: 5672
    username: admin
    password: 123456
  cloud:
      stream:
        binders: # 在此处配置要绑定的rabbitmq的服务信息;
          defaultRabbit: # 表示定义的名称,用于于binding整合
            type: rabbit # 消息组件类型
#            environment: # 设置rabbitmq的相关的环境配置
#              spring:
#                rabbitmq:
#                  host: localhost
#                  port: 5672
#                  username: guest
#                  password: guest
        bindings: # 服务的整合处理
          input: # 这个名字是一个通道的名称
            destination: studyExchange # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置

Controller

package com.atguigu.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@EnableBinding(Sink.class) //表明绑定接收方
public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message message)
    {
        System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"t  port: "+serverPort);
    }
}

分组消费与持久化

先给出结论 不同组是订阅模式 会重复消费 同一组中所有单位只有一个能消费

设置分组

        bindings: # 服务的整合处理
          input: # 这个名字是一个通道的名称
            destination: studyExchange # 表示要使用的Exchange名称定义
            content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
            binder: defaultRabbit # 设置要绑定的消息服务的具体设置
            group: atguiguA # 设置消费者的分组

验证: 设置统一分组 每个消费者消费不同的消息

设置不同分组 每个消费者消费相同消息:

持久化

给出结论: 当消费者宕机之后 设置分组的消费者可消费 还在交换机中的消息

验证

都设置不同分组

每个消费者消费同一消息

设置相同分组:

后启动的没抢到消息~~


SpringCloud Sleuth 链路追踪

引出: 在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。

挑明说吧: 就是查看每条路线和在该路线下所花的时间

使用步骤
  • 【zipkin】下载安装启动 端口: http://localhost:9411

  • 客户端服务端分别导入依赖

            
            
                org.springframework.cloud
                spring-cloud-starter-zipkin
            
    
  • Yaml配置

server:
  port: 8001

spring:
  application:
    name: cloud-payment-service
  zipkin:
    base-url: http://localhost:9411
  sleuth:
    sampler:

      #采样率值介于 0 到 1 之间,1 则表示全部采集
     probability: 1
  • 测试即可

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696032.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号