- 一、基础知识
- 二、Rabbitmq消息发送模式
- 1、简单队列
- 2、工作队列
- 3、发布/订阅
- 4、路由模式
- 5、主题模式
- 三、RabbitMQ交换机类型
- 1、Direct exchange
- 2、Fanout exchange
- 3、Topic exchange
- 四、实战
- 1、运行环境
- 2、统一连接类
- 3、简单队列消息模式
- 4、发布/订阅消息模式---交换机fanout类型
- 5、路由模式---交换机direct类型
- 6、主题模式---交换机topic类型
- 五、Rabbit Management
- 1、connections管理
- 2、channel管理
- 3、exchange管理
- Queue管理
- 六、参考资料
对于RabbitMQ其他知识体系,本文中就不仔细讲解了,先列出系列核概念,帮助大家建立知识体系;
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体。
消息发送基本流程总结:生产者(Producer)发送->中间件->消费者(Consumer)接收消息。
RabbitMQ包括五种队列模式,简单队列、工作队列、发布/订阅、路由、主题等。
1、简单队列1)生产者将消息发送到队列,消费者从队列获取消息。
2)一个队列对应一个消费者。
1)一个生产者,多个消费者。
2)一个消息发送到队列时,只能被一个消费者获取。
3)多个消费者并行处理消息,提升消息处理速度。
注意:channel.basicQos(1)表示同一时刻只发送一条消息给消费者。
消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。
将消息发送到交换机,队列从交换机获取消息,队列需要绑定到交换机。
1)一个生产者,多个消费者。
2)每一个消费者都有自己的一个队列。
3)生产者没有将消息直接发送到队列,而是发送到交换机。
4)每一个队列都要绑定到交换机。
5)生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。
6)发布/订阅映射的交换机类型为“fanout”。
注意:交换机本身没有存储消息的能力,消息只能存储到队列中。
了解到这里,大家会熟悉了工作队列模式和发布/订阅模式,思考一下两者存在什么区别?
1、从模式图中,可以清楚明确到发布/订阅模式需要定义交换机exchange,而工作队列模式中并未明确定义交换机(那意味着底层没有使用交换机???不对。)
2、发布/订阅生产者是面向交换机发送消息;
工作队列模式生产者是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅消费者需要设置队列和交换机的绑定;
工作队列中消费者开发者不需要设置绑定关系,底层会将队列绑定到默认的交换机;
第二点和第三点,在下面实践编码过程中,会明确定义其区别;
1)路由模式的交换机类型为“direct”。
2)绑定队列到交换机时指定 key,即路由键,一个队列可以指定多个路由键。
3)生产者发送消息时指定路由键,这时,消息只会发送到绑定的key的对应队列中。
解释上图含义:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息;
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
1)每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
2)Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割;
3)主题模式的交换机类型为“topic”
4)通配符规则:
#:匹配一个或多个词;
*:匹配不多不少恰好1个词;
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
四、实战针对以上知识体系,小编part仅针对部分进行演示,其他消息模式或者交换机类型,大家有兴趣的可以进行扩展;
1、运行环境1、centos镜像或者已安装rabbitmq虚拟机、rabbitmq图形化界面;
2、JDK1.8+版本+maven;
3、依赖配置
2、统一连接类com.rabbitmq amqp-client 5.7.1
注意点:factory.setHost("****");配置成自己的服务器地址
package com.itwx.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
//todo 配置成自己的服务器地址
factory.setHost("****");
//端口
factory.setPort(5672);
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工厂获取连接
return factory.newConnection();
}
}
3、简单队列消息模式
此种消息模式,底层选择rabbitmq默认的交换机类型,无需开发者自定义交换机、以及交换机和队列的绑定关系;
面向队列编程
故开发者需要定义的角色:队列;
- 生产者
package com.itwx.mq.basic;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ProviderSend {
// private final static String EXCHANGE_NAME = "wx_test_exchange";
private final static String QUEUE_NAME = "wx_test_queue";
public static void main(String[] argv) throws Exception {
// 1、获取到连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello rabbitmq!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("provider send:" + message);
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
try {
channel.close();
connection.close();
} catch (Exception e) {
}
}
}
- 消费者
package com.itwx.mq.basic;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private final static String QUEUE_NAME = "wx_test_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//TODO 手动抛异常,造成消息丢失现象
//int i= 1 / 0;
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange);
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4、发布/订阅消息模式—交换机fanout类型
发布订阅上面基础概念我们介绍其与工作队列消息模式的区别,故面向交换机编程;
开发者需要定义的角色:交换机、队列、路由key、交换机和队列之间的绑定;
- 生产者
package com.itwx.mq.fanout;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class ProviderFanout {
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] args) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "hello rabbitmq, fanout type message!";
// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("provider fanout send message:" + message);
channel.close();
connection.close();
}
}
- 消费者1
package com.itwx.mq.fanout;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmailFanoutConsumer {
private final static String QUEUE_NAME = "test_email_fanout_queue";//邮件队列
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println("email received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
- 消费者2
package com.itwx.mq.fanout;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SmsFanoutConsumer {
private final static String QUEUE_NAME = "test_sms_fanout_queue";//短信队列
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println("sms received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
代码存在一些思考,大家可以验证一下,避免做伸手党!
5、路由模式—交换机direct类型- 生产者
package com.itwx.mq.direct;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ProviderDirect {
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息内容,
String message = "欢迎您使用rabbitmq消息中间件,如有疑问及时反馈";
// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
- 消费者1
package com.itwx.mq.direct;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmailDirectConsumer {
private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" email received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
- 消费者2
package com.itwx.mq.direct;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SmsDirectConsumer {
private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" sms received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
6、主题模式—交换机topic类型
- 生产者
package com.itwx.mq.topic;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ProviderTopic {
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 消息内容
String message = "发送手机验证码,登录信息已发送邮件通知";
// 发送消息,并且指定routing key为:quick.orange.rabbit
channel.basicPublish(EXCHANGE_NAME, "phone.verification_code.email.login.notice", null, message.getBytes());
System.out.println(" [动物描述:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
- 消费者1
package com.itwx.mq.topic;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumerOne {
private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.verification_code.#");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
- 消费者2
package com.itwx.mq.topic;
import com.itwx.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumerTwo {
private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.email");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.login.#");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
五、Rabbit Management
对于上面的实战,大家需要结合图形化界面验证更便于理解,而且在日常开发过程也方便排查问题,故小编顺便简单介绍一下Rabbit Management!
登录地址:http://******:15672
账户密码:一般均是默认值guest/guest
介绍各个服务器的连接信息;
2、channel管理建立在连接基础上的通道,实际开发中链接应为全局变量,通道为线程级;
3、exchange管理如图:大家可以进行查询已定义的交换机信息;
如果想要在图形化界面新增交换机,如图:
里面的参数,与编码中的含义一致,根据自己需求设定交换机类型、是否持久化、自动删除等;
点击具体队列名字,进入详情设置:绑定交换机、路由key,以及消息内容等;
其他的细节属性,大家可以仔细摸索,简单的英文名字,很明确见名知其义;
1、rabbitmq基础概念知识
2、rabbitmq官网
3、最详细的RabbitMQ介绍



