准备工作:
1、已经搭建好的RabbitMq服务器
2、idea创建一个空的maven项目并引入依赖
简单模式com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6
一、生产者
1、获取连接服务器的信道 channel
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("x.x.x.x");
factory.setUsername("xxx");
factory.setPassword("xxxx");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
2、声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
3、发送消息
String message="Hello World";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
二、消费者
1、获取连接服务器的信道 channel
2、获取消息(参数需要队列名)
channel.basicConsume(参数1,参数2,deliverCallback,cancelCallback);
成功消费消息回调
DeliverCallback deliverCallback=(consumerTag,delivery)->{
//处理数据逻辑,如下
String message=new String(delivery.getBody());
System.out.println("接收到消息"+message);
};
消费消息失败回调
CancelCallback cancelCallback=(consumerTag)-> {
System.out.println("消费消息中断");
}
工作队列
一个消息生产者,多个消息消费者,默认是轮训分发消息,比如有1,2,3,4,5,6这六条消息,有两个消费者,则消费者1 会得到1,3,5消息,消费者2会得到2,4,6消息;
消费者——消息应答机制1、自动应答
//自动应答将参数2设为true,手动应答则设为false channel.basicConsume(参数1,参数2,deliverCallback,cancelCallback);
优点:每消费一条消息就应答一次,保证了数据传输的安全性
缺点:一对一应答,如果消息堆积,会影响数据传输效率,限制了数据的高吞吐量
所以这种模式仅适用在消费者可以高效以某种速率能够处理这些消息的情况下使用
2、手动应答
三种应答方法
(1)Channel.basicAck(long deliveryTag, boolean multiple)
用于肯定确认,调用该方法就表示mq服务器知道该消息已经被消费者成功处理了,可以将该消息删除了
一般使用在回调函数deldeliverCallback中
(2)Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
用于否定确认,调用该方法就表示mq服务器知道该消息并没有被消费者处理
(3)Channel.basicReject(long deliveryTag, boolean requeue)
用于否定确认,同上
参数解释:
deliveryTag-> 消息标记;
multiple->是否批量确认截止该消息及之前的所有消息;
requeue->是否让该消息重新入队排列,待会再接收,或者让其他消费者消费;
3、应答可以让消息自动重新入队,如果服务器未接收到ack确认,服务器会对该消息进行重新排队,如果其他消费者可以处理,它将很快将其重新分发给另一个消费者,这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
RabbitMQ持久化(生产者设置)
1、队列持久化
//将参数durable设为true channel.queueDeclare(queue,durable,exclusive,autoDelete,arguments);
2、消息持久化
//将参数props设为MessageProperties.PERSISTENT_TEXT_PLAIN basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)不公平分发
将参数设为1
channel.basicQos(1);
意思就是如果这个任务我还没有处理完或者我还没有答应你,你就先别分配给我,我目前只能处理一个任务,然后rabbitMq就会把任务分配给没有那么忙的那个空闲消费者,但是如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他任务的策略。
预取值channel.basicQos(int prefetch);
prefetch可以定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,rabbitmq将不会再发消息给这条信道末端的消费者,除非至少有一个未处理的消息被确认。消息应答和Qos预取值对用户吞吐量有重大影响,通常,增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
1、开启确认发布
Channel channel=connection.createChannel(); channel./confirm/iSelect();
2、确认发布形式
(1)单个确认发布
属于同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,channel.waitForConfirms(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常
缺点:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
优点:确保了数据传输的安全性
(2)批量确认发布
也属于同步确认发布的方式,也会有阻塞消息发布风险
缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息
优点:与单个确认发布相比,先发布一批消息后一起确认可以极大的提高吞吐量
(3)异步确认发布
channel.add/confirm/iListener((deliveryTag,multiple)->{
//确认收到消息的处理逻辑,deliveryTag->消息标签
},(deliveryTag,multiple)->{
//未确认收到消息的处理逻辑
});
逻辑上比同步确认方式复杂,但是性价比最高,无论是可靠性还是效率都可以,他是利用回调函数来达到消息可靠性传递的。
如何处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentlinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
3、确认速度对比
单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条
消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
1、定义
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
2、类型
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
未完待续…
如有不妥之处,欢迎补充及修改
如有疑问,欢迎下方提问
——主要内容来自于尚硅谷RabbitMq视频课件



