栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-09-28

2021-09-28

Spring Boot 消息队列 RabbitMQ 入门 1.概述

RabbitMQ 是 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wXTatg8m-1632816085117)(C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-20210807153859772.png)]

2. Spring-AMQP

在 Spring 生态中,提供了 Spring-AMQP 项目,让我们更简便的使用 AMQP 。其官网介绍如下:

The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions.
Spring-AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息传递解决方案的开发。

It provides a “template” as a high-level abstraction for sending and receiving messages.
它提供了一个“模板”作为发送消息的高级抽象。

It also provides support for Message-driven POJOs with a “listener container”.
它还通过“侦听器容器”为消息驱动的 POJO 提供支持。

These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration.
这些库促进 AMQP 资源的管理,同时促进使用依赖注入和声明性配置。

In all of these cases, you will see similarities to the JMS support in the Spring framework.
在所有这些情况下,您将看到与 Spring 框架中的 JMS 支持的相似之处。

The project consists of two parts; spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.
该项目包括两个部分:

  • spring-amqp 是 AMQP 的基础抽象。
  • spring-rabbit 是基于 RabbitMQ 对 AMQP 的具体实现。

Features(功能特性):

  • Listener container for asynchronous processing of inbound messages
    监听器容器:异步处理接收到的消息
  • RabbitTemplate for sending and receiving messages
    RabbitTemplate:发送和接收消息
  • RabbitAdmin for automatically declaring queues, exchanges and bindings
    RabbitAdmin:自动创建队列,交换器,绑定器。

在 Spring-Boot 项目中,提供了 AMQP 和 RabbitMQ 的自动化配置,所以我们仅需引入 spring-boot-starter-amqp 依赖,即可愉快的使用。

3.快速入门

在 AMQP 中,Producer 将消息发送到 Exchange ,再由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)。

Exchange 根据 Routing Key 和 Binding Key 将消息路由到 Queue 。目前提供了 Direct、Topic、Fanout、Headers 四种类型。

[《RabbitMQ 基础概念详解》](E:教学springBootSpringBoot-Labslab-04-rabbitmqRabbitMQ 基础概念详解.md)

3.1 Direct Exchange

Direct 类型的 Exchange 路由规则比较简单,它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。以下图的配置为例:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NZ8NyQ18-1632816085121)(C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-20210807165218425.png)]

  • 我们以 routingKey="error" 发送消息到 Exchange ,则消息会路由到 Queue1(amqp.gen-S9b…) 。
  • 我们以 routingKey="info" 或 routingKey="warning" 来发送消息,则消息只会路由到 Queue2(amqp.gen-Agl…) 。
  • 如果我们以其它 routingKey 发送消息,则消息不会路由到这两个 Queue 中。
  • 总结来说,指定 Exchange + routing key ,有且仅会路由到至多一个 Queue 中。 极端情况下,如果没有匹配,消息就发送到“空气”中,不会进入任何 Queue 中。

注:Queue 名字 amqp.gen-S9b… 和 amqp.gen-Agl… 自动生成的。

下面,我们来创建一个 Direct Exchange 的使用示例,对应 rabbitmq-demo 项目。

3.1.1 引入依赖

在pom.xml 文件中,引入相关依赖



    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.1.RELEASE
         
    
    4.0.0

    lab-04-rabbitmq-demo

    
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    


3.1.2 应用配置文件

在 resources目录下,创建 application.yaml 配置文件。配置如下:

spring:
  # RabbitMQ 配置项,对应 RabbitProperties 配置类
  rabbitmq:
    host: 127.0.0.1 # RabbitMQ 服务的地址
    port: 5672 # RabbitMQ 服务的端口
    username: guest # RabbitMQ 服务的账号
    password: guest # RabbitMQ 服务的密码
  • 在 spring.rabbitmq 配置项,设置 RabbitMQ 的配置,对应 RabbitProperties 配置类。这里咱暂时最小化添加,更多的配置项,我们在下文的示例中,一点点抽丝剥茧。
  • Spring Boot 提供的 RabbitAutoConfiguration 自动化配置类,实现 RabbitMQ 的自动配置,创建相应的 Producer 和 Consumer 。
