栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息中间件(异步消息传递)——RabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息中间件(异步消息传递)——RabbitMQ

RabbitMQ

主要内容

  1. AMQP 简介
  2. RabbitMQ 简介
  3. RabbitMQ 原理
  4. Erlang 安装
  5. 安装 RabbitMQ
  6. RabbitMQ 账户管理
  7. 交换器
一、AMQP简介 1 AMQP是什么?

AMQP(Advanced Message Queuing Protocol),高级消息队列协议)是进程之间传递 异步消息的网络协议。

2 AMQP 工作过程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后 AMQP 代理会将消息投递给订阅了此 队列的消费者,或者消费者按照需求自行获取。

3 队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进 后出。其中一侧负责进数据,另一次负责出数据。 MQ(消息队列)很多功能都是基于此队列结构实现的

二、 RabbitMQ 简介 1 RabbitMQ

介绍 RabbitMQ 是由 Erlang 语言编写的基于 AMQP 的消息中间件。而消息中间件作为分 布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

1.1 解决应用耦合 1.1.1 不使用 MQ 时

1.1.2 使用 MQ 解决耦合

2 RabbitMQ 适用场景

排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、流量销峰等。

三、 RabbitMQ 原理


1.Message 消息。
消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由 一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的 优先权)、delivery-mode(指出消息可能持久性存储)等。

2.Publisher

消息的生产者。也是一个向交换器发布消息的客户端应用程序。
3.Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。

4.Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 三种常用的交换器类型

  1. direct(发布与订阅 完全匹配)
  2. fanout(广播)
  3. topic(主题,规则匹配)

5.Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息 队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

6.Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一 个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取 走。

7.Routing-key
路由键。RabbitMQ 决定消息该投递到哪个队列的规则。(也可以理解为队列的名称, 路由键是 key,队列是 value)
队列通过路由键绑定到交换器。
消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将 其和绑定使用的路由键进行匹配。

如果相匹配,消息将会投递到该队列。

如果不匹配,消息将会进入黑洞。

8.Connection
链接。指 rabbit 服务器和服务建立的 TCP 链接。

9.Channel
信道。
1,Channel 中文叫做信道,是 TCP 里面的虚拟链接。例如:电缆相当于 TCP,信道是一个独立光纤束,一条 TCP 连接上创建多条信道是没有问题的。
2,TCP 一旦打开,就会创建 AMQP 信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10.Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证 和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器, 拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时 指定,RabbitMQ 默认的 vhost 是/

11.Borker
表示消息队列服务器实体。
交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的 路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由 键匹配分发消息到具体的队列中。
路由键可以理解为匹配的规则。

RabbitMQ 为什么需要信道?为什么不是 TCP 直接通信?

  1. TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。
  2. 如果不用信道,那应用程序就会以 TCP 链接 Rabbit,高峰时每秒成千上万条链接 会造成资源巨大的浪费,而且操作系统每秒处理 TCP 链接数也是有限制的,必定造成性能 瓶颈。
  3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
四、Erlang安装

RabbitMQ 是使用 Erlang 语言编写的,所以需要先配置 Erlang

1 修改主机名

RabbitMQ 是通过主机名进行访问的,必须指定能访问的主机名。
# vim /etc/sysconfig/network


# vim /etc/hosts
新添加了一行,前面为服务器 ip,空格后面添加计算机主机名

2 安装依赖

# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

3 上传并解压

上传 otp_src_22.0.tar.gz 到/usr/local/tmp 目录中,进入目录并解压。
解压时注意,此压缩包不具有 gzip 属性,解压参数没有 z,只有 xf
# cd /usr/local/tmp
# tar xf otp_src_22.0.tar.gz


4 配置参数

先新建/usr/local/erlang 文件夹,作为安装文件夹
#mkdir -p /usr/local/erlang
进入文件夹
# cd otp_src_22.0
配置参数
# ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

5 编译并安装

