流量削峰应用解耦异步处理 组成 4大部分
生成者交换机队列消费者信道Channel 用于 复用一个连接(Connection–抽象socket连接,实现协议转换认证等) 使用
服务启动/停止:service rabbitmq-server start/stop安装管理插件:rabbitmq-plugins enable rabbitmq_management 默认访问端口:15672
rabbitmqctl add_user Producer --> RabbitMQ --> Consumer导入amqp-clientProducer
Consumer
queueDeclare(…)
若不存在,则创建队列。若已存在,则获取已存在队列(不允许参数不一致)— 幂等
工作队列 Work Queue
使用消息队列分发耗时的任务,避免立即执行资源密集型任务,并等待其完成 默认使用 Round-robin(轮询)分派消息 消息确认(Message Acknowlegment) 机制 消息队列收到Ack后,认为消息已经送达并被处理消费者终止或断开后,若没有收到Ack则快速将消息发送给下一个消费者超时时间(默认设为30s)autoAck = true自动Ackchannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);手动Ack 消息持久性 queueDeclare的durable参数设为true: RabbitMQ关闭或崩溃后, 标记为persistent消息不会从该队列中丢失(通过MessageProperties设置)并不保证完全不会丢失(写入磁盘缓存区的数据可能未刷新 fsync(2) ) channel.basicQos(prefetchCount); – 限制同一时间一个消费者可获取的消息的数量 一个消息传递给多个消费者 – 发布/订阅模式 创建临时队列 channel.queueDeclare() – 无参数 控制exchange将消息派送到 匹配该消息的routingKey的 消息队列将消息队列绑定至exchange时,设置其binding key(routingKey),以匹配消息
queueBind(String queue, String exchange, String routingKey) 一个消息队列 可以 以不同的binding key多次绑定到同一个exchange 多个消息队列 可以 以相同的binding key同时绑定到同一个exchange 发送消息时,设置routingKey以匹配队列
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
exchange
绑定一个或多个channel 将消息分派到指定的channel 类型: direct:将消息的routingKey 与 队列绑定到exchange的bindingKey 精确匹配topic:
消息和队列绑定所用 routing key 应为以’.'分割的一组单词。exchange转发时进行匹配‘*’:表示任意一个词‘#’:表示任意多个词 headers:fanout: 将消息送至所有与其绑定的队列。 将忽略routingKey 声明exchange channel.exchangeDeclare(); channel.basicPublish("", "hello", null, message.getBytes())使用匿名exchange分派消息到hello队列; 绑定exchange 和 channel channel.queueBind(queueName, exchangeName, ""); 若无队列绑定到exchange上,则送至该exchange的消息将被丢弃 请求端: 设置请求消息的correlationId属性,以标识一个request消息设置消息的replyTo属性,用于通知接收方用该队列返回response消息发送request消息等待reponse消息
服务端
等待接收request消息处理request消息使用request消息的replyTo属性指定的消息队列返回response消息(设置reponse消息的correlationId属性为request消息的correlationId以标识其回应的请求)
消息的常用属性(AMQP 0-9-1协议)
deliveryMode: 是否持久contentType: 消息内容的类型 (application/json …)replyTo: 用于callback的队列correlationId:用于标识一个消息(用于response和request匹配) public class Producer {
private static final String QUEUE_NAME = "Test_Queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("8.141.151.176");
connectionFactory.setUsername("admin");
connectionFactory.setPassword(" ");
Connection connection = connectionFactory.newConnection();
System.out.println("Connection...");
//创建信道
Channel channel = connection.createChannel();
//声明队列
//声明队列是幂等的
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息
for(int i = 0; i<10000000; i++){
String message = "Hello!!!!!!! " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Send: " + message);
}
channel.close();
connection.close();
}
}
public class Consumer {
private static final String QUEUE_NAME = "Test_Queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
//......
//创建信道
//......
//声明队列
//声明队列是幂等的
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//回调函数
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
//注册回调函数,开始监听
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Channel channel = RabbitMQUtils.getChannel();
//声明一个匿名的消息队列,用于服务端传回响应
String callbackQueueName = channel.queueDeclare().getQueue();
//correlationId 携带在request上,再由response带回,以确定response对应的request
String myCorrelationId = "misakamikoto";
//配置
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties props = builder.replyTo(callbackQueueName).correlationId(myCorrelationId).build();
//发送携带着 correlationId 和 replyTo 属性的消息
channel.basicPublish("", "RPCQueue", props, "hello.".getBytes(StandardCharsets.UTF_8));
System.out.println("request...");
//等待response
System.out.println("wait response...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
System.out.println("response: " + new String(delivery.getBody()));
System.out.println("message correlationId: " + delivery.getProperties().getCorrelationId());
};
channel.basicConsume(callbackQueueName, deliverCallback, consumerTag -> {});
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare("RPCQueue", false, false, true, null);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String request = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("receive: " + request);
//处理请求
work();
String callbackQueueName = delivery.getProperties().getReplyTo(); //获取用于返回的队列
String myCorrelationId = delivery.getProperties().getCorrelationId(); //获取该消息的correlationId,并将其返回,以标识对那个request进行response
//发送response
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties props = builder.correlationId(myCorrelationId).build();
channel.basicPublish("", callbackQueueName, props, "work out.".getBytes(StandardCharsets.UTF_8));
});
channel.basicConsume("RPCQueue", deliverCallback, consumerTag -> {});