3.1.3 Application

创建 Application.java类,配置 @SpringBootApplication 注解即可。代码如下:

@SpringBootApplication
@EnableAsync // 开启异步
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

我们额外添加了 @EnableAsync 注解,因为我们稍后要使用 Spring 提供的异步调用的功能。

3.1.4 Demo01Message

创建 Demo01Message消息类,提供给当前示例使用。代码如下:

public class Demo01Message implements Serializable {

    public static final String QUEUE = "QUEUE_DEMO_01";

    public static final String EXCHANGE = "EXCHANGE_DEMO_01";

    public static final String ROUTING_KEY = "ROUTING_KEY_01";

    
    private Integer id;

    public Demo01Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo01Message{" +
                "id=" + id +
                '}';
    }

}

  • 注意,要实现 Java Serializable 序列化接口。因为 RabbitTemplate 默认使用 Java 自带的序列化方式,进行序列化 POJO 类型的消息。

  • 在消息类里,我们枚举了 Exchange、Queue、RoutingKey 的名字。

3.1.5 RabbitConfig

创建 RabbitConfig配置类,添加 Direct Exchange 示例相关的 Exchange、Queue、Binding 的配置。代码如下:

@Configuration
public class RabbitConfig {

    
    public static class DirectExchangeDemoConfiguration {

        // 创建 Queue
        @Bean
        public Queue demo01Queue() {
            return new Queue(Demo01Message.QUEUE, // Queue 名字
                    true, // durable: 是否持久化
                    false, // exclusive: 是否排它
                    false); // autoDelete: 是否自动删除
        }

        // 创建 Direct Exchange
        @Bean
        public DirectExchange demo01Exchange() {
            return new DirectExchange(Demo01Message.EXCHANGE,
                    true,  // durable: 是否持久化
                    false);  // exclusive: 是否排它
        }

        // 创建 Binding
        // Exchange:Demo01Message.EXCHANGE
        // Routing key:Demo01Message.ROUTING_KEY
        // Queue:Demo01Message.QUEUE
        @Bean
        public Binding demo01Binding() {
            return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(Demo01Message.ROUTING_KEY);
        }

    }
}

在 DirectExchangeDemoConfiguration 内部静态类中,我们创建了 Exchange、Queue、Binding 三个 Bean ,后续 RabbitAdmin会自动创建交换器、队列、绑定器。

3.1.6 Demo01Producer

创建 Demo01Producer类,它会使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息。代码如下:

@Component
public class Demo01Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void syncSend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(Demo01Message.EXCHANGE, Demo01Message.ROUTING_KEY, message);
    }

    public void syncSendDefault(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(Demo01Message.QUEUE, message);
    }

    @Async
    public ListenableFuture asyncSend(Integer id) {
        try {
            // 发送消息
            this.syncSend(id);
            // 返回成功的 Future
            return AsyncResult.forValue(null);
        } catch (Throwable ex) {
            // 返回异常的 Future
            return AsyncResult.forExecutionException(ex);
        }
    }

}

  • RabbitTemplate 是 AmqpTemplate 接口的实现类,所以此时使用 AmqpTemplate 亦可。不过又因为 RabbitTemplate 还实现了其它接口,所以操作会更为丰富。因此,这里我们选择了注入 RabbitTemplate 属性。
  • #syncSend(Integer id) 方法,调用 RabbitTemplate 的同步发送消息方法。方法定义如下:
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

​ 指定 Exchange + RoutingKey ,从而路由到一个 Queue 中。

  • #syncSendDefault(Integer id) 方法,也调用 RabbitTemplate 的同步发送消息方法。方法定义如下
// AmqpTemplate.java

void convertAndSend(String routingKey, Object message) throws AmqpException;

是不是觉得有点奇怪,这里我们传入的 RoutingKey 为队列名?!因为 RabbitMQ 有一条默认的 Exchange: (AMQP default) 规则:The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted 。

翻译过来的意思:默认交换器,隐式地绑定到每个队列,路由键等于队列名称。

