yml配置:
server:
port: 8088
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #消息确认方式:通过 correlated 来确认(这样发送的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值关联起来)
publisher-returns: true #开启发送失败退回
#1.开启 confirm 确认机制
maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.配置类:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the RPC demo: declares the request queue, the reply
 * queue, the topic exchange they are bound to, and a {@link RabbitTemplate}
 * that listens for replies on the reply queue.
 */
@Configuration
public class RabbitConfig {

    /** Request queue name; the same value is used as its routing key. */
    public static final String RPC_QUEUE1 = "queue_1";

    /** Reply queue name; the same value is used as its routing key. */
    public static final String RPC_QUEUE2 = "queue_2";

    /** Name of the topic exchange both queues are bound to. */
    public static final String RPC_EXCHANGE = "rpc_exchange";

    /** Declares the queue named {@link #RPC_QUEUE1}. */
    @Bean
    Queue msgQueue() {
        return new Queue(RPC_QUEUE1);
    }

    /** Declares the queue named {@link #RPC_QUEUE2}. */
    @Bean
    Queue replyQueue() {
        return new Queue(RPC_QUEUE2);
    }

    /** Declares the topic exchange named {@link #RPC_EXCHANGE}. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(RPC_EXCHANGE);
    }

    /** Binds the request queue to the exchange under routing key {@link #RPC_QUEUE1}. */
    @Bean
    Binding msgBinding() {
        Queue requestQueue = msgQueue();
        TopicExchange rpcExchange = exchange();
        return BindingBuilder.bind(requestQueue).to(rpcExchange).with(RPC_QUEUE1);
    }

    /** Binds the reply queue to the exchange under routing key {@link #RPC_QUEUE2}. */
    @Bean
    Binding replyBinding() {
        Queue answerQueue = replyQueue();
        TopicExchange rpcExchange = exchange();
        return BindingBuilder.bind(answerQueue).to(rpcExchange).with(RPC_QUEUE2);
    }

    /**
     * Creates the template used for request/reply messaging.
     *
     * @param connectionFactory connection factory handed to the template
     * @return a template whose reply address is {@link #RPC_QUEUE2} and whose
     *         reply timeout is set to 6000
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rpcTemplate = new RabbitTemplate(connectionFactory);
        rpcTemplate.setReplyAddress(RPC_QUEUE2);
        rpcTemplate.setReplyTimeout(6000);
        return rpcTemplate;
    }

    /**
     * Creates the listener container that feeds messages arriving on
     * {@link #RPC_QUEUE2} to the template, which acts as the message listener.
     *
     * @param connectionFactory connection factory used by the container
     * @return the configured container
     */
    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer replies = new SimpleMessageListenerContainer();
        replies.setConnectionFactory(connectionFactory);
        replies.setQueueNames(RPC_QUEUE2);
        replies.setMessageListener(rabbitTemplate(connectionFactory));
        return replies;
    }
}
2.生产者
import com.yl.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
/**
 * REST entry point of the RPC demo. Publishes the caller's text to the request
 * queue via {@link RabbitTemplate#sendAndReceive} and returns the reply body
 * when the reply carries the same correlation id as the request.
 */
@Slf4j
@RestController
public class RpcClientController {

