栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ java API

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ java API

RabbitMQ java API 队列持久化
# 生成一个队列 1、队列名称 2、队列里面的消息是否进行持久化 3、是否共享消息 4、是否自动删除 5、其他高级参
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

消息持久化
# 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
# 参数3:设置消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes("UTF-8"));
不公平分发(能力越大责任越大,多劳多得)
# 消费者代码处增加
channel.basicQos(1);

预期值
# 消费者代码处增加
channel.basicQos(3);

消息不丢失条件

1、队列持久化;
2、消息持久化;
3、发布确认;
3.1、单个确认发布;(1000条,耗时722ms)慢
3.2、批量发布确认;(1000条,耗时147ms)快,出问题后不知道是哪个,
3.3、异步发布确认;(1000条,耗时62ms发完)性价比最高,复杂

发布确认 开启发布确认
channel.confirmSelect();// 开启发布确认
单个确认发布
# 发一条,及时确认一条
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
批量确认发布
# 发多条后,再确认。比如10个一组或100个一组,然后确认一次
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
......
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
异步确认发布

发送消息前,增加监听

// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
	System.out.println("成功的消息标记:" + deliveryTag);
};

// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
	System.out.println("失败的消息标记:" + deliveryTag);
};

channel.addConfirmListener(ackCallback, ackCallback);


channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
失败消息处理
ConcurrentSkipListMap outStandingConfirms = new ConcurrentSkipListMap<>();

// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
	if(multiple) {
		ConcurrentNavigableMap confirmed = outStanding/confirm/is.headMap(deliveryTag);
		/confirm/ied.clear();
	}else {
		outStanding/confirm/is.remove(deliveryTag);
	}
	
	System.out.println("成功的消息标记:" + deliveryTag);
};

// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
	System.out.println("失败的消息标记:" + deliveryTag);
};

channel.addConfirmListener(ackCallback, ackCallback);


channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
outStanding/confirm/is.put(channel.getNextPublishSeqNo(), msg);
channel.waitForConfirms();// 确认
System.out.println("消息发送完毕");
广播/订阅(fanout-扇出交换机)

订阅者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLog1 {

	public static final String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		// 生成随机队列名称,消费完成后自动删除
		String queueName = channel.queueDeclare().getQueue();

		// 绑定队列到交换机上,routingkey为空字符串
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("ReceiveLog1接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("ReceiveLog1准备就绪:");
	}
}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class EmitLog {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(ReceiveLog1.EXCHANGE_NAME, "fanout");
		String msg = "世界你好!";
		channel.basicPublish(ReceiveLog1.EXCHANGE_NAME, "", null, msg.getBytes());
		System.out.println("消息发送完成");
	}

}
路由模式(direct-直接交换机)

消费者1

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogDirect1 {

	public static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		String queueName = "console";

		// 声明一个队列
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "info");
		channel.queueBind(queueName, EXCHANGE_NAME, "warning");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("console接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("console准备就绪:");
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogDirect2 {

	public static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		String queueName = "console";

		// 声明一个队列
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "error");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("disk接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("disk准备就绪:");
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class EmitLog {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(ReceiveLogDirect1.EXCHANGE_NAME, "direct");
		String msg = "世界你好aaaaaaaaaaaaaaaaaaaa-warning!";
		channel.basicPublish(ReceiveLogDirect1.EXCHANGE_NAME, "warning", null, msg.getBytes());
		System.out.println("消息发送完成");
	}

}

(Topics-主题交换机)(可包括:fanout、direct)

routing_key格式要求:
1、必须是一个单词列表,以点号分割开;
2、比如:hao.ok.very;
3、* 可以代替一个单词;
4、# 可以代替零个或多个单词;
5、若只有 # ,则表示所有记录,相当于:fanout;
6、若没有 * 和 # ,则表示绑定具体队列,相当于:direct;
消费者1

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C1 {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String queueName = "Q1";
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C1接收到的消息:" + msg+" 绑定键:"+message.getEnvelope().getRoutingKey());
		};
		CancelCallback cancelCallback = consumerTag -> {

		};

		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("C1准备就绪:");
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C2 {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String queueName = "Q2";
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
		channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C2接收到的消息:" + msg + " 绑定键:" + message.getEnvelope().getRoutingKey());
		};
		CancelCallback cancelCallback = consumerTag -> {

		};

		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("C2准备就绪:");
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class P {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String msg = "topic模式测试走起";
		channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg.getBytes("UTF-8"));
		System.out.println("消息发送成功");
	}

}

死信队列

死信队列的3大来源:
1、超时;
2、队列满;
3、消息被拒绝;

TTL死信

消费者

