栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring Cloud Stream的配置及使用——以RabbitMQ为例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Cloud Stream的配置及使用——以RabbitMQ为例

1. 简介

https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html
英语好的可以直接看官方文档,文档里讲的更全面

By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange.

上图是RabbitMQ Binder(绑定器)。默认情况下,绑定器实现将每一个destination映射到一个TopicExchange。对于每一个消费者组,都有一个队列绑定到那个TopicExchange。

2. 依赖配置

    org.springframework.cloud
    spring-cloud-dependencies
    ${spring-cloud.version}
    pom
    import


    org.springframework.cloud
    spring-cloud-starter-stream-rabbit

3. 生产者配置及消息发送 3.1 yaml配置
spring:
  cloud:
    stream:
      #      如果有一个binder的话,就不需要设置
      default-binder: rabbit
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.70.224
                port: 5672
                username: admin
                password: 444944
                virtual-host: GHost
        rabbit:
          type: rabbit
          defaultCandidate: false
          environment:
            spring:
              rabbitmq:
                host: 192.168.70.167
                port: 5672
                username: admin
                password: public
                virtual-host: /
      bindings:
        order:
          binder: rabbit
          destination: order
          producer:
            #            默认是true
            autoStartup: true
        cart:
          binder: rabbit
          destination: cart
          routingKeyexpression: han
          producer:
            #            默认是true
            autoStartup: true
# rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-

配置说明:

在Spring Cloud Stream中可以配置多个binder,也就是可以配置连接多个MQ服务器在RabbitMQ中,binding的名称对应的是output和input的名称,destination对应mq中的exchange名称。上述配置会生成order,cart两个exchangerabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-,分区内容这里不涉及,具体内容看官方文档 3.2 生产者声明

public interface CartSource {
    
    String OUTPUT = "cart";

    
    @Output(CartSource.OUTPUT)
    MessageChannel output();
}

cloud stream中output表示发送,这里的cart与配置中的binding名称是对应的

//必须添加,要不然无法注入。也可以加在启动类上,可以重复添加
@EnableBinding(CartSource.class)
@Component
public class CartSender {

    @Autowired
    private CartSource orderSource;

    private static final Logger logger= LoggerFactory.getLogger(CartSender.class);

    public void pushMsg(Order order){
        logger.info("sending rabbitmq message:{}",order.toString());
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }
3.3 消息发送
@RestController
@RequestMapping("mqTest1")
public class MqTest1Controller {
    @Autowired
    CartSender cartSender;
    @GetMapping("streamPush")
    public String streamPush(){
        cartSender.pushMsg(new Order());
        return "hehe";
    }
}
4. 消费者配置 4.1 yaml配置
spring:
  cloud:
    stream:
      #      如果有一个binder的话,就不需要设置
      default-binder: rabbit
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.70.224
                port: 5672
                username: admin
                password: 444944
                virtual-host: /
        rabbit:
          type: rabbit
          defaultCandidate: false
          environment:
            spring:
              rabbitmq:
                host: 192.168.70.167
                port: 5672
                username: admin
                password: public
                virtual-host: /
      bindings:
        order:
          binder: rabbit
          destination: order
          group: myOrderQueue
          conumer:
            concurrency: 3
        cart:
          binder: rabbit
          destination: cart
          group: myCartQueue
          conumer:
            concurrency: 3
      # rabbit的扩展配置 RabbitExtendedBindingProperties
      rabbit:
        bindings:
#          order:
#            consumer:
#              bindingRoutingKey: order-key
          cart:
            consumer:
              #          如果没有指定routing key,会使用默认的 #
              bindingRoutingKey: cart-key

配置说明:

只有消费者才会创建队列,queue名称为:.routingKey设置
spring.cloud.stream.rabbit.bindings..consumer.bindingRoutingKey=myRoutingKey
如果没有设置的话,默认为#

4.2 消费者声明与消息接收
public interface CartSink {

    String INPUT = "cart";

    
    @Input(CartSink.INPUT)
    SubscribableChannel input();
}

cloud stream中input表示接收,这里的cart与配置中的binding名称是对应的

@EnableBinding(CartSink.class)
public class CartHandler {
    private static final Logger logger = LoggerFactory.getLogger(CartHandler.class);
    
    @StreamListener(CartSink.INPUT)
    public void loggerSink(@Headers MessageHeaders headers, byte[] payload){
        String cartChange=new String(payload);
        logger.info("cart change:{}",cartChange);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762988.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号