1 github:源码地址 2 环境-父工程(管理依赖版本)1.基本消息模型
2.消息确认机制(ack)
3.竞争消费者模式
4.0.0 org.springframework.boot spring-boot-starter-parent 2.5.0 com.yzm rabbitmq 0.0.1-SNAPSHOT pom rabbitmq Demo project for Spring Boot rabbitmq01 UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-amqp org.projectlombok lombok org.apache.commons commons-lang3 com.alibaba fastjson 1.2.62 org.apache.maven.plugins maven-compiler-plugin ${java.version} ${java.version} ${project.build.sourceEncoding}
rabbitmq01 子工程
4.0.0 com.yzm rabbitmq 0.0.1-SNAPSHOT ../pom.xml rabbitmq01 0.0.1-SNAPSHOT jar rabbitmq01 Demo project for Spring Boot org.springframework.boot spring-boot-maven-plugin
项目结构
application.yml
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
# listener:
# simple:
# acknowledge-mode: manual
# prefetch: 1
3 基本消息模型
开启定时器功能、创建队列hello-world
package com.yzm.rabbitmq01.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class RabbitConfig {
public static final String HELLO_WORLD = "hello-world";
@Bean
public Queue helloQueue() {
return new Queue(HELLO_WORLD, true, false, false);
}
}
生产者定时生产消息
package com.yzm.rabbitmq01.service;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class HelloSenderService {
private final AmqpTemplate template;
private int count = 1;
public HelloSenderService(AmqpTemplate template) {
this.template = template;
}
// 项目启动后,过5秒开始第一个任务执行,之后每过1秒执行一次任务
@Scheduled(fixedDelay = 1000, initialDelay = 5000)
public void helloSend() {
if (count <= 10) {
String message = "Hello.........." + count++;
template.convertAndSend(RabbitConfig.HELLO_WORLD, message);
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
}
}
}
消费者消费信息
package com.yzm.rabbitmq01.service;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {
@RabbitHandler
public void helloReceive(String message) {
System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
}
}
4 消息确认机制(ack)启动项目,等待5秒时间,定时器启动,开始生产消息,控制台打印
同时RabbitMQ服务器上能看到hello-world队列正在允许
其中:
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量
Unacked:表示待确认数量;对于队列来说,它只知道消费者在消费消息,在消费者未回复它之前,是不知道消息被消费完了没,所以就给该消息一个待确认状态;
Total:表示待消费数和待确认数的总和
上面的示例,由于生产者生产完消息就立即被消费者消费了,很难看出这三个值的变化
RabbitMQ默认的消息确认机制是:自动确认的 。
像上面的的示例,消费者只是消费了消息,并没有进行确认之类的操作。
现在将消息确认改为:手动确认。
在application.yml中
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 开启手动确认,默认是auto
# prefetch: 1
生产者、消费者不改,重启项目
消费者还是一样的消费消息(打印了),但服务器上显示的有10条消息未确认,因为消息是需要手动确认的,但我们消费者没确认。
如果我们停止项目,那么10条未确认的消息会回到Ready里面等待重新消费
不确认消息,那么消息会越来越多,再次重启项目
确认消息,修改消费者
package com.yzm.rabbitmq01.service;
import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
//@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {
// @RabbitHandler
// public void helloReceive(String message) {
// System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
// }
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public void helloReceive(Message message, Channel channel) throws IOException {
System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
// 确认消息
// 第一个参数,交付标签,相当于消息ID 64位的长整数
// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
重启项目,前两次积累的消息先被消费完,接着生产的也被消费,服务器上的队列消息全被消费了
拒绝消息,修改消费者
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public void helloReceive(Message message, Channel channel) throws IOException {
System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
// 确认消息
// 第一个参数,交付标签,相当于消息ID 64位的长整数
// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 拒绝消息方式一
// 第一个参数,交付标签
// 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
// 第三个参数,false表示直接丢弃消息,true表示重新排队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 拒绝消息方式二
// 第一个参数,交付标签
// 第二个参数,false表示直接丢弃消息,true表示重新排队
// 跟basicNack的区别就是始终只拒绝提供的交付标签
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
手动确认消息机制
未确认:什么也不用写,消息不会丢失,只会越来越多,重复消费
确认:确认后,消息从队列移除
拒绝:拒绝后,消息先从队列移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)
异常处理,修改消费者
package com.yzm.rabbitmq01.service;
import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
//@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {
// @RabbitHandler
// public void helloReceive(String message) {
// System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
// }
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public void helloReceive(Message message, Channel channel) throws IOException {
System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
// 制造异常
int i = 1 / 0;
System.out.println("成功处理了消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
在手动确认之前,抛出异常
运行到异常的代码,抛出异常,下面的代码不再执行,这样就相当于未确认了
继续修改代码
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public void helloReceive(Message message, Channel channel) throws IOException {
System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
try {
// 制造异常
int i = 1 / 0;
System.out.println("成功处理了消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 这里拒绝后,可以选择将异常消息发送到死信队列
System.out.println("有异常情况,将异常消息发送到死信队列,请尽快处理");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
5 竞争消费者模式这种才是实际开发中的正常处理逻辑
创建队列work-queue
package com.yzm.rabbitmq01.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class RabbitConfig {
public static final String HELLO_WORLD = "hello-world";
@Bean
public Queue helloQueue() {
return new Queue(HELLO_WORLD, true, false, false);
}
//------------------------------------------------------------------------------------------------------------------
public static final String WORK_QUEUE = "work-queue";
@Bean
public Queue workQueue() {
return new Queue(WORK_QUEUE);
}
}
生产者
package com.yzm.rabbitmq01.service;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.scheduling.annotation.Scheduled;
@Component
public class WorkSenderService {
private final AmqpTemplate template;
private int count = 1;
public WorkSenderService(AmqpTemplate template) {
this.template = template;
}
@Scheduled(fixedDelay = 500, initialDelay = 10000)
public void workSend() {
if (count <= 30) {
String message = "Hello.........." + count++;
template.convertAndSend(RabbitConfig.WORK_QUEUE, message);
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
}
}
}
消费者
package com.yzm.rabbitmq01.service;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkReceiverService {
private int count = 1;
private int count2 = 1;
@RabbitListener(queues = RabbitConfig.WORK_QUEUE)
public void workReceive(Message message, Channel channel) {
try {
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
Thread.sleep(1000);
System.out.println(" [ 消费者@1号 ] Dealt with:" + count++);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
@RabbitListener(queues = RabbitConfig.WORK_QUEUE)
public void workReceive2(Message message, Channel channel) {
try {
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
Thread.sleep(2000);
System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
为了不影响本次测试,关闭上次的生产者定时任务
重启项目,运行结果如下
我们有2个消费者1、2号,1号每秒消费一个消息,2号每两秒消费一个消息,也就是说1号处理能力是2号的2倍;但队列分配消息默认是平均分配的,这样就会导致有的消费者处理快了就有空闲时间,而我们想要尽快的处理掉消息,需要处理快的多处理一些。解决方法有两种:
第一种:设置prefetch参数
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual
prefetch: 1
重启项目,可以看到1号处理了20个,2号处理了10个
第二种:代码配置
修改配置类RabbitConfig
package com.yzm.rabbitmq01.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class RabbitConfig {
public static final String HELLO_WORLD = "hello-world";
@Bean
public Queue helloQueue() {
return new Queue(HELLO_WORLD, true, false, false);
}
//------------------------------------------------------------------------------------------------------------------
public static final String WORK_QUEUE = "work-queue";
public static final String PREFETCH_ONE = "prefetchOne";
@Bean
public Queue workQueue() {
return new Queue(WORK_QUEUE);
}
@Bean(name = PREFETCH_ONE)
public RabbitListenerContainerFactory prefetchOne(ConnectionFactory rabbitConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
factory.setPrefetchCount(1);
return factory;
}
}
修改消费者
@RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
public void workReceive(Message message) {
try {
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
Thread.sleep(1000);
System.out.println(" [ 消费者@1号 ] Dealt with:" + count++);
} catch (Exception e) {
e.printStackTrace();
}
}
@RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
public void workReceive2(Message message) {
try {
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
Thread.sleep(2000);
System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++);
} catch (Exception e) {
e.printStackTrace();
}
}
注释掉第一种方法的 prefetch 参数,并开启自动确认机制(手动确认会报channel close错误)
重启项目,运行结果跟第一种是一样的