编译
# make
安装
# make install

6 修改环境变量

希望能够在任意位置执行erl命令

修改/etc/profile 文件
#vim /etc/profile
在文件中添加下面代码
export PATH=$PATH:/usr/local/erlang/bin

运行文件,让修改内容生效
# source /etc/profile

7 查看配置是否成功

# erl -version

五、 安装 RabbitMQ 1 上传并解压

上传 rabbitmq-server-generic-unix-3.7.17.tar.xz 到/usr/loca/tmp 中
# cd /usr/local/tmp
# tar xf rabbitmq-server-generic-unix-3.7.17.tar.xz

2 复制到 local 下

复制解压文件到/usr/local 下,命名为 rabbitmq
# cp -r rabbitmq_server-3.7.17 /usr/local/rabbitmq

3 配置环境变量

# vim /etc/profile
在文件中添加
export PATH=$PATH:/usr/local/rabbitmq/sbin
解析文件
# source /etc/profile

4 开启 web 管理插件

进入 rabbitmq/sbin 目录
# cd /usr/local/rabbitmq/sbin
查看插件列表
# ./rabbitmq-plugins list
生效管理插件
# ./rabbitmq-plugins enable rabbitmq_management

5 后台运行

启动 rabbitmq。
#./rabbitmq-server -detached
停止命令,如果无法停止,使用 kill -9 进程号进行关闭
./rabbitmqctl stop_app
启动成功信息如下:

5.1 启动错误解决

如果启动RabbitMQ 发生下述错误,可以提供环境配置文件,解决。环境配置文件命 名为: rabbitmq-env.conf。所在位置是: $rabbitmq_home/etc/rabbitmq/目录。内如 是: HOSTNAME=主机名称

解决:

6 查看 web 管理界面

默认可以在安装 rabbitmq 的电脑上通过用户名:guest 密码 guest 进行访问 web 管 理界面
端口号:15672(放行端口,或关闭防火墙)
在虚拟机浏览器中输入:http://localhost:15672

六、 RabbitMq 账户管理 1 创建账户

语法:./rabbitmqctl add_user username password
# cd /usr/local/rabbitmq/sbin
# ./rabbitmqctl add_user admin admin123

2 给用户授予管理员角色

其中 smallming 为新建用户的用户名 ,多个角色可以用空格分隔
# ./rabbitmqctl set_user_tags admin administrator

3 给用户授权

“/”表示 RabbitMQ 根虚拟主机
admin表示用户名
".*" ".*" ".*"表示完整权限
# ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

4 登录

使用新建账户和密码在 windows 中访问 rabbitmq 并登录
在浏览器地址栏输入:http://ip:15672/
用户名:admin
密码:admin123

七、 Exchange 交换器(交换机)

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在 RabbitMQ 中支 持四种交换器

  1. Direct Exchange:直连交换器(默认)
  2. Fanout Exchange:扇形交换器
  3. Topic Exchange:主题交换器
  4. Header Exchange:首部交换器。

在 RabbitMq 的 Web 管理界面中 Exchanges 选项卡就可以看见这四个交换器。

1 direct 交换器


direct 交换器是RabbitMQ默认交换器。默认会进行公平调度。所有接受者依次从消 息队列中获取值。Publisher 给哪个队列发消息,就一定是给哪个队列发送消息。对交换器 绑定的其他队列没有任何影响。
(代码演示)一个队列需要绑定多个消费者
需要使用注解/API:
org.springframework.amqp.core.Queue:队列

AmqpTemplate:操作 RabbitMQ 的接口。负责发送或接收消息

@RabbitListener(queues = "") 注解某个方法为接收消息方法

1.1 代码实现 1.1.1 新建项目 Publisher 1.1.1.1添加依赖


    4.0.0

    com.bjsxt
    amqp_rabbit
    pom
    1.0-SNAPSHOT
    
    
        
            
                org.springframework.boot
                spring-boot-dependencies
                2.2.5.RELEASE
                import
                pom
            
        
    


    
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    


