创建RabbitMQ的消费方
第一步:首先创建消费者spring boot项目,然后引入maven
org.springframework.boot spring-boot-starter-amqp
第二步:同理创建mq的配置
具体详细配置可上官网查看
spring.application.name=consumer #启动端口 server.port=8082 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #虚拟空间地址 spring.rabbitmq.virtual-host=/ #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) spring.rabbitmq.listener.simple.retry.enabled=true #最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 #重试间隔时间(单位毫秒) spring.rabbitmq.listener.simple.retry.initial-interval=2000 #是否开启ack spring.rabbitmq.listener.simple.acknowledge-mode=manual #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列) spring.rabbitmq.listener.simple.default-requeue-rejected=false
第三步:编写消息的消费者,这一步也是最复杂的,因为可以编写出很多不同的需求出来,写法也有很多的不同。比如一个生产者,一个消费者
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "fanoutExchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"127.0.0.1"}))
public void receive(Message message, @Headers Map headers, Channel channel) throws Exception{
// 获取消息Id id唯一由提供者创建
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
JSonObject jsonObject = JSONObject.parseObject(msg);
// 获取email参数
String email = jsonObject.getString("email");
System.out.println(email);
// 开启ack后 手动ack
// Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
//channel.basicAck(deliveryTag, false);
System.out.println("执行结束....");
}
当提供者发出消息后,消费者会监听到的同时获取消息,执行方法。
消息模式有多种:
work模型
发布订阅模型
topic模型
/confirm/i机制
return机制
TTL队列、死信队列