import java.util.HashMap;
import java.util.Map;

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C1 {

	public static final String EXCHANGE_NORMAL_NAME = "normal_exchange";

	public static final String EXCHANGE_DEAD_NAME = "dead_exchange";

	public static final String QUEUE_NORMAL_NAME = "normal_queue";
	public static final String QUEUE_DEAD_NAME = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		channel.exchangeDeclare(EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);

		Map arguments = new HashMap();
		// 过期时间 10s=10000ms(一般过期时间在生产方控制,此处不写)
		// arguments.put("x-message-ttl",10000);
		// 过期后进入的死信交换机
		arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME);
		// 设置死信routingkey
		arguments.put("x-dead-letter-routing-key", "lisi");
		channel.queueDeclare(QUEUE_NORMAL_NAME, false, false, false, arguments);

		channel.queueBind(QUEUE_NORMAL_NAME, EXCHANGE_NORMAL_NAME, "zhangsan");

		// 死信队列
		channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
		channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C1接收到消息:" + msg);

		};
		channel.basicConsume(QUEUE_NORMAL_NAME, deliverCallback, consumerTag -> {
		});
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C2 {

	public static final String EXCHANGE_DEAD_NAME = "dead_exchange";

	public static final String QUEUE_DEAD_NAME = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		// 死信队列
		//channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
		//channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C2接收到消息:" + msg);

		};
		channel.basicConsume(QUEUE_DEAD_NAME, deliverCallback, consumerTag -> {
		});
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class P {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		channel.exchangeDeclare(C1.EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
		String msg = "死信消息要来了";
		// 过期时间,单位毫秒,设置10s
		BasicProperties properties = new BasicProperties().builder().expiration("10000").build();
		channel.basicPublish(C1.EXCHANGE_NORMAL_NAME, "zhangsan", properties, msg.getBytes("UTF-8"));
		System.out.println("死信消息发送成功!");
	}

}

MAX死信
# C1增加代码
// 设置队列长度限制(队列里面超过6条时,则进入到死信队列)
arguments.put("x-max-length", 6);

# P 删除超时设置即可

消息被拒绝-死信

消费者增加以下代码进行拒绝

DeliverCallback deliverCallback = (consumerTag, message) -> {
	String msg = new String(message.getBody(), "UTF-8");
	if("拒绝条件".equals(msg)) {
		System.out.println("C2拒绝接收消息接收到消息:" + msg);
		channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
	}else {
		System.out.println("C2接收消息接收到消息:" + msg);
		channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
	}
};
// 需要开启手动应答
channel.basicConsume(QUEUE_NORMAL_NAME,false, deliverCallback, consumerTag -> {
});
延迟队列(死信队列的一种)

使用场景:
1、订单在十分钟内未支付自动取消;
2、新创建的店铺,如果10天内都没上传过商品,则自动发送消息提醒;
3、新注册用户,如果3天内都没进行登录,则短信提醒;
4、用户发起退款,如果3天都没处理,则通知相关运营人员介入;
5、预定会议,在预定会议开始前10分钟发消息通知参会人员;
总结:虽然和定时任务非常相似。但特点是:数据量大,时效性强;

Springboot 实战


注意:QC队列,消息在排队的时候并非按时“死亡”
原因:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一条消息的延时时长很长,而第二条很短,第二条并不会优先得到执行。

插件实现延时队列

延时插件下载地址:rabbitmq_delayed_message_exchange


将插件放入插件目录:RabbitMQ Serverrabbitmq_server-3.9.13plugins
执行一下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange


重启服务

rabbitmq-service stop
rabbitmq-service start


队列延时与插件延时对比图

代码架构

延时队列其他选择:比如java的DelayQueue,redis的zset,Quartz等。

发布确认

回退消息

Mandatory

备份交换机


说明:备份交换机和回退消息同时使用时,备份交换机优先级高

其他知识点 消息重复消费

消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网路终端,故MQ未收到确认消息,该条消息会被重发给消费者,或者在网路重连后再次发送给该消费者,造成重复消费。
解决办法:
1、使用时间戳或UUID。
2、利用redis执行setnx命令,填入具有幂等性。

优先级队列


队列增加参数

Map params=new HashMap();
params.put("x-max-priority",10);// 建议10以内数字
channel.queueDeclare("hello",true,false,false,params);

消息发送参数

说明:消费者优先级的值,一定要小于上面设置的最大值10
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());

说明:只有MQ内有消息堆积时,排序才有效果,否则,及时消费,排不排序都一样。

惰性队列


在声明队列时增加

Map args=new HashMap();
args.put("x-queue-mode","lazy");
channel.queueDeclare("myqueue",false,false,false,args);

内存开销对比

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/769101.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号