一、案例结构二、集成流程
1.1、依赖1.2、配置文件1.3、配置类-序列化机制和定制RabbitTemplate 三、全局常量类和实体类
1.1、常量类 MqConstant1.2、实体类 四、测试案例一
1.1、创建和发送1.2、监听消费 三、延时队列测试
1.1、采用@bean创建1.2、发送消息1.3、监听消费 四、订单库存案例(消息事务)
1.1、创建
一、案例结构第一部分==>rabbitMQ介绍与安装说明(一)
二、集成流程 1.1、依赖1.2、配置文件org.springframework.boot spring-boot-starter-amqp org.slf4j slf4j-api 1.7.25 ch.qos.logback logback-classic 1.2.3 org.projectlombok lombok 1.18.8 mysql mysql-connector-java 8.0.19 org.mybatis.spring.boot mybatis-spring-boot-starter 2.1.3 com.github.pagehelper pagehelper-spring-boot-starter 1.2.12 com.alibaba fastjson 1.2.15
#======================↓↓↓↓rabbitmq配置-开始↓↓↓↓====================== # rabbitmq地址 spring.rabbitmq.host=192.168.10.10 # rabbitmq端口 spring.rabbitmq.port=5672 # 虚拟主机 spring.rabbitmq.virtual-host=/ # 账号密码默认guest-这边我是新建了个超级管理员的用户 spring.rabbitmq.username=admin spring.rabbitmq.password=admin # 开启发送端确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达队列的确认 spring.rabbitmq.publisher-returns=true # 只要抵达队列,以异步发送优先回调我们这个return/confirm/i spring.rabbitmq.template.mandatory=true # 手动ack消息 spring.rabbitmq.listener.simple.acknowledge-mode=manual #======================↓↓↓↓mysql配置-开始↓↓↓↓================================= # 配置数据源 mysql8 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/mq_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&rewriteBatchedStatements=true spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.jdbc.Driver #======================↓↓↓↓mybatis配置-开始↓↓↓↓================================= # 映射文件xml的位置 mybatis.mapper-locations=classpath:mapper/*Mapper.xml # 使全局的映射器启用或禁用缓存 mybatis.configuration.cache-enabled=true # 全局启用或禁用延迟加载。当禁用时,所有关联对象都会即时加载 mybatis.configuration.lazy-loading-enabled=true # 当启用时,有延迟加载属性的对象在被调用时将会完全加载任意属性。否则,每种属性将会按需要加载。 mybatis.configuration.aggressive-lazy-loading=false # 是否允许单条sql 返回多个数据集 (取决于驱动的兼容性) 默认:true mybatis.configuration.multiple-result-sets-enabled=true # 是否可以使用列的别名 (取决于驱动的兼容性) 默认:true mybatis.configuration.use-column-label=true # 指定MyBatis如何自动映射 数据基表的列 NONE:不隐射 PARTIAL:部分 FULL:全部 mybatis.configuration.auto-mapping-behavior=partial # 这是默认的执行类型 (SIMPLE: 简单; REUSE: 执行器可能重复使用prepared statements语句;BATCH: 执行器可以重复执行语句和批量更新) mybatis.configuration.default-executor-type=simple # sql执行时间超时时间 mybatis.configuration.default-statement-timeout=25 # 允许在嵌套语句中使用分页(RowBounds)。如果允许使用则设置为false。 mybatis.configuration.safe-row-bounds-enabled=false # 是否使用驼峰命名法转换字段 mybatis.configuration.map-underscore-to-camel-case=false # 设置本地缓存范围 session:就会有数据的共享 statement:语句范围 (这样就不会有数据的共享 ) 默认:session mybatis.configuration.local-cache-scope=session # 设置当JDBC类型为空时,某些驱动程序 要指定值,默认:OTHER。当写入null值的字段时,部分数据库需要指定null的数据类型.mysql不用设置。【oracle需要设置】对应org.apache.ibatis.type.JdbcType的枚举值 mybatis.configuration.jdbc-type-for-null=other # 指定对象哪个的方法触发一次延迟加载 mybatis.configuration.lazy-load-trigger-methods=equals,clone,hashCode,toString # [默认false,推荐使用true] 如果数据为空的字段,则该字段省略不显示,查询数据映射数据类型使用的是Map。当字段值为null时,mybatis映射返回字段的时候会忽略,而原接口是null值也返回,为了兼容,需要设置不忽略null字段 mybatis.configuration.call-setters-on-nulls=true # mybatis指定日志输出的前缀 mybatis.configuration.log-prefix=mybatis_ # 打印sql-开发时启用,会影响性能【使用boot默认的logback】 mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl # 将java类型(javaType)转化为jdbc类型(jdbcType),或者将jdbc类型(jdbcType)转化为java类型(javaType)。【这个参数还没搞得很明白】 # mybatis.type-handlers-package=com.mapper.typehandler # 允许JDBC生成主键。需要驱动器支持。如果设为了true,这个设置将强制使用被生成的主键,有一些驱动器不兼容不过仍然可以执行。 默认:false # mybatis.configuration.use-generated-keys=false # 指定代理工厂:mybatis 实现2个:CglibProxyFactory和JavassistProxyFactory(默认) # mybatis.configuration.proxy-factory=CGLIB #======================↓↓↓↓pagehelper配置-开始↓↓↓↓================================= # 分页 pagehelper.helperDialect=mysql pagehelper.reasonable=true pagehelper.supportMethodsArguments=true pagehelper.params=count=countSql1.3、配置类-序列化机制和定制RabbitTemplate
- rabbitmq默认采用jdk的序列化机制,我们改成json定制RabbitTemplate-开启ack模式
MyRabbitConfig01
package sqy.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MyRabbitConfig01 {
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Primary//注入容器的意思-和@Autowired意思差不多,但是Primary优先级更高
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
//服务器收到了;
//修改消息的状态
System.out.println("服务器收到...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//接收不到消息这边会接收到错误信息。【可以结合数据库-修改数据库当前消息的状态->记录错误然后重试】
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]===>exchange["+exchange+"]===>routingKey["+routingKey+"]");
}
});
}
}
三、全局常量类和实体类 1.1、常量类 MqConstant
package sqy.constant;
public class MqConstant {
public static final String C01_EXCHANGE_NAME = "hello-exchange";
public static final String C01_QUEUE_NAME = "hello-queue";
public static final String C01_ROUTING_KEY = "helloMQ.java";
public static final String C01_ERROR_ROUTING_KEY = "helloMQ.java.golang";
//------------------------------------
public static final String C02_TOPIC_EXCHANGE = "test.topic.exchange";
public static final String C02_DELAY_QUEUE_A = "test.delay.queue.a";
public static final String C02_RELEASE_QUEUE_B = "test.release.queue.b";
public static final String C02_DELAY_ROUTINGKEY = "test.delay.routingKey";
public static final String C02_RELEASE_ROUTINGKEY = "test.release.routingKey";
public static final Integer C02_MEG_TTL = 1 * 60 * 1000;//1分钟-60000毫秒
//------------------------------------
public static final String C03_TOPIC_EXCHANGE = "demo.topic.exchange";
//....
}
1.2、实体类
Student
package sqy.pojo;
import lombok.Data;
import java.util.Date;
@Data
public class Student {
private String studentNo;
private String studentName;
private Date studentBirth;
}
Teacher
package sqy.pojo;
import lombok.Data;
import java.util.Date;
@Data
public class Teacher {
private String teacherNo;
private String teacherName;
private Date teacherBirth;
}
MqMessage
package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
import java.util.Date;
@Data
@ToString
public class MqMessage {
private String messageId;//消息的唯一id
private String content;//消息内容
private String toExchange;//交换机
private String routingKey;//路由键
private String classType;//实体类类型
private String messageStatus;//0-新建 1-已发送 2-错误抵达 3-已抵达
private Date createTime;//创建时间
private Date updateTime;//更新时间
}
Order
package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Order {
private String orderId;//订单号
private String orderUserId;//用户id
private String orderStatus;//订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】
}
OrderWareTask
package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class OrderWareTask {
private int id;//自增主键
private String orderId;//订单号-关联订单表
private String productId;//商品id-关联商品
private String productNum;//商品数量
private String wareId;//仓库id
private String lockStatus;//任务状态-1锁定 2解锁 [数据库锁-乐观锁]
}
Product
package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Product {
private String productId;
private String productName;
private String productPrice;
}
Warehouse
package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
import java.math.BigDecimal;
@Data
@ToString
public class Warehouse {
private String wareId;//仓库id
private String wareName;//仓库名称
private String wareAddr;//地址
private String purchaseProductId;//采购物品id
private String purchaseProductName;//采购物品name
private BigDecimal purchaseProductPrice;//采购价格
}
四、测试案例一 1.1、创建和发送
package sqy.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import sqy.constant.MqConstant;
import sqy.pojo.Student;
import sqy.pojo.Teacher;
import java.util.Date;
import java.util.UUID;
@Slf4j
@RestController
public class Test01CreateController {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
//localhost:8080/testCreateMQ
@GetMapping("/testCreateMQ")
public String testCreateMQ() {
//也可以用 BindingBuilder 、ExchangeBuilder、QueueBuilder来创建
//[创建-声明]交换机
DirectExchange directExchange = new DirectExchange(MqConstant.C01_EXCHANGE_NAME, true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功", MqConstant.C01_EXCHANGE_NAME);
//[创建-声明]队列
Queue queue = new Queue(MqConstant.C01_QUEUE_NAME, true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功", MqConstant.C01_QUEUE_NAME);
//将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键【绑定】
Binding binding = new Binding(MqConstant.C01_QUEUE_NAME,
Binding.DestinationType.QUEUE,
MqConstant.C01_EXCHANGE_NAME,
MqConstant.C01_ROUTING_KEY, null);
amqpAdmin.declareBinding(binding);
log.info("Binding创建成功-路由key为:[{}]", MqConstant.C01_ROUTING_KEY);
return "创建交换机-队列成功,并指定 <路由key> 绑定";
}
@GetMapping("/sendMq")
public String sendMq(@RequestParam(value = "num", defaultValue = "20") Integer num) {
for (int i = 0; i < num; i++) {
if (i<=10){
if (i % 2 == 0) {
Student student = new Student();
student.setStudentNo(i+"号");
student.setStudentName("学生" + i);
student.setStudentBirth(new Date());
//public void convertAndSend(String exchange【交换机】, String routingKey【路由key】, Object object【消息】, @Nullable CorrelationData correlationData)【消息的唯一标识<推荐使用分布式id>】
rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, student, new CorrelationData("学生"+UUID.randomUUID().toString().substring(0,7)));
} else {
Teacher teacher=new Teacher();
teacher.setTeacherNo(i+"号");
teacher.setTeacherName("教师"+i);
//使用正常的路由键
rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
//使用错误的路由键-模拟进入队列失败
//rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ERROR_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
}
}else {
if (i % 2 != 0) {
Student student = new Student();
student.setStudentNo(i+"号");
student.setStudentName("学生" + i);
student.setStudentBirth(new Date());
//交换机、路由key、消息、消息的唯一标识<推荐使用分布式id>
rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, student, new CorrelationData("学生"+UUID.randomUUID().toString().substring(0,7)));
} else {
Teacher teacher=new Teacher();
teacher.setTeacherNo(i+"号");
teacher.setTeacherName("教师"+i);
//使用正常的路由键-
rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
//使用错误的路由键-模拟进入队列失败
//rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ERROR_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
}
}
}
return "发送20条消息完成";
}
}
1.2、监听消费
package sqy.service;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import sqy.constant.MqConstant;
import sqy.pojo.Student;
import sqy.pojo.Teacher;
//监听队列
@RabbitListener(queues = {MqConstant.C01_QUEUE_NAME})
@Service
public class Test01Listener {
@RabbitHandler
public void receiveMessage01(Message message,
Student student,
Channel channel) throws InterruptedException {
byte[] body = message.getBody();//消息体
MessageProperties properties = message.getMessageProperties();//消息头属性信息
Thread.sleep(2000);//模拟业务耗时
// System.out.println("接受学生消息=>" + student);
//channel内按顺序自增的。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("信道自增交货标签:" + deliveryTag);
//todo basicAck和basicNack实际场景不会一起使用
try {
if (deliveryTag % 2 == 0) {
//模拟系统中断,debug停止,后面代码还是会执行。所以模拟的话也可以用kill后台进程
if (deliveryTag > 6) {
System.out.println("开始闪断-网络中断.........");
System.exit(1);//模拟项目突然挂了,队列里面消息的情况
}
//信道里面-手动签收 basicAck(long deliveryTag[消息的id], boolean multiple[是否批量签收-以可以理解为签收当前>=deliveryTag的消息])
channel.basicAck(deliveryTag, false);
System.out.println("学生-签收.." + student.getStudentName());
} else {
// channel.basicReject();//不签收法1
channel.basicNack(deliveryTag, false, false);//不签收法2--参数多用这个
System.out.println("学生--没有签收.." + student.getStudentName());
}
} catch (Exception e) {
//网络中断
}
}
@RabbitHandler
public void receiveMessage02(Message message, Teacher teacher, Channel channel) throws Exception {
Thread.sleep(2000);//模拟业务耗时
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//System.out.println("接收到消息..." + teacher);
//签收
channel.basicAck(deliveryTag, false);
System.out.println("教师-签收了=>" + teacher.getTeacherName());
}
}
三、延时队列测试
1.1、采用@bean创建
MyRabbitConfig02
package sqy.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sqy.constant.MqConstant;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyRabbitConfig02 {
//发布订阅模式的交换机
@Bean
public Exchange testTopicExchange() {
//String name[名称], boolean durable[持久化], boolean autoDelete[自动删除], Map arguments[其他参数]
return new TopicExchange(MqConstant.C02_TOPIC_EXCHANGE, true, false);
}
//延迟队列a,存放有效期为1分钟的消息
@Bean
public Queue testDelayQueueA() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", MqConstant.C02_TOPIC_EXCHANGE);
arguments.put("x-dead-letter-routing-key", MqConstant.C02_RELEASE_ROUTINGKEY);
arguments.put("x-message-ttl", MqConstant.C02_MEG_TTL);
//String name[名称], boolean durable[持久化], boolean exclusive[独占], boolean autoDelete[自动删除], Map arguments[其他参数]
Queue queue = new Queue(MqConstant.C02_DELAY_QUEUE_A, true, false, false, arguments);
return queue;
}
//释放队列b,存放过期消息的队列-客户端监听-用于释放消息
@Bean
public Queue testReleaseQueueB() {
Queue queue = new Queue(MqConstant.C02_RELEASE_QUEUE_B, true, false, false);
return queue;
}
//绑定队列a和交换机-指定路由键
@Bean
public Binding testDelayQueueABingding() {
//String destination[目的地], DestinationType destinationType[枚举-目的地类型], String exchange[交换机名称],
// String routingKey[路由键], Map arguments[其他参数]
return new Binding(MqConstant.C02_DELAY_QUEUE_A,
Binding.DestinationType.QUEUE,
MqConstant.C02_TOPIC_EXCHANGE,
MqConstant.C02_DELAY_ROUTINGKEY,
null);
}
//绑定队列b和交换机-指定路由键
@Bean
public Binding testReleaseQueueBBingding() {
return new Binding(MqConstant.C02_RELEASE_QUEUE_B,
Binding.DestinationType.QUEUE,
MqConstant.C02_TOPIC_EXCHANGE,
MqConstant.C02_RELEASE_ROUTINGKEY,
null);
}
}
1.2、发送消息
package sqy.controller;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Test02DelayController {
@Autowired
RabbitTemplate rabbitTemplate;
//http://127.0.0.1:8080/sendMsgToQueueA
@GetMapping("/sendMsgToQueueA")
public String sendMsgToQueueA() {
rabbitTemplate.convertAndSend("test.topic.exchange", "test.delay.routingKey", "这个一条存活时间为1分钟的消息");
return "向队列a发送1条消息成功";
}
}
1.3、监听消费
package sqy.service;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import sqy.constant.MqConstant;
@Service
public class Test02DelayMqListener {
@RabbitListener(queues = {MqConstant.C02_RELEASE_QUEUE_B})
public void receiveMessage02(Message message, Channel channel) throws Exception {
//System.out.println(JSON.toJSonString(message.getBody()));
System.out.println("这个已过期的被转交给队列b的消息:==>" + message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收
channel.basicAck(deliveryTag, false);
System.out.println("释放了消息");
}
}
四、订单库存案例(消息事务) 1.1、创建
package sqy.config;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig03 {
}
后面补充…
上一篇:rabbitMQ介绍与安装说明(一)