创建Consumer

RabbitMQ_direct交换器_消息消费者处理逻辑代码开发_RabbitListener注解描述类型

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


@RabbitListener(bindings = {
    @QueueBinding(
            value = @Queue(value = "log-info-queue", autoDelete = "false"),
            exchange = @Exchange(value = "log-ex-direct", type = "direct", autoDelete = "false"),
            key = "direct-rk-info"
    )
})
@Component
public class InfoLogConsumer {
    
    @RabbitHandler
    public void onMessage(String msg){
        System.out.println("InfoLogConsumer 消费消息:" + msg);
    }
}

RabbitMQ_direct交换器_消息消费者配置及启动

创建启动类

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp.class, args);
    }
}
1.1.1.2编写配置文件

新建 application.yml.
host:默认值 localhost
username 默认值:guest
password 默认值:guest

spring:
  rabbitmq:
    host: 192.168.89.141  # RabbitMQ服务的地址,默认localhost
    port: 5672  # RabbitMQ的端口,默认5672。
    username: bjsxt # 访问RabbitMQ的用户名,默认guest
    password: bjsxt # 访问RabbitMQ的密码,默认guest
    virtual-host: /  # 访问RabbitMQ的哪一个虚拟主机,默认为 /

启动测试






RabbitMQ_direct交换器_消息消费者处理逻辑代码开发_RabbitListener注解描述方法

创建LogConsumers

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class LogConsumers {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-error-queue"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-error"
            )
    })
    public void onLogErrorMessage(String msg){
        System.out.println("错误日志信息:" + msg);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-warn-queue", autoDelete = "false"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-warn"
            )
    })
    public void onLogWarnMessage(String msg){
        System.out.println("警告日志信息:" + msg);
    }
}

启动测试





RabbitMQ_direct交换器_消息发送者代码开发

创建publisher

package com.bjsxt.rabbit.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class LogMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void sendMessage(String exchange, String routingKey, String message){
        this.template.convertAndSend(exchange, routingKey, message);
    }
}

创建启动类

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitPublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitPublisherApp.class, args);
    }
}

创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";


    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

编写配置文件

spring:
  rabbitmq:
    host: 192.168.89.141
    username: bjsxt
    password: bjsxt

启动测试
先启动Consumer
运行testSend


RabbitMQ_direct交换器_消息消费者集群搭建方式

相同代码启动多次,自动搭建Consumer集群

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp1 {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp1.class, args);
    }
}



创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

2 fanout 交换器


扇形交换器,实际上做的事情就是广播,fanout 会把消息发送给所有的绑定在当前交 换器上的队列。对应 Consumer 依然采用公平调度方式。
(代码演示)一个交换器需要绑定多个队列
需要使用注解/API:

  • FanoutExchange:fanout 交换器
  • Binding:绑定交换器和队列
  • BindingBuilder:Binding 的构建器
  • amq.fanout:内置 fanout 交换器名称
2.1 RabbitMQ_fanout交换器_消息消费者代码开发 2.1.1 代码


创建pojo

package com.bjsxt.entity;

import java.io.Serializable;
import java.util.Objects;

// 实体类型
public class User implements Serializable {
    // 定义一个序列化唯一ID。
    public static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    private int age;