    /** Reply header that is compared against the request's correlation id. */
    private static final String RETURNED_CORRELATION_HEADER = "spring_returned_message_correlation";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code message} as an RPC request and waits for the reply.
     *
     * @param message text to send; bound from the {@code message} request parameter
     * @return the reply body decoded as UTF-8, or an empty string when no
     *         message was supplied, no reply arrived, or the reply's
     *         correlation header does not match the request
     */
    @GetMapping("/send")
    public String send(String message) {
        if (message == null) {
            // Without this guard a request lacking ?message= fails with a NullPointerException.
            log.warn("No 'message' request parameter supplied; nothing was sent");
            return "";
        }

        // Encode explicitly as UTF-8: the consumer decodes with UTF-8, and the
        // platform default charset is not guaranteed to agree with it.
        Message newMessage = MessageBuilder
                .withBody(message.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .build();
        log.info("生产者发送消息----->>>>>{}", newMessage);

        Message result = rabbitTemplate.sendAndReceive(
                RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);
        if (result == null) {
            return "";
        }

        // Correlation id found on the request message after it has been sent.
        String correlationId = newMessage.getMessageProperties().getCorrelationId();
        log.info("生产者----->>>>>{}", correlationId);

        // Read the header through the Map interface; it may be absent or not a String.
        Object msgId = result.getMessageProperties().getHeaders().get(RETURNED_CORRELATION_HEADER);
        if (correlationId == null || !correlationId.equals(msgId)) {
            return "";
        }

        String response = new String(result.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("生产者发送消息----->>>>>:{}", response);
        return response;
    }
}
3.消费者
import com.yl.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * Server side of the RPC demo: consumes requests from
 * {@link RabbitConfig#RPC_QUEUE1} and publishes an answer routed to
 * {@link RabbitConfig#RPC_QUEUE2}.
 */
@Slf4j
@Component
public class RpcServerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles one request message and publishes the reply.
     *
     * @param msg the request taken from the request queue
     * @throws UnsupportedEncodingException declared because the body is
     *         converted using the charset name {@code "UTF-8"}
     */
    @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
    public void process(Message msg) throws UnsupportedEncodingException {
        // Decode once and reuse the text for both the log line and the reply.
        String message = new String(msg.getBody(), "UTF-8");
        log.info("消费者消费消息的消息体:{}----->>>>>", message);

        // Encode the reply with the same charset that was used for decoding.
        Message response = MessageBuilder
                .withBody(("i'm receive:" + message).getBytes("UTF-8"))
                .build();

        // Carry the request's correlation id along with the reply.
        // Presumably this is what surfaces as the
        // "spring_returned_message_correlation" header read by the client - TODO confirm.
        CorrelationData correlationData =
                new CorrelationData(msg.getMessageProperties().getCorrelationId());

        // NOTE(review): the reply is published with sendAndReceive, whose return
        // value is ignored here. A plain send(...) may be what is intended - confirm
        // against the Spring AMQP request/reply documentation before changing.
        rabbitTemplate.sendAndReceive(
                RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
    }
}
Postman 调用接口:http://localhost:8088/send?message=听闻广陵不知寒,大雪龙骑下江南
结果:
二.路由模式(路由--->交换机) yml配置:
server:
port: 8084
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true #1.开启 confirm 确认机制
publisher-returns: true #2.开启 return 确认机制
#3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
template:
mandatory: true
maven配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.RabbitMQ配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the routing demo: one queue, one direct exchange,
 * and the binding that joins them under a fixed routing key.
 */
@Configuration
public class RabbitQueueConfig {

    /** Name of the queue. */
    public static final String QUEUE_NAME = "rollback-queue";

    /** Name of the direct exchange. */
    public static final String EXCHANGE_NAME = "rollback-exchange";

    /** Routing key used to bind the queue to the exchange. */
    public static final String ROUTINGKEY_NAME = "rollback-routingkey";

    /** Declares the queue named {@link #QUEUE_NAME}, passing {@code true} as the second constructor argument. */
    @Bean
    public Queue queue() {
        final boolean secondArgument = true;
        return new Queue(QUEUE_NAME, secondArgument);
    }

    /** Declares the direct exchange named {@link #EXCHANGE_NAME}. */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /** Binds the queue to the direct exchange under {@link #ROUTINGKEY_NAME}. */
    @Bean
    public Binding binding() {
        Queue rollbackQueue = queue();
        DirectExchange rollbackExchange = directExchange();
        return BindingBuilder.bind(rollbackQueue).to(rollbackExchange).with(ROUTINGKEY_NAME);
    }
}
2.重写RabbitTemplate,创建RabbitTemplateConfig配置类(手动确认消费)
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Builds the {@link RabbitTemplate} used by the routing demo: JSON message
 * conversion, the mandatory flag switched on, and confirm / return callbacks
 * that log what they are given.
 */
@Configuration
@Slf4j
public class RabbitTemplateConfig {

    /**
     * Confirm callback registered on the template (flow: producer --> exchange).
     * Logs the correlation data, the ack flag and the cause it receives.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("ConfirmCallback,相关数据:{}", correlationData);
            log.info("ConfirmCallback,确认消息:{}", ack);
            log.info("ConfirmCallback,原因:{}", cause);
        }
    };

    /**
     * Creates the customised template.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Switch the mandatory flag on in code; the return callback below depends on it.
        rabbitTemplate.setMandatory(true);
        // Payloads travel as JSON.
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        // Overall flow: producer --> exchange --> routing key --> queue.
        // ConfirmCallback covers producer --> exchange and is invoked on both
        // success and failure. The shared field is registered here instead of a
        // second, identical anonymous callback.
        rabbitTemplate.setConfirmCallback(confirmCallback);

        // ReturnCallback covers exchange --> queue: it is not invoked on success,
        // only when the message cannot be routed, and it relies on mandatory=true.
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                log.info("ReturnCallback,消息:{}", message);
                log.info("ReturnCallback,回应码:{}", replyCode);
                log.info("ReturnCallback,回应信息:{}", replyText);
                log.info("ReturnCallback,交换机:{}", exchange);
                log.info("ReturnCallback,路由键:{}", routingKey);
            }
        });
        return rabbitTemplate;
    }

    /** Provides the JSON converter installed on the template. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
3.消息监听指定具体的队列
import com.yl.controller.RabbitQueueReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Registers a listener container that delivers messages from the queue declared
 * in {@link RabbitQueueConfig} to {@link RabbitQueueReceiver}, using manual
 * acknowledgement.
 */
@Configuration
public class MessageListenerConfig {

    /**
     * Builds the listener container for the demo queue.
     *
     * @param connectionFactory  connection factory used by the container
     * @param rabbitQueueConfig  configuration bean that supplies the queue to listen on
     * @param rabbitQueuReceiver listener that receives every delivered message
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            RabbitQueueConfig rabbitQueueConfig,
            RabbitQueueReceiver rabbitQueuReceiver) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Exactly one consumer: both the initial and the maximum count are 1.
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setMaxConcurrentConsumers(1);

        // The listener acknowledges each message itself instead of relying on
        // automatic acknowledgement.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // No message converter is installed on the container; the listener parses
        // the raw body itself.

        // The queue is taken from the configuration bean rather than injected by
        // type: per the original author's note, several Queue beans may exist in
        // a project, which would make by-type injection ambiguous.
        listenerContainer.setQueues(rabbitQueueConfig.queue());
        listenerContainer.setMessageListener(rabbitQueuReceiver);
        return listenerContainer;
    }

    /** Exposes a JSON message converter bean. */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
4.消息生产者
import com.yl.common.Result;
import com.yl.common.SnowflakeIdWorker;
import com.yl.entity.Journal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
 * REST endpoint of the routing demo: builds a {@link Journal} and publishes it
 * to the demo exchange through the injected {@link RabbitTemplate}.
 */
@RestController
@Slf4j
public class SendController {

    /** Shared id generator, constructed with worker id 0 and datacenter id 0. */
    final static SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one sample journal entry.
     * Flow: producer --> exchange --> routing key --> queue.
     *
     * @return a success {@link Result}
     */
    @RequestMapping("send")
    public Result send() {
        log.info("生产者开始发送消息");
        Journal payload = buildSampleJournal();
        // Arguments: exchange name, routing key, object to convert and send.
        rabbitTemplate.convertAndSend("rollback-exchange", "rollback-routingkey", payload);
        log.info("生产者发送消息完成");
        return Result.succ("操作成功");
    }

    /** Creates the sample journal with a freshly generated id, a title and a description. */
    private Journal buildSampleJournal() {
        Journal sample = new Journal();
        sample.setId(idWorker.nextId());
        sample.setTitle("听闻广陵不知寒,大雪龙骑下江南");
        sample.setTitleDesc("怒发冲冠⑵,凭阑处⑶、潇潇雨歇⑷。抬望眼,仰天长啸⑸,壮怀激烈⑹。三十功名尘与土⑺,八千里路云和月⑻。莫等闲⑼、白了少年头,空悲切⑽。 靖康耻⑾,犹未雪。臣子恨,何时灭。驾长车,踏破贺兰山缺⑿。壮志饥餐胡虏肉⒀,笑谈渴饮匈奴血⒁。待从头、收拾旧山河,朝天阙⒂。");
        return sample;
    }
}
5.消息的消费者
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.util.JSONPObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(queues = {"rollback_queue"})
public class RabbitQueueReceiver extends AbstractAdaptableMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//消息的唯一ID,单调递增正整数,从1开始,当multiple=trues,一次性处理<=deliveryTag的所有
long deliveryTag = message.getMessageProperties().getDeliveryTag();
boolean multiple=false; //false单条 true 批量
// channel.basicAck(deliveryTag, multiple); //正确消费消息
// channel.basicReject(deliveryTag,true); //为true会重新放回队列
// channel.basicNack(deliveryTag, multiple,true)
try {
String msg=new String(message.getBody(),"UTF-8");
JSonObject json = JSONObject.parseObject(msg);
Long id = json.getLong("id");
log.info("消费的消息id"+id+"-------->>>>>>>"+"消费者消费消息的消息体:{}----->>>>>"+message);
//睡眠四秒
for(int i=0;i<4;i++){
Thread.sleep(1000);
System.out.println("...");
}
// if(deliveryTag%2==0){
// throw new RuntimeException("偶数必须为0");
// }
log.info("消息已被正确消费--->>>>>>>>"+deliveryTag);
//当前模式为单条消费
channel.basicAck(deliveryTag, multiple);
} catch (Exception e) {
e.printStackTrace();
//报异常重新投递
channel.basicReject(deliveryTag,true);
}
}
// @RabbitHandler
// public void handlerMessage(Journal orderVo) {
// log.info("消费者消费消息"+orderVo.toString());
// }
}
6.实体类对象
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Message payload of the routing demo: SendController fills in all three
 * fields before publishing, and RabbitQueueReceiver reads "id" back out of the
 * JSON body. Accessors are supplied by Lombok's {@code @Data}.
 */
@Data
public class Journal {
// Identifier; SendController assigns it from SnowflakeIdWorker.nextId().
private Long id;
// Title text of the entry.
private String title;
// Longer description text of the entry.
private String titleDesc;
}
7.工具类
/**
 * Snowflake-style generator of 64-bit primary-key ids.
 *
 * <p>Each id is assembled, from the high bits down, out of: the current time in
 * milliseconds minus a fixed epoch, a 5-bit datacenter id, a 5-bit worker id and
 * a 12-bit sequence number that counts ids produced within one millisecond.
 */
public class SnowflakeIdWorker {

    // ------------------------------ bit layout ------------------------------

    /** Epoch (milliseconds) subtracted from the current time. */
    private static final long TWEPOCH = 1420041600000L;
    /** Number of bits holding the worker id. */
    private static final long WORKER_ID_BITS = 5L;
    /** Number of bits holding the datacenter id. */
    private static final long DATACENTER_ID_BITS = 5L;
    /** Number of bits holding the per-millisecond sequence. */
    private static final long SEQUENCE_BITS = 12L;

    /** Largest worker id that fits in its bit field. */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
    /** Largest datacenter id that fits in its bit field. */
    private static final long MAX_DATACENTER_ID = ~(-1L << DATACENTER_ID_BITS);
    /** Mask that wraps the sequence back to zero when it overflows. */
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

    /** Left shift applied to the worker id. */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
    /** Left shift applied to the datacenter id. */
    private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
    /** Left shift applied to the timestamp delta. */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;

    // ------------------------------ state -----------------------------------

    private long workerId;
    private long datacenterId;
    /** Sequence within the current millisecond. */
    private long sequence = 0L;
    /** Timestamp used for the most recently generated id. */
    private long lastTimestamp = -1L;

    // ------------------------------ construction ----------------------------

    /**
     * Creates a generator for the given worker and datacenter.
     *
     * @param workerId     worker id, from 0 up to the maximum that fits in 5 bits
     * @param datacenterId datacenter id, from 0 up to the maximum that fits in 5 bits
     * @throws IllegalArgumentException if either id is outside its range
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        boolean workerOutOfRange = workerId < 0 || workerId > MAX_WORKER_ID;
        if (workerOutOfRange) {
            throw new IllegalArgumentException(
                    String.format("worker Id can't be greater than %d or less than 0", MAX_WORKER_ID));
        }
        boolean datacenterOutOfRange = datacenterId < 0 || datacenterId > MAX_DATACENTER_ID;
        if (datacenterOutOfRange) {
            throw new IllegalArgumentException(
                    String.format("datacenter Id can't be greater than %d or less than 0", MAX_DATACENTER_ID));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ------------------------------ id generation ---------------------------

    /**
     * Produces the next id.
     *
     * @return the next id
     * @throws RuntimeException if the clock reports a time earlier than the one
     *         used for the previous id
     */
    public synchronized long nextId() {
        long now = timeGen();

        // A timestamp earlier than the previous one means the clock went backwards.
        if (now < lastTimestamp) {
            throw new RuntimeException(String.format(
                    "Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - now));
        }

        if (now != lastTimestamp) {
            // New millisecond: the sequence starts over.
            sequence = 0L;
        } else {
            // Same millisecond: advance the sequence, wrapping via the mask.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence exhausted for this millisecond: wait for the next one.
                now = tilNextMillis(lastTimestamp);
            }
        }

        lastTimestamp = now;

        long timePart = (now - TWEPOCH) << TIMESTAMP_LEFT_SHIFT;
        long datacenterPart = datacenterId << DATACENTER_ID_SHIFT;
        long workerPart = workerId << WORKER_ID_SHIFT;
        return timePart | datacenterPart | workerPart | sequence;
    }

    /**
     * Spins until {@link #timeGen()} returns a value greater than {@code lastTimestamp}.
     *
     * @param lastTimestamp timestamp that must be exceeded
     * @return the first observed timestamp greater than {@code lastTimestamp}
     */
    protected long tilNextMillis(long lastTimestamp) {
        long candidate;
        do {
            candidate = timeGen();
        } while (candidate <= lastTimestamp);
        return candidate;
    }

    /**
     * Returns the current time in milliseconds.
     *
     * @return {@link System#currentTimeMillis()}
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    // ------------------------------ demo ------------------------------------

    /** Prints 100 sample insert statements, one per generated id. */
    public static void main(String[] args) {
        SnowflakeIdWorker generator = new SnowflakeIdWorker(0, 0);
        int remaining = 100;
        while (remaining-- > 0) {
            long id = generator.nextId();
            System.out.println("insert into orderNumber value('" + id + "');");
        }
    }
}
import lombok.Data;
import java.io.Serializable;
/**
 * Response envelope with a numeric code, a message and an arbitrary payload.
 * Instances are created through the static {@code succ} / {@code fail} factories;
 * accessors come from Lombok's {@code @Data}.
 */
@Data
public class Result implements Serializable {

    /** Numeric status code. */
    private int code;
    /** Message accompanying the code. */
    private String msg;
    /** Payload carried by the result. */
    private Object data;

    /**
     * Creates a result with code 200 and the default success message.
     *
     * @param data payload to carry
     * @return the populated result
     */
    public static Result succ(Object data) {
        return succ(200, "操作成功", data);
    }

    /**
     * Creates a result from explicit values.
     *
     * @param code status code
     * @param msg  message
     * @param data payload
     * @return the populated result
     */
    public static Result succ(int code, String msg, Object data) {
        return build(code, msg, data);
    }

    /**
     * Creates a result with code 400 and no payload.
     *
     * @param msg message describing the failure
     * @return the populated result
     */
    public static Result fail(String msg) {
        return fail(400, msg, null);
    }

    /**
     * Creates a result from explicit values.
     *
     * @param code status code
     * @param msg  message
     * @param data payload
     * @return the populated result
     */
    public static Result fail(int code, String msg, Object data) {
        return build(code, msg, data);
    }

    /** Populates a new instance through its setters, in code, msg, data order. */
    private static Result build(int code, String msg, Object data) {
        Result populated = new Result();
        populated.setCode(code);
        populated.setMsg(msg);
        populated.setData(data);
        return populated;
    }
}
Postman 调用接口生产消息:http://localhost:8084/send
结果: 项目地址:



