生产者
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.106.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test", true, false, false, null);//Server端的Queue持久化
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentEncoding("UTF-8"); //编码集
builder.deliveryMode(2); //送消息设置发送模式 持久化模式
AMQP.BasicProperties build = builder.build();
for(int i=0; i < 10; i++){
JSonObject jo = new JSONObject();
jo.put("id",i+"");
jo.put("name","刘"+i);
jo.put("old",i+20+"");
//推消息
channel.basicPublish("", "test001", build, jo.toString().getBytes());
}
channel.close();
connection.close();
消费端,模拟报错,不报错情况下手动ACK,报错不ACK
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.106.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test";
// 参数:队列名称、持久化与否、独占与否、无消息队列是否自动删除、消息参数
channel.queueDeclare(queueName, true, false, false, null);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 设置Channel
channel.basicConsume(queueName, false, queueingConsumer);
while(true){
//7 获取消息(Delivery:传送)
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
JSonObject jsonObject = new JSONObject(msg);
String id = jsonObject.getString("id");
String name = jsonObject.getString("name");
String old = jsonObject.getString("old");
Boolean isTrue = true;
try {
//模拟报错
if(new Date().getTime() % 2 ==0){
int i = 1/0;
}
}catch (Exception e){
isTrue = false;
System.out.println("报错");
}
if(isTrue) {//不报错 输出控制台拿到的消息,手动ACK
System.err.println("消费端: id=" + id + " name=" + name + " old=" + old);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
消费了 3个,7个报错
查看web的RabbitMQ控制台,还有7个消息属于未ACK消息
切断消费者连接,7个未ACK消息回到Ready