    public User(){}

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + ''' +
                ", age=" + age +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        User user = (User) o;
        return age == user.age &&
                Objects.equals(id, user.id) &&
                Objects.equals(name, user.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, age);
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

创建 FanoutConsumers

package com.bjsxt.rabbit.fanout;

import com.bjsxt.entity.User;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class FanoutConsumers {
    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-1", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout", autoDelete = "false")
            )
    })
    public void onMessage1(User user){
        System.out.println("onMessage1 run : " + user);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-2", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout")
            )
    })
    public void onMessage2(User user){
        System.out.println("onMessage2 run : " + user);
    }
}

启动查看



2.2RabbitMQ_fanout交换器_消息发送者代码开发 2.2.1 创建发送消息者

package com.bjsxt.rabbit.fanoutsender;

import com.bjsxt.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class UserMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void send(User user){
        this.template.convertAndSend("ex-fanout", "", user);
    }
}

创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";


    @Test
    public void testSendUserMessage2Fanout(){
        for(int i = 0; i < 3; i++){
            User user = new User();
            user.setId((long) i);
            user.setName("姓名 - " + i);
            user.setAge(20+i);

            this.userMessageSender.send(user);
        }
    }

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

先启动Consumer
启动 testSendUserMessage2Fanout

3 topic 交换器 3.1 RabbitMQ_topic交换器_执行流程介绍


允许在路由键(RoutingKey)中出现匹配规则。

路由键的写法和包写法相同。com.bjsxt.xxxx.xxx 格式。

在绑定时可以带有下面特殊符号,中间可以出现:

  • * : 代表一个单词(两个.之间内容)
  • # : 0 个或多个字符

接收方依然是公平调度,同一个队列中内容轮换获取值。

需要使用注解/API:

  • TopicExchange:Topic 交换器
  • amq.topic:内置 topic 交换器名称
RabbitMQ_topic交换器_消息消费者代码开发 代码

创建Consumer

package com.bjsxt.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class TopicConsumers {
    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-sms-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.sms"
            )
    })
    public void onUserSMSMessage(String message){
        System.out.println("用户短信消息内容是:" + message);
    }

    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-email-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.email"
            )
    })
    public void onUserEmailMessage(String message){
        System.out.println("用户邮件消息内容是:" + message);
    }

    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-all-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.*"
            )
    })
    public void onUserServiceMessage(String message){
        System.out.println("执行的消息处理逻辑是:" + message);
    }
}

启动RabbitConsumerAPP



RabbitMQ_topic交换器_消息发送者代码开发 代码

package com.bjsxt.rabbit.topicsender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class TopicMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void send(String exchange, String routingKey, String message){
        template.convertAndSend(exchange, routingKey, message);
    }
}

创建测试类

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSendMessage2Topic(){
        // 随机数%6
        // 0 rk - user.rk.sms *.rk.*  *.rk.sms
        // 1 rk - user.rk.email   *.rk.* *.rk.email
        // 2 rk - order.rk.sms *.rk.*  *.rk.sms
        // 3 rk - order.rk.email  *.rk.* *.rk.email
        // 4 rk - reg.rk.sms *.rk.*  *.rk.sms
        // 5 rk - reg.rk.qq  *.rk.*
        Random r = new Random();
        for(int i = 0; i < 10; i++){
            int rInt = r.nextInt(100);
            if(rInt%6 == 0){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.sms",
                        "用户登录验证码是123456 - 发送短信");
            }else if(rInt%6 == 1){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.email",
                        "用户登录验证码是123456 - 发送到邮箱");
            }else if(rInt%6 == 2){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.sms",
                        "订单下订成功 - 发送短信");
            }else if(rInt%6 == 3){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.email",
                        "订单下订成功 - 发送到邮箱");
            }else if(rInt%6 == 4){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.sms",
                        "注册验证码是654321 - 发送短信");
            }else if(rInt%6 == 5){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.qq",
                        "注册验证码是654321 - 发送QQ信息");
            }
        }
    }

    @Test
    public void testSendUserMessage2Fanout(){
        for(int i = 0; i < 3; i++){
            User user = new User();
            user.setId((long) i);
            user.setName("姓名 - " + i);
            user.setAge(20+i);

            this.userMessageSender.send(user);
        }
    }

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

先运行Consumer
运行testSendMessage2Topic

项目练习源码:https://gitee.com/cutelili/rabbit-mq/tree/master/amqp_rabbit

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/331873.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号