数据库部分
虚拟机部分
代码部分
项目名称 rabbitmq-order,分为模块化创建了多个模块
entity : rabbitmq-order-entity
mapper: rabbitmq-order-mapper
service: rabbitmq-order-service
consumer:rabbitmq-order-consumer
根:
pom.xml 配置
4.0.0 com.etoal.et2106.rabbitmq rabbitmq-order1.0-SNAPSHOT rabbitmq-order-entity rabbitmq-order-mapper rabbitmq-order-service rabbitmq-order-delay-consumer pom org.springframework.boot spring-boot-starter-parent2.4.7 com.etoak.et2106.rabbitmq rabbitmq-order-entity1.0-SNAPSHOT com.etoal.et2106.rabbitmq rabbitmq-order-mapper1.0-SNAPSHOT com.baomidou mybatis-plus3.4.3.1 com.baomidou mybatis-plus-boot-starter3.4.3.1
rabbitmq-order-entity层级目录
entity的pom.xml
rabbitmq-order com.etoal.et2106.rabbitmq 1.0-SNAPSHOT 4.0.0 rabbitmq-order-entityorg.projectlombok lomboktrue com.baomidou mybatis-plustrue
entity下的Order
package com.etoak.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.AUTO)
private Integer id;
private String orderNo;
private Integer orderStatus;
private Integer payStatus;
private String createTime;
private String cancelTime;
}
rabbitmq-order-mapper层级目录
mapper的pom.xml
rabbitmq-order com.etoal.et2106.rabbitmq 1.0-SNAPSHOT 4.0.0 rabbitmq-order-mapperrabbitmq-order-entity com.etoal.et2106.rabbitmq com.baomidou mybatis-plustrue com.etoal.et2106.rabbitmq rabbitmq-order-entity1.0-SNAPSHOT compile
mapper的OrderMapper
package com.etoak.mapper; import com.baomidou.mybatisplus.core.mapper.baseMapper; import com.etoak.entity.Order; public interface OrderMapper extends baseMapper{ }
rabbitmq-order-service层级目录
service的xml
rabbitmq-order com.etoal.et2106.rabbitmq 1.0-SNAPSHOT 4.0.0 rabbitmq-order-servicecom.etoal.et2106.rabbitmq rabbitmq-order-mapperorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-amqpmysql mysql-connector-javacom.baomidou mybatis-plus-boot-starterorg.projectlombok lomboktrue cn.hutool hutool-all5.7.13
resources下的application.yml
#服务
server:
port: 8003
#spring 数据源
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///et2106?serverTimezone=UTC
username: root
password: etoak
#rabbitmq 的配置
rabbitmq:
host: 192.168.132.139
port: 5672
username: et
password: etoak
virtual-host: /et
listener:
simple:
#手动确认
acknowledge-mode: manual
启动文件: OrderserverApp
package com.etoak;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
//Spring 项目中开始spring自动配置,是springboot 的核心配置类
@SpringBootApplication
@MapperScan(basePackages = "com.etoak.mapper")
@EnableTransactionManagement
public class OrderServerApp {
public static void main(String[] args) {
//SpringApplication是Springboot驱动Spring上下文的引导类,
// run() 启动spring应用,
// 实质上是为Spring应用创建并初始化Spring上下文。
SpringApplication.run(OrderServerApp.class,args);
}
}
controller下的OrderController
package com.etoak.controller;
import com.etoak.entity.Order;
import com.etoak.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Autowired
OrderService orderService;
@RequestMapping("/create")
public String createOrder(@RequestParam(required = false,defaultValue = "600000")int delay) {
orderService.createOrder(new Order(),delay);
return "Success";
}
}
service下的Orderservice
package com.etoak.service;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.etoak.config.OrderDelayConfig;
import com.etoak.entity.Order;
import com.etoak.mapper.OrderMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
@Service
public class OrderService {
//生命两个状态码 创建的,没付钱的
public static final int CREATE = 1;
public static final int NO_PAY = 1;
//引入OrderMapper
@Autowired
OrderMapper orderMapper;
//引入RabbitTemplate 负责实现rabbirt队列的类
@Autowired
RabbitTemplate rabbitTemplate;
//开启事务
@Transactional
//添加order 的方法,需要一个order,还有延迟时间
public int createOrder(Order order,int delay) {
//生成简单的id,唯一识别码
String orderNo = IdUtil.getSnowflake().nextIdStr();
//简单的方式大得到当前的时间,
String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
//对这个Order 设置 id 创建状态,支付状态,创建时间
order.setOrderNo(orderNo);
order.setOrderStatus(CREATE);
order.setPayStatus(NO_PAY);
order.setCreateTime(createTime);
//入库 添加到数据库
int result = orderMapper.insert(order);
//发送消息到延迟队列,
rabbitTemplate.convertAndSend(OrderDelayConfig.EXCHANGE,OrderDelayConfig.ROUTING_KEY,orderNo,
//设置延迟事件,不是过期时间
message -> {
message.getMessageProperties().setDelay(delay);
return message;
});
return result;
}
}
config下的OrderDelayConfig
package com.etoak.config;
import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class OrderDelayConfig {
//定义交换机 名字,队列名字,路由key
public static final String EXCHANGE = "order.exchange";
public static final String QUEUE = "order.queue";
public static final String ROUTING_KEY = "order.delay";
@Bean
public CustomExchange orderExchange() {
Map args = new HashMap<>();
//定义属性延迟
args.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
return new CustomExchange(EXCHANGE,"x-delayed-message",true,false,args);
}
@Bean
public Queue orderQueue() {
return new Queue(QUEUE);
}
@Bean
public Binding orderBinding() {
return BindingBuilder
.bind(orderQueue())
.to(orderExchange())
.with(ROUTING_KEY)
.noargs();
}
}
rabbitmq-order-delay-consumer层级目录
comsumer的pom.xml 配置
rabbitmq-order com.etoal.et2106.rabbitmq 1.0-SNAPSHOT 4.0.0 rabbitmq-order-delay-consumerorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-amqpcom.etoal.et2106.rabbitmq rabbitmq-order-mappermysql mysql-connector-javacom.baomidou mybatis-plus-boot-startercn.hutool hutool-all5.7.13
resources下的application.yml
#服务
server:
port: 8003
#spring 数据源
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///et2106?serverTimezone=UTC
username: root
password: etoak
#rabbitmq 的配置
rabbitmq:
host: 192.168.132.139
port: 5672
username: et
password: etoak
virtual-host: /et
listener:
simple:
acknowledge-mode: manual
启动项 ConsumerApp
package com.etoak;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@MapperScan(basePackages = "com.etoak.mapper")
@EnableTransactionManagement
public class ConsumerApp {
//SpringApplication.run 核心方法,启动Spring应用
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class,args);
}
}
sercvice下的ConcelService
package com.etoak.service;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.etoak.entity.Order;
import com.etoak.mapper.OrderMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Date;
@Service
public class ConcelService {
@Autowired
OrderMapper orderMapper;
@RabbitListener(queues = "order.queue")
@Transactional(rollbackFor = Exception.class)
public void handle(Message message, Channel channel) throws IOException {
//得到队列里消息的orderNo
String orderNo = new String(message.getBody());
//调用方法 得到order
Order order = this.getOrder(orderNo);
//如果是新增订单并且未支付,
if(order.getOrderStatus() == 1 && order.getPayStatus() == 1){
//3.自动取消,设置为3
order.setOrderStatus(3);
order.setCancelTime(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
//对order进行修改
orderMapper.updateById(order);
}
//对当前操作的信道进行处理,手动确认: 参数 标签, :为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
public Order getOrder(String orderNo){
QueryWrapper queryWrapper = new QueryWrapper<>();
queryWrapper.select("id","order_no","order_status","pay_status").eq("order_no",orderNo);
return orderMapper.selectOne(queryWrapper);
}
}
测试: 启动
rabbitmq-order-server 的启动类,然后再浏览器地输入
locathost:8002/create?delay
locathost:8002/create?delay=3000
locathost:8002/create?delay=30000
然后查看rabbitmq 的后台关于queue 的消息条数,应当是随着时间进行添加的
然后启动
rabbitmq-order-delay-consumer
然后去rabbitmq 的后台查看,队里里面的消息是否还存在.应当是被处理完毕,然后去数据库查看没一个roder 的 cancel_time的时间是否正确