所以,此处即使我们传入的 RoutingKey 为队列名,一样可以发到对应队列。

  • #asyncSend(Integer id) 方法,通过 @Async 注解,声明异步调用该方法,从而实现异步消息到 RabbitMQ 中。因为 RabbitTemplate 并未像 KafkaTemplate 或 RocketMQTemplate 直接提供了异步发送消息的方法,所以我们需要结合 Spring 异步调用来实现。
3.1.7 Demo01Consumer

创建 Demo01Consumer 类,消费消息。代码如下:

@Component@RabbitListener(queues = Demo01Message.QUEUE)public class Demo01Consumer {    private Logger logger = LoggerFactory.getLogger(getClass());    @RabbitHandler    public void onMessage(Demo01Message message) {        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);    }//    @RabbitHandler(isDefault = true)//    public void onMessage(org.springframework.amqp.core.Message message) {//        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);//    }}
  • 在类上,添加了 @RabbitListener 注解,声明了消费的队列是 "QUEUE_DEMO_01" 。
  • 在方法上,添加了 @RabbitHandler注解,申明了处理消息的方法。同时,方法入参为消息的类型。这里,我们设置了 3.1.4 Demo01Message。
  • 如果我们想要获得消费消息的更多信息,例如说,RoutingKey、创建时间等等信息,则可以注释掉的那段代码,通过方法入参为 org.springframework.amqp.core.Message 类型。不过绝大多数情况下,我们并不需要这么做。
3.1.8 简单测试
// Demo01ProducerTest.java@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo01ProducerTest {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private Demo01Producer producer;    @Test    public void testSyncSend() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id);        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }    @Test    public void tesSyncSendDefault() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSendDefault(id);        logger.info("[tesSyncSendDefault][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }    @Test    public void testAsyncSend() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.asyncSend(id).addCallback(new ListenableFutureCallback() {            @Override            public void onFailure(Throwable e) {                logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);            }            @Override            public void onSuccess(Void aVoid) {                logger.info("[testASyncSend][发送编号:[{}] 发送成功,发送成功]", id);            }        });        logger.info("[testASyncSend][发送编号:[{}] 调用完成]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }}
3.2 Topic Exchange

前面讲到 Direct Exchange路由规则,是完全匹配 binding key 与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。

Topic Exchange 在匹配规则上进行了扩展,它与 Direct 类型的Exchange 相似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:

  • routing key 为一个句点号 "." 分隔的字符串。我们将被句点号"."分隔开的每一段独立的字符串称为一个单词,例如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”
  • binding key 与 routing key 一样也是句点号 "." 分隔的字符串。
  • binding key 中可以存在两种特殊字符 "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)。

以下图中的配置为例:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WUiJ9hOh-1632816085124)(E:教学springBootSpringBoot-Labslab-04-rabbitmqSpring Boot 消息队列 RabbitMQ 入门.assetsimage-20210808092541042.png)]

  • routingKey="quick.orange.rabbit" 的消息会同时路由到 Q1 与 Q2 。
  • routingKey="lazy.orange.fox" 的消息会路由到 Q1 。
  • routingKey="lazy.brown.fox" 的消息会路由到 Q2 。
  • routingKey="lazy.pink.rabbit" 的消息会路由到Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配)。
  • routingKey="quick.brown.fox"、routingKey="orange"、routingKey="quick.orange.male.rabbit" 的消息将会被丢弃,因为它们没有匹配任何 bindingKey 。

下面,我们来创建一个 Topic Exchange 的使用示例。

3.2.1Demo02Message

创建 Demo02Message消息类,提供给当前示例使用。代码如下:

public class Demo02Message implements Serializable {    public static final String QUEUE = "QUEUE_DEMO_02";    public static final String EXCHANGE = "EXCHANGE_DEMO_02";    public static final String ROUTING_KEY = "#.yu.nai";        private Integer id;    public Demo02Message setId(Integer id) {        this.id = id;        return this;    }    public Integer getId() {        return id;    }    @Override    public String toString() {        return "Demo02Message{" +                "id=" + id +                '}';    }
  • 在消息类里,我们枚举了 Exchange、Queue、RoutingKey 的名字。
  • 重点看我们新定义的路由键 ROUTING_KEY = "#.yu.nai" ,我们要匹配以 "yu.nai" 结尾,开头可以是任意个单词的。
3.2.2RabbitConfig

修改 RabbitConfig 配置类,添加 Topic Exchange 示例相关的 Exchange、Queue、Binding 的配置。

    public static class TopicExchangeDemoConfiguration {        // 创建 Queue        @Bean        public Queue demo02Queue() {            return new Queue(Demo02Message.QUEUE, // Queue 名字                    true, // durable: 是否持久化                    false, // exclusive: 是否排它                    false); // autoDelete: 是否自动删除        }        // 创建 Topic Exchange        @Bean        public TopicExchange demo02Exchange() {            return new TopicExchange(Demo02Message.EXCHANGE,                    true,  // durable: 是否持久化                    false);  // exclusive: 是否排它        }        // 创建 Binding        // Exchange:Demo02Message.EXCHANGE        // Routing key:Demo02Message.ROUTING_KEY        // Queue:Demo02Message.QUEUE        @Bean        public Binding demo02Binding() {            return BindingBuilder.bind(demo02Queue()).to(demo02Exchange()).with(Demo02Message.ROUTING_KEY);        }    }

在 TopicExchangeDemoConfiguration 内部静态类中,我们也是创建了 Exchange、Queue、Binding 三个 Bean 。有差异点的是,这次我们创建的是 TopicExchange 。

3.2.3 Demo02Producer

创建Demo02Producer 类 (消息生产者),它会使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息。

@Componentpublic class Demo02Producer {    @Autowired    private RabbitTemplate rabbitTemplate;    public void syncSend(Integer id, String routingKey) {        // 创建 Demo02Message 消息        Demo02Message message = new Demo02Message();        message.setId(id);        // 同步发送消息        rabbitTemplate.convertAndSend(Demo02Message.EXCHANGE, routingKey, message);    }}

和3.1.6 Demo01Producer的 #syncSend(Integer id) 方法大体相似,差异点在于新增了方法参数 routingKey ,方便我们传入不同的路由键。

3.2.4 Demo02Consumer

创建 Demo02Consumer类,消费消息。

@Component@RabbitListener(queues = Demo02Message.QUEUE)public class Demo02Consumer {    private Logger logger = LoggerFactory.getLogger(getClass());    @RabbitHandler    public void onMessage(Demo02Message message) {        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);    }}

和3.1.7 Demo01Consumer基本一致,差别在于消费的队列是 "QUEUE_DEMO_02"

3.2.5 简单测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo02ProducerTest {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private Demo02Producer producer;    @Test    public void testSyncSendSuccess() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id, "da.yu.nai");        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }    @Test    public void testSyncSendFailure() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id, "yu.nai.shuai");        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }}
  • #testSyncSendSuccess() 方法,发送消息的 RoutingKey 是 "da.yu.nai" ,可以匹配到 "DEMO_QUEUE_02" 。
  • #testSyncSendFailure() 方法,发送消息的 RoutingKey 是 "yu.nai.shuai" ,无法匹配到 "DEMO_QUEUE_02" 。
3.3 Fanout

Fanout Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中。如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oe5FmfkZ-1632816085126)(E:教学springBootSpringBoot-Labslab-04-rabbitmqSpring Boot 消息队列 RabbitMQ 入门.assetsimage-20210808094431400.png)]

  • 生产者(P)发送到 Exchange(X)的所有消息都会路由到图中的两个 Queue,并最终被两个消费者(C1 与 C2)消费。
  • 总结来说,指定 Exchange ,会路由到多个绑定的 Queue 中。
3.3.1 Demo03Message

创建 Demo03Message 消息类

