栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring boot + RabbitMQ 整合(模块化)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring boot + RabbitMQ 整合(模块化)

数据库部分

虚拟机部分

代码部分

项目名称 rabbitmq-order,分为模块化创建了多个模块

entity : rabbitmq-order-entity

mapper: rabbitmq-order-mapper

service: rabbitmq-order-service

consumer:rabbitmq-order-consumer

 根: 

pom.xml 配置



    4.0.0

    com.etoal.et2106.rabbitmq
    rabbitmq-order
    1.0-SNAPSHOT
    
        
        rabbitmq-order-entity
        rabbitmq-order-mapper
        rabbitmq-order-service
        rabbitmq-order-delay-consumer
    
    pom


    
        org.springframework.boot
        spring-boot-starter-parent
        2.4.7
    

    
        

            
            
                com.etoak.et2106.rabbitmq
                rabbitmq-order-entity
                1.0-SNAPSHOT
            
            
            
                com.etoal.et2106.rabbitmq
                rabbitmq-order-mapper
                1.0-SNAPSHOT
            
            
            
                com.baomidou
                mybatis-plus
                3.4.3.1
            

            
            
                com.baomidou
                mybatis-plus-boot-starter
                3.4.3.1
            

        
    


rabbitmq-order-entity层级目录

entity的pom.xml



    
        rabbitmq-order
        com.etoal.et2106.rabbitmq
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-order-entity

    

        
        
            org.projectlombok
            lombok
            true
        

        
        
            com.baomidou
            mybatis-plus
            
            true
        
    

entity下的Order

package com.etoak.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;



@Data

@TableName("t_order")
public class Order {
    
    @TableId(type = IdType.AUTO)
    private Integer id;

    private String orderNo;

    private Integer orderStatus;

    private Integer payStatus;

    private String  createTime;

    private String cancelTime;
}

rabbitmq-order-mapper层级目录


mapper的pom.xml



    
        rabbitmq-order
        com.etoal.et2106.rabbitmq
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-order-mapper

    

        
        
            rabbitmq-order-entity
            com.etoal.et2106.rabbitmq
        
        
        
            com.baomidou
            mybatis-plus
            true
        
        
        
            com.etoal.et2106.rabbitmq
            rabbitmq-order-entity
            1.0-SNAPSHOT
            compile
        
    

mapper的OrderMapper

package com.etoak.mapper;

import com.baomidou.mybatisplus.core.mapper.baseMapper;
import com.etoak.entity.Order;


public interface OrderMapper  extends baseMapper {
}

rabbitmq-order-service层级目录

service的xml



    
        rabbitmq-order
        com.etoal.et2106.rabbitmq
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-order-service



    

        
        
            com.etoal.et2106.rabbitmq
            rabbitmq-order-mapper
        
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
        
            mysql
            mysql-connector-java
        
        
        
            com.baomidou
            mybatis-plus-boot-starter
        
        
        
            org.projectlombok
            lombok
            true
        
        
        
            cn.hutool
            hutool-all
            5.7.13
        

    


resources下的application.yml

#服务
server:
  port: 8003

#spring 数据源
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql:///et2106?serverTimezone=UTC
    username: root
    password: etoak
  #rabbitmq 的配置
  rabbitmq:
    host: 192.168.132.139
    port: 5672
    username: et
    password: etoak
    virtual-host: /et
    listener:
      simple:
        #手动确认
        acknowledge-mode: manual

启动文件: OrderserverApp

package com.etoak;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

//Spring 项目中开始spring自动配置,是springboot 的核心配置类
@SpringBootApplication

@MapperScan(basePackages = "com.etoak.mapper")

@EnableTransactionManagement
public class OrderServerApp {
    public static void main(String[] args) {
        //SpringApplication是Springboot驱动Spring上下文的引导类,
        // run() 启动spring应用,
        // 实质上是为Spring应用创建并初始化Spring上下文。
        SpringApplication.run(OrderServerApp.class,args);
    }
}

controller下的OrderController

package com.etoak.controller;


import com.etoak.entity.Order;
import com.etoak.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class OrderController {
    
    @Autowired
    OrderService orderService;
    
    @RequestMapping("/create")
    
    public String createOrder(@RequestParam(required = false,defaultValue = "600000")int delay) {
        
        orderService.createOrder(new Order(),delay);
        
        return  "Success";
    }
}

service下的Orderservice

package com.etoak.service;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.etoak.config.OrderDelayConfig;
import com.etoak.entity.Order;
import com.etoak.mapper.OrderMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;


@Service
public class OrderService {
    //生命两个状态码 创建的,没付钱的
    public static final int  CREATE = 1;
    public static final int NO_PAY = 1;

    //引入OrderMapper
    @Autowired
    OrderMapper orderMapper;

