上图中展示了三种topic、fanout、direct交换器,Q1、Q2等则是一个个消息队列,包含了多个消息。
交换器、队列、绑定
生产者把消息发布到交换器上;消息最终到达队列,并被消费者接受;绑定决定了消息如何从路由器路由到特定的队列
队列:
- 为消息提供了处所
- 对负载均衡来说,队列是绝佳的方案。只需要附加一堆消费者,并让RabbitMQ以循环的方式均匀地分配发来的消息。
- 队列是Rabbit中消息的最后的终点
交换器-四种类型
- direct:如果路由键匹配,消息就会被投递到对应的队列。
服务器必须实现direct类型交换器,包含一个空白字符串名称的默认交换器。当声明一个队列时,他会自动绑定到默认交换器,并以队列名称作为路由键。
- fanout
当你发送一条消息到fanout交换器时,他会把消息投递给所有附加在此交换器上的队列。这允许你对单条消息做不同的反应。举例来说。一个web应用程序可能需要在用户上传新的图片时,用户相册必须清除缓存,同时用户应该得到些积分奖励。你可以将两个队列绑定到图片上传交换器上,一个用于清除缓存,另一个用于增加用户积分。
- topic
它是的来自不同源头的消息能够到达同一队列。以Web应用程序日志系统为例。你拥有多个不同的日志级别,例如error、info、和warning。与此同时,你的应用程序分为以下几个模块,user、shopping、score等,如果在发送消息的动作失败时,你想要报告一个error,参见如上
- headers 允许你匹配消息的header而非路由键,除此之外,和direct交换器完全一直,但性能差一些,因此并不实用。
vhost之于RabbitMQ相当于虚拟机之于物理主机。每个vhost是分离的。
通过安装路径下的sbin目录中的rabbitmqctl工具来创建一个vhost
rabbitmqctl add_vhost[vhost_name]
删除vhost:
rabbitmqctl delete_vhost[vhost_name]
如果你想知道Rabbit服务器上运行着哪些vhost时可以使用以下命令:
rabbitmqctl list_vhosts消息持久化
默认情况下重启服务后,队列和交换器都消失了。
但我们可以将队列和交换器的durable属性设置为true,这样就不需要在屋企断电后重新创建队列和交换器了。
但这远远还不够,消息的持久化需要:
把它的投递模式选线设置为2(持久)
发送到持久化的交换器
发送到持久化的队列。
Rabbit的实现方式是,将它们写入磁盘上的一个持久化日志问件。当发布一条持久性消息到交换器上时,Rabbit会在消息提交到日志文件后才发送响应。如果这条消息路由到非持久化队列时,则会自动从持久性日志中删除。并且无法从服务器重启中恢复。
一旦从持久化队列中消费了一条持久性消息的话,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。在你消费之前,如果服务重启,服务器会自动重建交换器和队列,重播持久性日志文件中的消息到合适的队列或者交换器上。(取决于Rabbit服务器宕机的时候,消息处在路由过程的哪个环节)
同样的持久化会造成性能上的损失,写入磁盘比存入内存中慢不止一点点,会极大地减少服务器每秒可处理的消息总数。导致消息吞吐量降低至少10倍的情况并不少见。另外在集群环境下,持久化的这样的方案并不友好。即虽然允许你和急群众任何节点的任意队列进行通信,但队列并没有在每个节点上同步,如果某个队列的节点崩溃了,那么这个队列也就从集群中消失了。
一条消息历经从生产者到消费者的生命周期 发布者- 链接到RabbitMQ
- 获取信道
- 声明交换器
- 创建消息
- 关闭信道
- 关闭连接
java代码如下:
// connection是socket连接的抽象,并且为我们管理协议版本协商(protocol version negotiation),
// 认证(authentication )等等事情。这里我们要连接的消息代理在本地,因此我们将host设为“localhost”。
// 如果我们想连接其他机器上的代理,只需要将这里改为特定的主机名或IP地址。
ConnectionFactory factory = new ConnectionFactory();
// factory.setHost("localhost");
// factory.setPort(5672); //默认端口号
// factory.setUsername("guest");//默认用户名
// factory.setPassword("guest");//默认密码
Connection connection = factory.newConnection();
// 接下来,我们创建一个channel(信道),绝大部分API方法需要通过调用它来完成。
Channel channel = connection.createChannel();
channel.exchangeDeclare("hello-exchange", "direct",false,true,false,null);
// 发送之前,我们必须声明消息要发往哪个队列,然后我们可以向队列发一条消息:
channel.queueDeclare("queue1", false, false, false, null);
String message = "Hello world-----------";
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
消费者
- 连接到RabbitMQ
- 获得信道
- 声明交换器
- 声明队列
- 把队列和交换器绑定起来
- 消费消息
- 关闭信道
- 关闭连接
ConnectionFactory factory = new ConnectionFactory();
// factory.setHost("localhost");
// factory.setPort(5672);
// factory.setUsername("guest");
// factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("hello-exchange", "direct",false,true,false,null);
channel.queueDeclare("queue1", false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume("queue1", true, consumer);



