栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot整合RabbitMQ 5种模式的注解绑定

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合RabbitMQ 5种模式的注解绑定

1、导入依赖


    org.springframework.boot
    spring-boot-starter-amqp



 
     com.fasterxml.jackson.dataformat
     jackson-dataformat-xml

2、配置连接信息
spring:
	rabbitmq:
        host: localhost
        port: 5672
        username: root
        password: root
        virtual-host: /
       # listener:
     	#	 simple:
       	#	 prefetch: 1    # 工作队列能者多劳模式
3、5种使用模式 1、HelloWorld模式

一个队列一个消费者

// 消息发布
@Test
void testHelloWorldMode(){
    rabbitTemplate.convertAndSend("helloworld_queue","hello world!");
}

// 消息订阅
@RabbitListener(queuesToDeclare = {@Queue(name = "helloworld_queue")})
public void helloWorldC1(String msg){
    System.out.println("helloWorldC1:-------->"+msg);
}

2、Work模式 按均分配

一个队列,多个消费者

注意:一条消息只能被消费一次,默认是按均分配,在消费者开始消费之前队列中的消息就已经分配好了

往队列中放入50条消息

@Test
void testWorkMode(){
    String msg = "work mode";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend("work_zs_queue","work mode"+i);
    }
}

创建两个消费者,并且设置20ms 和 200ms 的延迟

@RabbitListener(queuesToDeclare = {@Queue(name = "work_zs_queue")})
  @SneakyThrows
  public void workc1(String msg){
    Thread.sleep(20);
    System.out.println("workc1:---------------->"+msg);
  }

  @RabbitListener(queuesToDeclare = {@Queue(name="work_zs_queue")})
  @SneakyThrows
  public void workc2(String msg){
    Thread.sleep(200);
    System.err.println("workc2:------------------>"+msg);
  }

运行结果:

可以发现workc1比workc2提前完成消费任务,并且c1 c2是按照奇偶数顺序消费的任务,这也进一步验证了在消费开始前就已经分配好了任务

这种按均分配的效果效率低下,我们应该遵循能者多劳的方式去分配任务

能者多劳

修改配置文件,让消费者一次只能接收一个任务,当前任务消费完才可以接收下一个任务

spring:
  rabbitmq:
    host: 192.168.137.7
    port: 5672
    username: root
    password: root
    virtual-host: /
    listener:   # 消息确认机制
      simple:
        prefetch: 1

重新启动运行代码

3、Fanout模式

fanout模式也叫广播模式,每一条消息多可以被绑定在同一个交换机上的所有队列的消费者消费

参数1:交换机:fanout_exchange

参数2:routingkey 在fanout模式不使用,会在direct和topic模式使用

参数3:发送的消息

@Test
  void testFanoutMode(){
    rabbitTemplate.convertAndSend("fanout_exchange","","fanout mode 1");
    rabbitTemplate.convertAndSend("fanout_exchange","","fanout mode 2");
  }

定义消费者

使用@RabbitListener注解中的bindings声明并绑定交换机和队列

@RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "fanout_queue1"),
          exchange = @Exchange(name = "fanout_exchange2",type = ExchangeTypes.FANOUT)
  ))
  public void fanoutc1(String msg){
    System.out.println("fanoutc1:------------------>"+msg);
  }

  @RabbitListener(bindings=@QueueBinding(
          value = @Queue(name = "fanout_queue2"),
          exchange = @Exchange(name = "fanout_exchange2",type = ExchangeTypes.FANOUT)
  ))
  public void fanoutc2(String msg){
    System.out.println("fanoutc1:------------------>"+msg);
  }

运行结果:

每一条消息都会被所有消费者消费

4、direct模式

direct模式与fanout模式的区别在于,队列都是绑定同一个交换机,但是在队列上会添加routingkey标识

消费者会根据不同的表示去消费对应队列中的消息

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct_queue1"),
        exchange = @Exchange(name = "direct_zs_exchange",type = ExchangeTypes.DIRECT),
        key = "zs_news1"
))
public void direct1(String msg){
  System.out.println("direct_queuq1:---------------------->"+msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct_queue2"),
        exchange = @Exchange(name = "direct_zs_exchange",type = ExchangeTypes.DIRECT),
        key = "zs_news2"
))
public void direct2(String msg){
  System.out.println("direct_queuq2:---------------------->"+msg);
}

@Test
void testDirectMode(){
  rabbitTemplate.convertAndSend("direct_zs_exchange","zs_news1","direct mode1");
  rabbitTemplate.convertAndSend("direct_zs_exchange","zs_news2","direct mode2");
}

5、topic模式
  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词
@RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "topic_queue1"),
          exchange = @Exchange(name = "topic_zs_exchange",type = ExchangeTypes.TOPIC),
          key = "zs_new.#"
  ))
  public void topic1(String msg){
    User user = JSONUtil.toBean(msg, User.class);
    System.out.println(user.toString());
  }

  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "topic_queue1"),
          exchange = @Exchange(name = "topic_zs_exchange",type = ExchangeTypes.TOPIC),
          key = "#.zs_new"
  ))
  public void topic2(String msg){
    User user = JSONUtil.toBean(msg, User.class);
    System.out.println(user.toString());
  }

@Test
void testTopicMode(){
  String jsonStr1 = JSONUtil.toJsonStr(new User("小爽", 22));
  rabbitTemplate.convertAndSend("topic_zs_exchange","zs_new.user",jsonStr1);

  String jsonStr2 = JSONUtil.toJsonStr(new User("路飞", 17));
  rabbitTemplate.convertAndSend("topic_zs_exchange","lufei.zs_new",jsonStr2);
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854395.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号