package com.wang;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import com.wang.conf.Config;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@Component
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = Config.getChannel();
//处理并发的有序map集合,ConcurrentHashMap是无序的
ConcurrentSkipListMap map = new ConcurrentSkipListMap<>();
//开启发送确认模式
channel.confirmSelect();
ConfirmCallback confirmCallback =(deliveryTag, multiple )->{
System.out.println(deliveryTag+"success received");
//返回的消息有时候是批量确认的,批量确认需要判断!应该可以关闭批量确认的,但是不知道如何关闭
if (multiple){
ConcurrentNavigableMap confirmed = map.headMap(deliveryTag,true);
/confirm/ied.clear();
}else {
map.remove(deliveryTag);
}
if (map.size()==0){
System.out.println("success send all message");
}
};
ConfirmCallback /confirm/iCallback1 = (deliveryTag, multiple)->{
System.out.println(deliveryTag+"failed received");
};
channel.addConfirmListener(/confirm/iCallback,/confirm/iCallback1);
long begin = System.currentTimeMillis();
try {
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 0; i < 100 ; i++) {
String mes = "hello world"+i;
channel.basicPublish("",QUEUE_NAME,null,mes.getBytes());
//channel.getNextPublishSeqNo()获得的是下一个发布的序列号,当前序列号需要减一!
map.put(channel.getNextPublishSeqNo()-1,mes);
}
long end = System.currentTimeMillis();
System.out.println("time:"+(begin-end));
} catch (IOException e) {
e.printStackTrace();
}
}
}
采用生产者创建监听器方式监听传来的异步确认消息



