# 前置 Zookeeper
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
# 安装 Kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=${Zookeeper_Host}:${Zookeeper_Port} -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${Kafka_Host}:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -it
wurstmeister/kafka /bin/bash
# 进入 Kafka
docker exec -it kafka /bin/bash
# 进到 bin 目录
cd /opt/kafka_2.13-2.7.0/bin/
# 启动消息提供者
bash-4.4# kafka-console-producer.sh --broker-list localhost:9092 --topic vipsoft_kafka
# 启动消息消费者
bash-4.4# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic vipsoft_kafka --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
2. Java代码
2.1. Pom.xml
2.2. Producerorg.apache.kafka kafka_2.12 1.0.0 provided org.apache.kafka kafka-clients 1.0.0 org.apache.kafka kafka-streams 1.0.0
package com.mashiro.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xxx.xxx.xxx:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer(props);
String topic = "test-topic";
try {
while (true) {
String msg = "Hello Kafka" + UUID.randomUUID().toString().substring(5);
ProducerRecord record = new ProducerRecord(topic, msg);
producer.send(record);
System.out.println("Message Send Success !!");
TimeUnit.SECONDS.sleep(3);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
2.3. Consumer
package com.mashiro.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xxx.xxx.xxx:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords poll = consumer.poll(100);
poll.forEach(System.out::println);
TimeUnit.SECONDS.sleep(1);
}
}
}
RabbitMQ
1. 中间件
1.1. 什么是中间件
-
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。
-
人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
- 中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。
- 中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。
- 消息中间件 ActiveMQ、RabbitMQ、Kafka、RocketMQ。
- 负载均衡中间件 Nginx,、Lvs。
- 缓存中间件 Memcache、Redis。
- 数据库中间件 Sharding JDBC、My Cat。
- …
- 理解中间件在项目架构中的作用以及中间件的底层实现
- 使用一些类似的生活概念去理解中间件
- 使用一些流程图或者脑图的方式去梳理中间件在架构中的作用
- 尝试用 Java 技术去实现中间件的原理
- 思考中间件在项目设计中设计和使用的原因
- 找到对应的替代总结方案
- 编写博文总结类同中间件技术的对比和使用场景
- 查看中间件的源码以及开源项目和博文
- 消息中间件负责数据的传递、存储和分发消费三个部分。
- 数据的存储和分发过程中要遵循某种约定成俗的规范。
- 约定成俗的规范称为: 协议
| Active MQ | Rabbit MQ | Kafka | Rocket MQ | |
|---|---|---|---|---|
| 文件存储 | √ | √ | √ | √ |
| 数据库 | √ | × | × | × |
- MQ消息队列角色
- 生产者
- 存储信息
- 消费者
- 生产者生成消息 --> MQ进行存储 --> 消费者获取信息
- 消费者获取信息的方式:
- 推 (Push)
- 拉 (Pull)
| Active MQ | Rabbit MQ | Kafka | Rocket MQ | |
|---|---|---|---|---|
| 发布订阅 | √ | √ | √ | √ |
| 轮询分发 | √ | √ | √ | × |
| 公平分发 | × | √ | √ | × |
| 重发 | √ | √ | × | √ |
| 消息拉取 | × | √ | √ | √ |
- 高可用是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
- 当业务量增加时,请求页过大,一台消息服务器到达极限,那么一台服务器已经无法满足业务需求,此时消息中间件必须支持集群来达到高可用的目的。
生产者将消息发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master负责写入,如果Master挂掉,Slave继续服务。
2.4.2.2. Master-Slave 主从数据同步消息同样在Master,Master会同步数据到Slave形成副本,和Zookeeper与Redis主从机制类同。可以达到负载均衡得效果,如果消费者有多个,则可以去不同的节点进行消费。但是消息的拷贝和同步会占用很大的带宽和网络资源。
2.4.2.3. 多主集群同步与 主从数据同步 区别不是很大,但是写入可以往任意节点写入
2.4.2.4. 多主集群转发如果插入数据时Broker1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。对描述信息(元数据信息)进行同步。如果消费者在Broker2中进行消费,发现没有消息,则对元数据进行查询,然后返回对应的消息。
2.4.2.5. Master-Slave 与 Broker-Cluster组合实现多主多从的热备机制来完成高可用以及数据的热备机制,适用于比较大型的项目。
2.4.2.6. 高可用小结- 消息共享
- 消息同步
- 元数据共享
- 系统可以无故障地持续运行。
- 系统出现崩溃、报错、异常等情况,不影响线上业务地正常运行。
- 保证可靠性:
- 消息的传输: 通过协议来保证系统间数据解析的正确性。
- 消息的存储: 通过持久化来保证消息的可靠性。
https://www.rabbitmq.com/
- RabbitMQ是部署最广泛的开源消息代理。
-
RabbitMQ拥有成千上万的用户,是最受欢迎的开源消息代理之一。
-
从T-Mobile 到 Runtastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。
-
RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。
-
RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。
-
RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。
-
3.2.2. 下载Erlang rpm包https://www.rabbitmq.com/download.html
3.3. 开始安装 3.3.1. 创建存放 Rabbit MQ 的目录https://www.erlang-solutions.com/downloads/#
[root@Mashiro usr]# mkdir /usr/rabbitmq [root@Mashiro usr]# ls bin games java lib64 local sbin src etc include lib libexec rabbitmq share tmp [root@Mashiro usr]# cd rabbitmq/ [root@Mashiro rabbitmq]# ls [root@Mashiro rabbitmq]#3.3.2. 解压安装 Erlang
# 解压erlang的rpm包 [root@Mashiro rabbitmq]# rpm -Uvh erlang-solutions-2.0-1.noarch.rpm warning: erlang-solutions-2.0-1.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID a14f4fca: NOKEY Preparing... ################################# [100%] Updating / installing... 1:erlang-solutions-2.0-1 ################################# [100%] # 安装erlang [root@Mashiro rabbitmq]# yum install -y erlang # 安装完成后输入`erl -v`查看 erlang版本号 确认安装成功 [root@Mashiro rabbitmq]# erl -v Erlang/OTP 24 [erts-12.0.1] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] Eshell V12.0.1 (abort with ^G)3.3.3. 安装 Erlang 插件 Socat
[root@Mashiro rabbitmq]# yum install -y socat3.3.4. 安装 Rabbit MQ
# 解压 Rabbit MQ 的rpm包 [root@Mashiro rabbitmq]# rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm # 安装 rabbitmq-server [root@Mashiro rabbitmq]# yum install rabbitmq-server -y # 启动 rabbitmq-server [root@Mashiro rabbitmq]# systemctl start rabbitmq-server # 查看 rabbitmq-server 的状态 [root@Mashiro rabbitmq]# systemctl status rabbitmq-server # 设置开机自启动 [root@Mashiro rabbitmq]# systemctl enable rabbitmq-server # 停止 rabbitmq-server [root@Mashiro rabbitmq]# systemctl stop rabbitmq-server3.3.5. Docker安装RabbitMQ
- 配置了端口号的docker命令
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- 配置了用户账号密码的docker命令
docker run -d --hostname host --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
- 完整的docker命令
docker run -di --name rabbitmq_test -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 5672:5672 -p 15672:15672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
- 查看日志
docker logs -f rabbitmq_test4. Rabbit MQ 图形化界面 4.1. 安装RabbitMQ web端的客户端插件
[root@Mashiro rabbitmq]# rabbitmq-plugins enable rabbitmq_management4.2. Rabbit MQ 浏览器访问
-
默认帐号、密码为 guest。
-
阿里云服务器需要进行安全组配置。开放15672和5672端口。
-
默认情况只能在localhost本机下访问,需要添加一个支持远程登陆的用户。
http://localhost:15672/4.3. Rabbit MQ 管理授权 4.3.1. 新增用户
[root@Mashiro /]# rabbitmqctl add_user admin adminAdding user "admin" ...Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.4.3.2. 设置用户操作权限
[root@Mashiro /]# rabbitmqctl set_user_tags admin administratorSetting tags for user "admin" to [administrator] ...4.3.3. 设置用户资源权限
[root@Mashiro /]# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"Setting permissions for user "admin" in vhost "/" ...4.3.4. 其它用户操作
# 新增用户rabbitmqctl add_user [帐号] [密码]# 分配用户权限rabbitmqctl set_user_tags [帐号] [权限]# 修改密码rabbitmqctl change_password [账号] [新密码]# 删除用户rabbitmqctl delete_user [帐号]# 查看用户清单rabbitmqctl list_users# 为用户分配资源权限rabbitmqctl set_permissions -p [用户名] ".*" ".*" ".*"4.3.5. 用户权限
- administrator 超级管理员。可以登录控制台,查看所有信息,可以对Rabbit MQ进行管理。
- monitoring 监控者。 登录控制台,查看所有信息。
- policymaker 策略制定者。登录控制台,指定策略。
- management 普通管理员。 登录控制台。
- 不能访问 Management Plugin
-
查看自己相关节点信息。
-
列出自己可以通过AMQP登入的虚拟机。
-
查看自己的虚拟机节点 Virtual Hosts 的Queues, Exchanges和Bindings信息。
-
查看和关闭自己的Channels和Connections。
-
查看有关自己的虚拟机节点Virtual Hosts的统计信息。包括其它用户在这个节点Virtual Hosts中的活动信息。
- 包含Management所有权限。
- 查看、创建和删除自己的Virtual Hosts所属的Policies和Parameters信息。
- 包含Management所有权限。
- 罗列出所有的Virtual Hosts,包括不能登录的Virtual Hosts。
- 查看其它用户的Connections和Channels信息。
- 查看节点级别的数据,如Clustering和Memory使用情况。
- 查看所有的Virtual Hosts的全局统计信息。
- 最高权限。
- 创建和删除Virtual Hosts。
- 查看、创建和删除Users
- 查看创建Permissions。
- 关闭所有用户的Connections。
- 创建连接工厂
- 创建连接 Connection
- 通过连接获取通道 Channel
- 通过通道 声明交换机、声明队列、绑定关系、路由Key、发送消息、接受消息
- 准备消息内容
- 发送消息给队列 Queue
- 关闭通道
- 关闭连接
package com.mashiro.rabbitmq.simple;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) { String hostName = "127.0.0.1"; int port = 5672; String username = "admin"; String password = "123456"; String virtualHost = "/"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); connection = factory.newConnection("Producer Test"); channel = connection.createChannel(); String queueName = "Producer Queue Test"; channel.queueDeclare(queueName,false,false,false,null); String message = "Hello Rabbit MQ!"; channel.basicPublish("",queueName,null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
6.3. 消息消费者代码
package com.mashiro.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) { String hostName = "xx.xxx.xxx.xxx"; int port = 5672; String username = "admin"; String password = "123456"; String virtualHost = "/"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); connection = factory.newConnection("Producer Test"); channel = connection.createChannel(); String queueName = "Producer Queue Test"; channel.basicConsume(queueName, true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("接受消息 ==> " + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { public void handle(String s) throws IOException { System.out.println("信息接收失败..."); } }); System.out.println("Blocking"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
7. 什么是AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
8. Rabbit MQ组件架构 8.1. Rabbit MQ核心组成部分 8.2. 核心概念- Server: 又称Broker,接收客户端的连接,实现AMQP实体服务,安装Rabbitmq-server。
- Connection: 连接,应用程序与Broker的网络连接 TCP/IP协议三次握手和四次挥手。
- Channel: 网络信道,几乎所有的操作都是Channel中进行,是进行消息读写的通道。每个Channel代表一个会话任务。
- Message: 消息,服务与应用程序之间传送的数据,由Properties和Body组成。Properties是对消息进行修饰,Body则是消息本身。
- Virtual Host: 虚拟地址,用于进行逻辑隔离。一个虚拟主机里有若干个Exchange和Queue,同一个虚拟主机中不能有相同名字的Exchange。
- Exchange: 交换机,接收信息,根据路由键发送消息到绑定的队列。
- Bindings: Exchange和Queue之间的虚拟连接。
- Routing Key: 路由规则,虚拟机可以用它来确定如何路由一个特定信息。
- Queue: 队列,保存消息并转发给消费者。
9.1. Fanout模式https://www.rabbitmq.com/getstarted.html
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
- Queue1
- Queue2
- Queue3
- Queue4
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
- Queue1 Weixin (Routing Key)
- Queue2 Email (Routing Key)
- Queue3 SMS (Routing Key)
- Queue4 Email (Routing Key)
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
- Queue1 com.#
- Queue2 *.course.*
- Queue3 #.order.#
- Queue4 #.user.*
9.4. Headers模式#:匹配 0~n 个词
*: 只能匹配1个词
header模式取消了Routing Key的设置,但要设置key-value匹配队列。
10.代码测试交换机模式 10.1. Fanout模式-入门案例 10.1.1. 交换机设置/fanoutTest --> Queue1
Queue2
Queue3
Queue4
10.1.2. Producerpackage com.mashiro.rabbitmq.routing;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) { String hostName = "xx.xxx.xxx.xxx"; int port = 5672; String username = "admin"; String password = "123456"; String virtualHost = "/"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); connection = factory.newConnection("Producer Test"); channel = connection.createChannel(); String message = "Hello Fanout Exchange Test!"; String exchangeName = "/fanoutTest"; // 绑定交换机名称 String routingKey = ""; // 绑定RoutingKey String type = "fanout"; // 绑定交换机模式类型 channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
10.1.3. Producer - Result
| Overview | Messages | Message rates | |||||||
|---|---|---|---|---|---|---|---|---|---|
| Name | Type | Features | State | Ready | Unacked | Total | incoming | deliver / get | ack |
| Queue1 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
| Queue2 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
| Queue3 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
| Queue4 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
package com.mashiro.rabbitmq.routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) { Consumer consumer = new Consumer(); for (int i = 1; i <= 3; i++) { new Thread(()->{ consumer.test(); },"Queue" + i).start(); } } public void test(){ System.out.println(Thread.currentThread().getName()); String hostName = "xx.xxx.xxx.xxx"; int port = 5672; String username = "admin"; String password = "123456"; String virtualHost = "/"; Connection connection = null; Channel channel = null; final String queueName = Thread.currentThread().getName(); try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); connection = factory.newConnection("Fanout Consumer Test"); channel = connection.createChannel(); channel.basicConsume(queueName, true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("接受消息 ==> " + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { public void handle(String s) throws IOException { System.out.println("信息接收失败..."); } }); System.out.println(queueName + "接收信息!"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
10.1.5. Consumer - Result
Queue1Queue2Queue3Queue3接收信息!Queue2接收信息!Queue1接收信息!接受消息 ==> Hello Fanout Exchange Test!接受消息 ==> Hello Fanout Exchange Test!接受消息 ==> Hello Fanout Exchange Test!10.2. Direct模式-入门案例 10.2.1. 交换机设置
TestRoutingKey --> Queue1 Weixin
Queue2 Email
Queue3 SMS
Queue4 Email
10.2.2. Producerpackage com.mashiro.rabbitmq.direct;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) { String hostName = "47.115.170.206"; int port = 5672; String username = "admin"; String password = "123456"; String virtualHost = "/"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); connection = factory.newConnection("Direct Test"); channel = connection.createChannel(); String queueName = "Direct_Test"; String message = "Hello Direct Exchange Test!"; String exchangeName = "TestRoutingKey"; // 绑定交换机名称 String routingKey = "Email"; // 绑定RoutingKey String type = "direct"; // 绑定交换机模式类型 channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); System.out.println("消息发送成功!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
10.2.3. Producer - Result
| Overview | Messages | Message rates | |||||||
|---|---|---|---|---|---|---|---|---|---|
| Name | Type | Features | State | Ready | Unacked | Total | incoming | deliver / get | ack |
| Queue1 | classic | D Args | idle | 0 | 0 | 0 | 0.00/s | 0.00/s | 0.00/s |
| Queue2 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
| Queue3 | classic | D Args | idle | 0 | 0 | 0 | 0.00/s | 0.00/s | 0.00/s |
| Queue4 | classic | D Args | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
Queue1Queue2Queue4Queue3Queue2接收信息!接受消息 ==> Hello Direct Exchange Test!Queue4接收信息!接受消息 ==> Hello Direct Exchange Test!Queue3接收信息!Queue1接收信息!



