一:RabbitMq初始连接
1.创建maven工程在pom中导入
com.rabbitmq amqp-client 3.6.5
1.创建生成者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
//队列名称
private final static String QUEUE_NAME = "rabbitmq-test";
public static void main(String[] argv) throws Exception {
//1.创建一个ConnectionFactory连接工厂connectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.通过connectionFactory设置RabbitMQ所在IP等信息
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672); //指定端口
connectionFactory.setUsername("guest");//用户名
connectionFactory.setPassword("guest");//密码
//3.通过connectionFactory创建一个连接connection
Connection connection = connectionFactory.newConnection();
//4.通过connection创建一个频道channel
Channel channel = connection.createChannel();
//5.通过channel指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world!";
//6.通过channel向队列中添加消息,第一个参数是转发器,使用空的转发器(默认的转发器,类型是direct)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("向" + QUEUE_NAME + "中添加了一条消息:" + message);
//7.关闭频道
channel.close();
//8.关闭连接
connection.close();
}
}
2.创建消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Customer {
//队列名称
private final static String QUEUE_NAME = "rabbitmq-test";
public static void main(String[] argv) throws Exception {
//1.创建一个ConnectionFactory连接工厂connectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.通过connectionFactory设置RabbitMQ所在IP等信息
connectionFactory.setHost("127.0.0.1");
//3.通过connectionFactory创建一个连接connection
Connection connection = connectionFactory.newConnection();
//4.通过connection创建一个频道channel
Channel channel = connection.createChannel();
//5.通过channel指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//与发送消息不同的地方
//6.创建一个消费者队列consumer,并指定channel
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.为channel指定消费者
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
//从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到了" + QUEUE_NAME + "中的消息:" + message);
}
}
}
二.自动应答与手动应答
此配置在消费端进行配置
Channel.basicAck(用于肯定确认)
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)
手动应答的优势:可以批量应答并且减少网络拥堵
public class Customer1 {
//队列名称
private final static String QUEUE_NAME = "rabbitmq-ack";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionMq.getChannel();
System.out.println("Customer1 等待接收消息处理时间较短");
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// //不公平分发
// int perfetchCount = 1;
//预期值
int perfetchCount = 2;
channel.basicQos(perfetchCount);
//采用手动应答
boolean autoAck = false;
String s = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
}
}
并且在channel.basicConsume()配置手动应答
三.不公平分发与预期值
总结要做到持久化需要做的以下几点
1.队列持久化(服务端)
2.消息持久化(服务端)
3.发布确认(消费端)
包含:单个确认,批量确认,异步批量确认
//批量个数
public static final int count=10000;
//单个确认
public static void publishMessageIndividually() throws Exception {
try (Channel channel = ConnectionMq.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitFor/confirm/is();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个单独确认消息,耗时" + (end - begin) +
"ms");
}
}
//批量确认
public static void publishMessageBatch() throws Exception {
try (Channel channel = ConnectionMq.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitFor/confirm/is();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitFor/confirm/is();
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
}
//异步批量确认
public static void publishMessageAsync() throws Exception {
try (Channel channel = ConnectionMq.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
ConcurrentSkipListMap outstandingConfirms = new
ConcurrentSkipListMap<>();
/confirm/iCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap confirmed =
outstanding/confirm/is.headMap(sequenceNumber, true);
//清除该部分未确认消息
/confirm/ied.clear();
}else{
//只清除当前序列号的消息
outstanding/confirm/is.remove(sequenceNumber);
}
};
/confirm/iCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstanding/confirm/is.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
channel.add/confirm/iListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
String message = "消息" + i;
outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
链接: 不定时进行更新