public class Demo03Message implements Serializable {    public static final String QUEUE_A = "QUEUE_DEMO_03_A";    public static final String QUEUE_B = "QUEUE_DEMO_03_B";    public static final String EXCHANGE = "EXCHANGE_DEMO_03";        private Integer id;    public Demo03Message setId(Integer id) {        this.id = id;        return this;    }    public Integer getId() {        return id;    }    @Override    public String toString() {        return "Demo03Message{" +                "id=" + id +                '}';    }}
  • 我们未定义 RoutingKey 的名字。因为,Fanout Exchange 仅需要 Exchange 即可。
  • 我们定义两个 Queue 的名字。因为,我们要测试 Fanout Exchange 投递到多个 Queue 的效果。
3.3.2 RabbitConfig

添加 Fanout Exchange 示例相关的 Exchange、Queue、Binding 的配置。

      public static class FanoutExchangeDemoConfiguration {        // 创建 Queue A        @Bean        public Queue demo03QueueA() {            return new Queue(Demo03Message.QUEUE_A, // Queue 名字                    true, // durable: 是否持久化                    false, // exclusive: 是否排它                    false); // autoDelete: 是否自动删除        }        // 创建 Queue B        @Bean        public Queue demo03QueueB() {            return new Queue(Demo03Message.QUEUE_B, // Queue 名字                    true, // durable: 是否持久化                    false, // exclusive: 是否排它                    false); // autoDelete: 是否自动删除        }        // 创建 Fanout Exchange        @Bean        public FanoutExchange demo03Exchange() {            return new FanoutExchange(Demo03Message.EXCHANGE,                    true,  // durable: 是否持久化                    false);  // exclusive: 是否排它        }        // 创建 Binding A        // Exchange:Demo03Message.EXCHANGE        // Queue:Demo03Message.QUEUE_A        @Bean        public Binding demo03BindingA() {            return BindingBuilder.bind(demo03QueueA()).to(demo03Exchange());        }        // 创建 Binding B        // Exchange:Demo03Message.EXCHANGE        // Queue:Demo03Message.QUEUE_B        @Bean        public Binding demo03BindingB() {            return BindingBuilder.bind(demo03QueueB()).to(demo03Exchange());        }    }
  • 在 FanoutExchangeDemoConfiguration 内部静态类中,我们也是创建了 Exchange、Queue、Binding 三个 Bean 。有差异点的是,这次我们创建的是 FanoutExchange 。
  • 同时,因为我们要投递到两个 Queue 中,所以我们创建了两个 Binding 。
3.3.3Demo03Producer

创建Demo03Producer

// Demo03Producer.java@Componentpublic class Demo03Producer {    @Autowired    private RabbitTemplate rabbitTemplate;    public void syncSend(Integer id) {        // 创建 Demo03Message 消息        Demo03Message message = new Demo03Message();        message.setId(id);        // 同步发送消息        rabbitTemplate.convertAndSend(Demo03Message.EXCHANGE, null, message);    }}
  • 和3.1.6 Demo01Producer的 #syncSend(Integer id) 方法大体相似,差异点在于传入 routingKey = null ,因为不需要。Fanout 是不需要 routingKey 的
3.3.4 Demo03Consumer

我们要创建两个消费之 分别消费 A 和 B的消息

// Demo03ConsumerA.java@Component@RabbitListener(queues = Demo03Message.QUEUE_A)public class Demo03ConsumerA {    private Logger logger = LoggerFactory.getLogger(getClass());    @RabbitHandler    public void onMessage(Demo03Message message) {        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);    }}// Demo03ConsumerB.java@Component@RabbitListener(queues = Demo03Message.QUEUE_B)public class Demo03ConsumerB {    private Logger logger = LoggerFactory.getLogger(getClass());    @RabbitHandler    public void onMessage(Demo03Message message) {        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);    }}
3.3.5 简单测试
// Demo03ProducerTest.java@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo03ProducerTest {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private Demo03Producer producer;    @Test    public void testSyncSend() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id);        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }}
3.4 HeadersExchange

Headers Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

  • 在绑定 Queue 与 Exchange 时指定一组 headers 键值对。
  • 当消息发送到 Exchange 时,RabbitMQ 会取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue ,否则不会路由到该 Queue 。
3.4.1 Demo04Message

创建Demo04Message

public class Demo04Message implements Serializable {    public static final String QUEUE = "QUEUE_DEMO_04_A";    public static final String EXCHANGE = "EXCHANGE_DEMO_04";    public static final String HEADER_KEY = "color";    public static final String HEADER_VALUE = "red";        private Integer id;    // ... 省略 set/get/toString 方法}
  • 我们未定意思 RoutingKey 的名字。因为,Headers Exchange 是通过 Exchange + Headers 的组合。
  • 我们定义一个 Headers 键值对,color = red 。
3.4.2 RabbitConfig

RabbitConfig 中增加 Headers Exchange 示例相关的 Exchange、Queue、Binding 的配置。

     public static class HeadersExchangeDemoConfiguration {        // 创建 Queue        @Bean        public Queue demo04Queue() {            return new Queue(Demo04Message.QUEUE, // Queue 名字                    true, // durable: 是否持久化                    false, // exclusive: 是否排它                    false); // autoDelete: 是否自动删除        }        // 创建 Headers Exchange        @Bean        public HeadersExchange demo04Exchange() {            return new HeadersExchange(Demo04Message.EXCHANGE,                    true,  // durable: 是否持久化                    false);  // exclusive: 是否排它        }        // 创建 Binding        // Exchange:Demo04Message.EXCHANGE        // Queue:Demo04Message.QUEUE        // Headers: Demo04Message.HEADER_KEY + Demo04Message.HEADER_VALUE        @Bean        public Binding demo4Binding() {            return BindingBuilder.bind(demo04Queue()).to(demo04Exchange())                    .where(Demo04Message.HEADER_KEY).matches(Demo04Message.HEADER_VALUE); // 配置 Headers 匹配        }    }
  • 在 TopicExchangeDemoConfiguration 内部静态类中,我们也是创建了 Exchange、Queue、Binding 三个 Bean 。有差异点的是,这次我们创建的是 HeadersExchange 。
  • 同时,我们创建的 Binding 是基于 Headers 匹配。
3.4.3 Demo04Producer

创建 Demo04Producer

@Componentpublic class Demo04Producer {    @Autowired    private RabbitTemplate rabbitTemplate;    public void syncSend(Integer id, String headerValue) {        // 创建 MessageProperties 属性        MessageProperties messageProperties = new MessageProperties();        messageProperties.setHeader(Demo04Message.HEADER_KEY, headerValue); // 设置 header        // 创建 Message 消息        Message message = rabbitTemplate.getMessageConverter().toMessage(                new Demo04Message().setId(id), messageProperties);        // 同步发送消息        rabbitTemplate.send(Demo04Message.EXCHANGE, null, message);    }}
  • 和3.1.6 Demo01Producer的 #syncSend(Integer id) 方法大体相似,差异点在于新增了方法参数 headerValue ,方便我们传入不同的 Headers 值。
  • 因为 RabbitTemplate 会提供很方便的传递 Headers 的 API 方法,所以我们只好自己构建,当然也比较简单哈。
3.4.4 Demo04Consumer

创建Demo04Consumer

@Component@RabbitListener(queues = Demo04Message.QUEUE)public class Demo04Consumer {    private Logger logger = LoggerFactory.getLogger(getClass());    @RabbitHandler    public void onMessage(Demo04Message message) {        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);    }//    @RabbitHandler(isDefault = true)//    public void onMessage(org.springframework.amqp.core.Message message) {//        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);//    }}
3.4.5 简单测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo04ProducerTest {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private Demo04Producer producer;    @Test    public void testSyncSendSuccess() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id, Demo04Message.HEADER_VALUE);        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }    @Test    public void testSyncSendFailure() throws InterruptedException {        int id = (int) (System.currentTimeMillis() / 1000);        producer.syncSend(id, "error");        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);        // 阻塞等待,保证消费        new CountDownLatch(1).await();    }}
  • #testSyncSendSuccess() 方法,发送消息的 Headers 的值 "red" ,可以匹配到 "DEMO_QUEUE_04" 。
  • #testSyncSendFailure() 方法,发送消息的 Headers 的值 "error" ,无法匹配到 "DEMO_QUEUE_04" 。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278521.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号