# 生成一个队列 1、队列名称 2、队列里面的消息是否进行持久化 3、是否共享消息 4、是否自动删除 5、其他高级参 channel.queueDeclare(QUEUE_NAME, true, false, false, null);消息持久化
# 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
# 参数3:设置消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes("UTF-8"));
不公平分发(能力越大责任越大,多劳多得)
# 消费者代码处增加 channel.basicQos(1);预期值
# 消费者代码处增加 channel.basicQos(3);消息不丢失条件
1、队列持久化;
2、消息持久化;
3、发布确认;
3.1、单个确认发布;(1000条,耗时722ms)慢
3.2、批量发布确认;(1000条,耗时147ms)快,出问题后不知道是哪个,
3.3、异步发布确认;(1000条,耗时62ms发完)性价比最高,复杂
channel.confirmSelect();// 开启发布确认单个确认发布
# 发一条,及时确认一条
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
批量确认发布
# 发多条后,再确认。比如10个一组或100个一组,然后确认一次
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
......
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
异步确认发布
发送消息前,增加监听
// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("成功的消息标记:" + deliveryTag);
};
// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("失败的消息标记:" + deliveryTag);
};
channel.addConfirmListener(ackCallback, ackCallback);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
失败消息处理
ConcurrentSkipListMap广播/订阅(fanout-扇出交换机)outStandingConfirms = new ConcurrentSkipListMap<>(); // 成功 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if(multiple) { ConcurrentNavigableMap confirmed = outStanding/confirm/is.headMap(deliveryTag); /confirm/ied.clear(); }else { outStanding/confirm/is.remove(deliveryTag); } System.out.println("成功的消息标记:" + deliveryTag); }; // 失败 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("失败的消息标记:" + deliveryTag); }; channel.addConfirmListener(ackCallback, ackCallback); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); outStanding/confirm/is.put(channel.getNextPublishSeqNo(), msg); channel.waitForConfirms();// 确认 System.out.println("消息发送完毕");
订阅者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLog1 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 生成随机队列名称,消费完成后自动删除
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换机上,routingkey为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("ReceiveLog1接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("ReceiveLog1准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
public class EmitLog {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ReceiveLog1.EXCHANGE_NAME, "fanout");
String msg = "世界你好!";
channel.basicPublish(ReceiveLog1.EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("消息发送完成");
}
}
路由模式(direct-直接交换机)
消费者1
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogDirect1 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("console接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("console准备就绪:");
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogDirect2 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("disk接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("disk准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
public class EmitLog {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ReceiveLogDirect1.EXCHANGE_NAME, "direct");
String msg = "世界你好aaaaaaaaaaaaaaaaaaaa-warning!";
channel.basicPublish(ReceiveLogDirect1.EXCHANGE_NAME, "warning", null, msg.getBytes());
System.out.println("消息发送完成");
}
}
(Topics-主题交换机)(可包括:fanout、direct)
routing_key格式要求:
1、必须是一个单词列表,以点号分割开;
2、比如:hao.ok.very;
3、* 可以代替一个单词;
4、# 可以代替零个或多个单词;
5、若只有 # ,则表示所有记录,相当于:fanout;
6、若没有 * 和 # ,则表示绑定具体队列,相当于:direct;
消费者1
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C1 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C1接收到的消息:" + msg+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("C1准备就绪:");
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C2 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C2接收到的消息:" + msg + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("C2准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
public class P {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String msg = "topic模式测试走起";
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg.getBytes("UTF-8"));
System.out.println("消息发送成功");
}
}
死信队列
死信队列的3大来源:
1、超时;
2、队列满;
3、消息被拒绝;
消费者
import java.util.HashMap;
import java.util.Map;
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C1 {
public static final String EXCHANGE_NORMAL_NAME = "normal_exchange";
public static final String EXCHANGE_DEAD_NAME = "dead_exchange";
public static final String QUEUE_NORMAL_NAME = "normal_queue";
public static final String QUEUE_DEAD_NAME = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
Map arguments = new HashMap();
// 过期时间 10s=10000ms(一般过期时间在生产方控制,此处不写)
// arguments.put("x-message-ttl",10000);
// 过期后进入的死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME);
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(QUEUE_NORMAL_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NORMAL_NAME, EXCHANGE_NORMAL_NAME, "zhangsan");
// 死信队列
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C1接收到消息:" + msg);
};
channel.basicConsume(QUEUE_NORMAL_NAME, deliverCallback, consumerTag -> {
});
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C2 {
public static final String EXCHANGE_DEAD_NAME = "dead_exchange";
public static final String QUEUE_DEAD_NAME = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 死信队列
//channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
//channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C2接收到消息:" + msg);
};
channel.basicConsume(QUEUE_DEAD_NAME, deliverCallback, consumerTag -> {
});
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
public class P {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(C1.EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
String msg = "死信消息要来了";
// 过期时间,单位毫秒,设置10s
BasicProperties properties = new BasicProperties().builder().expiration("10000").build();
channel.basicPublish(C1.EXCHANGE_NORMAL_NAME, "zhangsan", properties, msg.getBytes("UTF-8"));
System.out.println("死信消息发送成功!");
}
}
MAX死信
# C1增加代码
// 设置队列长度限制(队列里面超过6条时,则进入到死信队列)
arguments.put("x-max-length", 6);
# P 删除超时设置即可
消息被拒绝-死信
消费者增加以下代码进行拒绝
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if("拒绝条件".equals(msg)) {
System.out.println("C2拒绝接收消息接收到消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("C2接收消息接收到消息:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 需要开启手动应答
channel.basicConsume(QUEUE_NORMAL_NAME,false, deliverCallback, consumerTag -> {
});
延迟队列(死信队列的一种)
使用场景:
1、订单在十分钟内未支付自动取消;
2、新创建的店铺,如果10天内都没上传过商品,则自动发送消息提醒;
3、新注册用户,如果3天内都没进行登录,则短信提醒;
4、用户发起退款,如果3天都没处理,则通知相关运营人员介入;
5、预定会议,在预定会议开始前10分钟发消息通知参会人员;
总结:虽然和定时任务非常相似。但特点是:数据量大,时效性强;
注意:QC队列,消息在排队的时候并非按时“死亡”
原因:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一条消息的延时时长很长,而第二条很短,第二条并不会优先得到执行。
延时插件下载地址:rabbitmq_delayed_message_exchange
将插件放入插件目录:RabbitMQ Serverrabbitmq_server-3.9.13plugins
执行一下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启服务
rabbitmq-service stop rabbitmq-service start
队列延时与插件延时对比图
代码架构
延时队列其他选择:比如java的DelayQueue,redis的zset,Quartz等。
Mandatory
备份交换机
说明:备份交换机和回退消息同时使用时,备份交换机优先级高
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网路终端,故MQ未收到确认消息,该条消息会被重发给消费者,或者在网路重连后再次发送给该消费者,造成重复消费。
解决办法:
1、使用时间戳或UUID。
2、利用redis执行setnx命令,填入具有幂等性。
队列增加参数
Mapparams=new HashMap(); params.put("x-max-priority",10);// 建议10以内数字 channel.queueDeclare("hello",true,false,false,params);
消息发送参数
说明:消费者优先级的值,一定要小于上面设置的最大值10
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
说明:只有MQ内有消息堆积时,排序才有效果,否则,及时消费,排不排序都一样。
惰性队列
在声明队列时增加
Mapargs=new HashMap (); args.put("x-queue-mode","lazy"); channel.queueDeclare("myqueue",false,false,false,args);
内存开销对比



