导入依赖
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-web org.projectlombok lombok com.fasterxml.jackson.core jackson-databind 2.9.9 org.springframework.boot spring-boot-starter-amqp
消息消费者
配置文件
spring:
rabbitmq:
host: 192.168.200.130 # ip
port: 5672
username: admin
password: admin
virtual-host: /itcast
listener:
type: simple
simple:
prefetch: 1 #消费者每次从队列获取的消息数量
concurrency: 2 #消费者数量
max-concurrency: # 启动消费者最大数量
server:
port: 8001
生产消息
@Test
public void testSend(){
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(RabbitMQConfig1.QUEUE_NAME,"这是普通模式");
}
}
接收消息
@Component
public class RabbimtMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println("消费者1"+new String(message.getBody())+System.currentTimeMillis());
模拟处理需要1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果打印
配置文件
spring:
rabbitmq:
host: 192.168.200.130 # ip
port: 5672
username: admin
password: admin
virtual-host: /itcast
listener:
type: simple
simple:
prefetch: 1 #消费者每次从队列获取的消息数量
concurrency: 2 #消费者数量
max-concurrency: # 启动消费者最大数量
publisher-/confirm/is: true #确认消息已发送到交换机(Exchange) 可以把publisher-/confirm/is: true 替换为 publisher-/confirm/i-type: correlated
publisher-returns: true #确认消息已发送到队列(Queue)
server:
port: 8001
配置文件
@Configuration
public class RabbitMQConfig1 {
public static final String QUEUE_NAME = "boot_queue";
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
rabbitTemplate.setMandatory(true);
//消息发送到交换机,是否成功收到消息,true成功,false失败
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.out.println();
System.out.println("相关数据:" + correlationData);
if (ack) {
System.out.println("投递成功,确认情况:" + ack);
} else {
System.out.println("投递失败,确认情况:" + ack);
System.out.println("原因:" + cause);
}
}
});
//消息发送到交换机,在有交换机发送到队列失败,才会执行
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println();
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
System.out.println();
}
});
return rabbitTemplate;
}
}
消费端手动确认
@Component
public class RabbimtMQListener3 {
@RabbitListener(queues = "boot_queue")
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(message.toString());
// int i=1/0;
channel.basicAck(deliveryTag,true); //手动确认
System.out.println("手动确认");
} catch (IOException e) {
//拒绝签收
//第三个参数,重回队列重新发送
channel.basicNack(deliveryTag,true,true);
System.out.println("拒绝签收");
}
}
}



