栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息驱动Stream

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息驱动Stream

消息驱动生产者: 

    导入POM依赖
            
                org.springframework.cloud
                spring-cloud-starter-stream-rabbit
            

    YML

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          #配置绑定的mq信息
          binders:
            defaultRabbit:
              # 消息组件类型
              type: rabbit
              #设置rabbitmq的相关环境配置
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.0.6
                    port: 5672
                    username: 
                    password: 
          #                服务的整合处理
          bindings:
            output:
              #          要使用的exchange名称
              destination: studyExchange
              #          设置消息类型
              content-type: application/json
                #          设置要绑定的消息服务的具体设置
              binder: defaultRabbit
    
    eureka:
      client:
        fetch-registry: true
        register-with-eureka: true
        service-url:
          defaultZone: http://eureka7001:7001/eureka
      instance:
        #    设置心跳的间隔时间
        lease-renewal-interval-in-seconds: 2
        #
        lease-expiration-duration-in-seconds: 5
        instance-id: send-8801
        prefer-ip-address: true

    主启动

    @SpringBootApplication
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class,args);
        }
    }
    

    service

    public interface IMessageProvider {
        public String send();
    }
    

    实现类

    package com.ljw.springcloudstudy.service.impl;
    
    import com.ljw.springcloudstudy.service.IMessageProvider;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    
    @EnableBinding(Source.class)//定义消息的推送管道
    public class IMessageProviderImpl implements IMessageProvider {
        //指的是发送者发给binder
        @Resource
        private MessageChannel output;
    
        @Override
        public String send() {
            UUID serial = UUID.randomUUID();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("******serial:" + serial);
            return null;
        }
    }
    

    controller

    @RestController
    public class SendMessageController {
    
        @Resource
        private IMessageProvider iMessageProvider;
    
        @GetMapping("/message")
        public String sendMessage(){
            return iMessageProvider.send();
        }
    }
    

    启动后,会根据配置文件注册一个交换机


消息驱动的消费者:

    导入POM依赖

            
                org.springframework.cloud
                spring-cloud-starter-stream-rabbit
            

    写YML

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          #配置绑定的mq信息
          binders:
            defaultRabbit:
              # 消息组件类型
              type: rabbit
              #设置rabbitmq的相关环境配置
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.0.6
                    port: 5672
                    username: 
                    password: 
          #                服务的整合处理
          bindings:
            input:
              #          要使用的exchange名称
              destination: studyExchange
              #          设置消息类型
              content-type: application/json
              #          设置要绑定的消息服务的具体设置
              binder: defaultRabbit
    
    eureka:
      client:
        fetch-registry: true
        register-with-eureka: true
        service-url:
          defaultZone: http://eureka7001:7001/eureka
      instance:
        #    设置心跳的间隔时间
        lease-renewal-interval-in-seconds: 2
        #
        lease-expiration-duration-in-seconds: 5
        instance-id: receive-8802
        prefer-ip-address: true

    主启动

    @SpringBootApplication
    public class StreamMQMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class, args);
        }
    }
    

    controller

    package com.ljw.springcloudstudy.controller;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @EnableBinding(Sink.class)
    public class ReceiveMessageListener {
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message message) {
            System.out.println("消费者一号,-----收到的消息:" + message.getPayload() + serverPort);
        }
    }
    

    那么,有多个消费者的时候,会产生重复消费的问题,假设现在还有一个消费者8803;


解决重复消费的问题:
    消费者在同一组会产生竞争关系,只有一个可以消费,在不同组会发生重复消费通过分组group来解决这个问题,修改两个消费者的配置文件,设置为同一组:
          bindings:
            input:
              #          要使用的exchange名称
              destination: studyExchange
              #          设置消息类型
              content-type: application/json
              #          设置要绑定的消息服务的具体设置
              binder: defaultRabbit
              group: ljw1


消息持久化:
    设置了group消息就会持久化,避免了消息丢失的问题。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/777977.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号