    //引入RabbitTemplate 负责实现rabbirt队列的类
    @Autowired
    RabbitTemplate rabbitTemplate;

    //开启事务
    @Transactional
    //添加order 的方法,需要一个order,还有延迟时间
    public int createOrder(Order order,int delay) {
        //生成简单的id,唯一识别码
        String orderNo = IdUtil.getSnowflake().nextIdStr();
        //简单的方式大得到当前的时间,
        String createTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        //对这个Order 设置 id 创建状态,支付状态,创建时间
        order.setOrderNo(orderNo);
        order.setOrderStatus(CREATE);
        order.setPayStatus(NO_PAY);
        order.setCreateTime(createTime);

        //入库 添加到数据库
        int result = orderMapper.insert(order);

        //发送消息到延迟队列,
        
        rabbitTemplate.convertAndSend(OrderDelayConfig.EXCHANGE,OrderDelayConfig.ROUTING_KEY,orderNo,
                //设置延迟事件,不是过期时间
                message -> {
                
                    message.getMessageProperties().setDelay(delay);
                    return message;
        });
        
        return  result;





    }
}

config下的OrderDelayConfig

package com.etoak.config;

import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderDelayConfig {
    //定义交换机 名字,队列名字,路由key
    public static final String EXCHANGE = "order.exchange";

    public static final String QUEUE = "order.queue";

    public static final String ROUTING_KEY = "order.delay";

    
    @Bean
    public CustomExchange orderExchange() {
        
        Map args = new HashMap<>();
        //定义属性延迟
        args.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
        
        return  new CustomExchange(EXCHANGE,"x-delayed-message",true,false,args);
    }

    @Bean
    
    public Queue orderQueue() {
        return new Queue(QUEUE);
    }
    @Bean
    
    public Binding orderBinding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(orderExchange())
                .with(ROUTING_KEY)
                .noargs();
    }

}

rabbitmq-order-delay-consumer层级目录

comsumer的pom.xml 配置



    
        rabbitmq-order
        com.etoal.et2106.rabbitmq
        1.0-SNAPSHOT
    
    4.0.0

    rabbitmq-order-delay-consumer

    

        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
        
            com.etoal.et2106.rabbitmq
            rabbitmq-order-mapper
        
        
        
            mysql
            mysql-connector-java
        
        
        
            com.baomidou
            mybatis-plus-boot-starter
        
        
        
            cn.hutool
            hutool-all
            5.7.13
        


    

resources下的application.yml

#服务
server:
  port: 8003

#spring 数据源
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql:///et2106?serverTimezone=UTC
    username: root
    password: etoak
  #rabbitmq 的配置
  rabbitmq:
    host: 192.168.132.139
    port: 5672
    username: et
    password: etoak
    virtual-host: /et
    listener:
      simple:
        acknowledge-mode: manual

启动项 ConsumerApp

package com.etoak;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;


@SpringBootApplication

@MapperScan(basePackages = "com.etoak.mapper")

@EnableTransactionManagement
public class ConsumerApp {
    //SpringApplication.run 核心方法,启动Spring应用
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class,args);
    }
}

sercvice下的ConcelService

package com.etoak.service;

import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.etoak.entity.Order;
import com.etoak.mapper.OrderMapper;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.Date;

@Service
public class ConcelService {
    

    
    @Autowired
    OrderMapper orderMapper;
    
    @RabbitListener(queues = "order.queue")
    
    @Transactional(rollbackFor = Exception.class)
    
    public void handle(Message message, Channel channel) throws IOException {
        //得到队列里消息的orderNo
        String orderNo = new String(message.getBody());
        //调用方法 得到order
        Order order = this.getOrder(orderNo);
        //如果是新增订单并且未支付,
        if(order.getOrderStatus() == 1 && order.getPayStatus() == 1){
            //3.自动取消,设置为3
            order.setOrderStatus(3);
            
            order.setCancelTime(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
            //对order进行修改
            orderMapper.updateById(order);
        }
        //对当前操作的信道进行处理,手动确认: 参数 标签, :为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    public Order getOrder(String orderNo){

        QueryWrapper queryWrapper = new QueryWrapper<>();
        
        queryWrapper.select("id","order_no","order_status","pay_status").eq("order_no",orderNo);
        
        return orderMapper.selectOne(queryWrapper);
    }
}


测试: 启动 

rabbitmq-order-server 的启动类,然后再浏览器地输入

locathost:8002/create?delay

locathost:8002/create?delay=3000

locathost:8002/create?delay=30000

然后查看rabbitmq 的后台关于queue 的消息条数,应当是随着时间进行添加的

然后启动 

rabbitmq-order-delay-consumer

然后去rabbitmq 的后台查看,队里里面的消息是否还存在.应当是被处理完毕,然后去数据库查看没一个roder 的 cancel_time的时间是否正确

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/356809.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号