栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq消息队列5种模式(springboot连接rabbitmq集群)

rabbitmq消息队列5种模式(springboot连接rabbitmq集群)

springboot整合rabbitmq

一、案例结构二、集成流程

1.1、依赖1.2、配置文件1.3、配置类-序列化机制和定制RabbitTemplate 三、全局常量类和实体类

1.1、常量类 MqConstant1.2、实体类 四、测试案例一

1.1、创建和发送1.2、监听消费 三、延时队列测试

1.1、采用@bean创建1.2、发送消息1.3、监听消费 四、订单库存案例(消息事务)

1.1、创建

一、案例结构


第一部分==>rabbitMQ介绍与安装说明(一)

二、集成流程 1.1、依赖


    org.springframework.boot
    spring-boot-starter-amqp





    org.slf4j
    slf4j-api
    1.7.25



    ch.qos.logback
    logback-classic
    1.2.3




    org.projectlombok
    lombok
    1.18.8




    mysql
    mysql-connector-java
    8.0.19




    org.mybatis.spring.boot
    mybatis-spring-boot-starter
    2.1.3



    com.github.pagehelper
    pagehelper-spring-boot-starter
    1.2.12




    com.alibaba
    fastjson
    1.2.15

1.2、配置文件
#======================↓↓↓↓rabbitmq配置-开始↓↓↓↓======================
# rabbitmq地址
spring.rabbitmq.host=192.168.10.10
# rabbitmq端口
spring.rabbitmq.port=5672
# 虚拟主机
spring.rabbitmq.virtual-host=/
# 账号密码默认guest-这边我是新建了个超级管理员的用户
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# 开启发送端确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们这个return/confirm/i
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual


#======================↓↓↓↓mysql配置-开始↓↓↓↓=================================
# 配置数据源 mysql8
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/mq_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver



#======================↓↓↓↓mybatis配置-开始↓↓↓↓=================================
# 映射文件xml的位置
mybatis.mapper-locations=classpath:mapper/*Mapper.xml
# 使全局的映射器启用或禁用缓存
mybatis.configuration.cache-enabled=true
# 全局启用或禁用延迟加载。当禁用时,所有关联对象都会即时加载
mybatis.configuration.lazy-loading-enabled=true
# 当启用时,有延迟加载属性的对象在被调用时将会完全加载任意属性。否则,每种属性将会按需要加载。
mybatis.configuration.aggressive-lazy-loading=false
# 是否允许单条sql 返回多个数据集 (取决于驱动的兼容性) 默认:true
mybatis.configuration.multiple-result-sets-enabled=true
# 是否可以使用列的别名 (取决于驱动的兼容性) 默认:true
mybatis.configuration.use-column-label=true
# 指定MyBatis如何自动映射 数据基表的列 NONE:不隐射 PARTIAL:部分 FULL:全部
mybatis.configuration.auto-mapping-behavior=partial
# 这是默认的执行类型 (SIMPLE: 简单; REUSE: 执行器可能重复使用prepared statements语句;BATCH: 执行器可以重复执行语句和批量更新)
mybatis.configuration.default-executor-type=simple
# sql执行时间超时时间
mybatis.configuration.default-statement-timeout=25
# 允许在嵌套语句中使用分页(RowBounds)。如果允许使用则设置为false。
mybatis.configuration.safe-row-bounds-enabled=false
# 是否使用驼峰命名法转换字段
mybatis.configuration.map-underscore-to-camel-case=false
# 设置本地缓存范围 session:就会有数据的共享 statement:语句范围 (这样就不会有数据的共享 ) 默认:session
mybatis.configuration.local-cache-scope=session
# 设置当JDBC类型为空时,某些驱动程序 要指定值,默认:OTHER。当写入null值的字段时,部分数据库需要指定null的数据类型.mysql不用设置。【oracle需要设置】对应org.apache.ibatis.type.JdbcType的枚举值
mybatis.configuration.jdbc-type-for-null=other
# 指定对象哪个的方法触发一次延迟加载
mybatis.configuration.lazy-load-trigger-methods=equals,clone,hashCode,toString
# [默认false,推荐使用true] 如果数据为空的字段,则该字段省略不显示,查询数据映射数据类型使用的是Map。当字段值为null时,mybatis映射返回字段的时候会忽略,而原接口是null值也返回,为了兼容,需要设置不忽略null字段
mybatis.configuration.call-setters-on-nulls=true
# mybatis指定日志输出的前缀
mybatis.configuration.log-prefix=mybatis_
# 打印sql-开发时启用,会影响性能【使用boot默认的logback】
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl

# 将java类型(javaType)转化为jdbc类型(jdbcType),或者将jdbc类型(jdbcType)转化为java类型(javaType)。【这个参数还没搞得很明白】
# mybatis.type-handlers-package=com.mapper.typehandler
# 允许JDBC生成主键。需要驱动器支持。如果设为了true,这个设置将强制使用被生成的主键,有一些驱动器不兼容不过仍然可以执行。 默认:false
# mybatis.configuration.use-generated-keys=false
# 指定代理工厂:mybatis 实现2个:CglibProxyFactory和JavassistProxyFactory(默认)
# mybatis.configuration.proxy-factory=CGLIB


#======================↓↓↓↓pagehelper配置-开始↓↓↓↓=================================
# 分页
pagehelper.helperDialect=mysql
pagehelper.reasonable=true
pagehelper.supportMethodsArguments=true
pagehelper.params=count=countSql
1.3、配置类-序列化机制和定制RabbitTemplate
    rabbitmq默认采用jdk的序列化机制,我们改成json定制RabbitTemplate-开启ack模式

MyRabbitConfig01

package sqy.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


@Configuration
public class MyRabbitConfig01 {

    RabbitTemplate rabbitTemplate;

    
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    @Primary//注入容器的意思-和@Autowired意思差不多,但是Primary优先级更高
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }


    
//    @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate(){
        //设置确认回调
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                
                //服务器收到了;
                //修改消息的状态
                System.out.println("服务器收到...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
            }
        });

        //设置消息抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //接收不到消息这边会接收到错误信息。【可以结合数据库-修改数据库当前消息的状态->记录错误然后重试】
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]===>exchange["+exchange+"]===>routingKey["+routingKey+"]");
            }
        });
    }


}


三、全局常量类和实体类 1.1、常量类 MqConstant
package sqy.constant;


public class MqConstant {

    
    public static final String C01_EXCHANGE_NAME = "hello-exchange";
    public static final String C01_QUEUE_NAME = "hello-queue";
    public static final String C01_ROUTING_KEY = "helloMQ.java";
    public static final String C01_ERROR_ROUTING_KEY = "helloMQ.java.golang";


    //------------------------------------

    
    public static final String C02_TOPIC_EXCHANGE = "test.topic.exchange";
    public static final String C02_DELAY_QUEUE_A = "test.delay.queue.a";
    public static final String C02_RELEASE_QUEUE_B = "test.release.queue.b";
    public static final String C02_DELAY_ROUTINGKEY = "test.delay.routingKey";
    public static final String C02_RELEASE_ROUTINGKEY = "test.release.routingKey";
    public static final Integer C02_MEG_TTL = 1 * 60 * 1000;//1分钟-60000毫秒

    //------------------------------------
    
    public static final String C03_TOPIC_EXCHANGE = "demo.topic.exchange";
    //....

}

1.2、实体类

Student

package sqy.pojo;
import lombok.Data;
import java.util.Date;

@Data
public class Student  {
    private String studentNo;
    private String studentName;
    private Date studentBirth;
}

Teacher

package sqy.pojo;
import lombok.Data;
import java.util.Date;


@Data
public class Teacher {
    private String teacherNo;
    private String teacherName;
    private Date teacherBirth;
}

MqMessage

package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
import java.util.Date;


@Data
@ToString
public class MqMessage {

    private String messageId;//消息的唯一id
    private String content;//消息内容
    private String toExchange;//交换机
    private String routingKey;//路由键
    private String classType;//实体类类型
    private String messageStatus;//0-新建 1-已发送 2-错误抵达 3-已抵达
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
}

Order

package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;


@Data
@ToString
public class Order {
    private String orderId;//订单号
    private String orderUserId;//用户id
    private String orderStatus;//订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】
}

OrderWareTask

package sqy.pojo.demo;

import lombok.Data;
import lombok.ToString;


@Data
@ToString
public class OrderWareTask {
    private int id;//自增主键
    private String orderId;//订单号-关联订单表
    private String productId;//商品id-关联商品
    private String productNum;//商品数量
    private String wareId;//仓库id
    private String lockStatus;//任务状态-1锁定 2解锁  [数据库锁-乐观锁]
}

Product

package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;


@Data
@ToString
public class Product {
    private String productId;
    private String productName;
    private String productPrice;
}

Warehouse

package sqy.pojo.demo;
import lombok.Data;
import lombok.ToString;
import java.math.BigDecimal;


@Data
@ToString
public class Warehouse {
    private String wareId;//仓库id
    private String wareName;//仓库名称
    private String wareAddr;//地址
    private String purchaseProductId;//采购物品id
    private String purchaseProductName;//采购物品name
    private BigDecimal purchaseProductPrice;//采购价格
}


四、测试案例一 1.1、创建和发送
package sqy.controller;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import sqy.constant.MqConstant;
import sqy.pojo.Student;
import sqy.pojo.Teacher;

import java.util.Date;
import java.util.UUID;


@Slf4j
@RestController
public class Test01CreateController {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    


    
    //localhost:8080/testCreateMQ
    @GetMapping("/testCreateMQ")
    public String testCreateMQ() {
        //也可以用 BindingBuilder 、ExchangeBuilder、QueueBuilder来创建

        //[创建-声明]交换机
        DirectExchange directExchange = new DirectExchange(MqConstant.C01_EXCHANGE_NAME, true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[{}]创建成功", MqConstant.C01_EXCHANGE_NAME);

        //[创建-声明]队列
        Queue queue = new Queue(MqConstant.C01_QUEUE_NAME, true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功", MqConstant.C01_QUEUE_NAME);

        //将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键【绑定】
        Binding binding = new Binding(MqConstant.C01_QUEUE_NAME,
                Binding.DestinationType.QUEUE,
                MqConstant.C01_EXCHANGE_NAME,
                MqConstant.C01_ROUTING_KEY, null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding创建成功-路由key为:[{}]", MqConstant.C01_ROUTING_KEY);
        return "创建交换机-队列成功,并指定 <路由key> 绑定";
    }

    
    @GetMapping("/sendMq")
    public String sendMq(@RequestParam(value = "num", defaultValue = "20") Integer num) {
        for (int i = 0; i < num; i++) {
            if (i<=10){
                if (i % 2 == 0) {
                    Student student = new Student();
                    student.setStudentNo(i+"号");
                    student.setStudentName("学生" + i);
                    student.setStudentBirth(new Date());
                    //public void convertAndSend(String exchange【交换机】, String routingKey【路由key】, Object object【消息】, @Nullable CorrelationData correlationData)【消息的唯一标识<推荐使用分布式id>】
                    rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, student, new CorrelationData("学生"+UUID.randomUUID().toString().substring(0,7)));
                } else {
                    Teacher teacher=new Teacher();
                    teacher.setTeacherNo(i+"号");
                    teacher.setTeacherName("教师"+i);
                    //使用正常的路由键
                    rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
                    //使用错误的路由键-模拟进入队列失败
                    //rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ERROR_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
                }
            }else {
                if (i % 2 != 0) {
                    Student student = new Student();
                    student.setStudentNo(i+"号");
                    student.setStudentName("学生" + i);
                    student.setStudentBirth(new Date());
                    //交换机、路由key、消息、消息的唯一标识<推荐使用分布式id>
                    rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, student, new CorrelationData("学生"+UUID.randomUUID().toString().substring(0,7)));
                } else {
                    Teacher teacher=new Teacher();
                    teacher.setTeacherNo(i+"号");
                    teacher.setTeacherName("教师"+i);
                    //使用正常的路由键-
                    rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
                    //使用错误的路由键-模拟进入队列失败
                    //rabbitTemplate.convertAndSend(MqConstant.C01_EXCHANGE_NAME, MqConstant.C01_ERROR_ROUTING_KEY, teacher, new CorrelationData("教师"+UUID.randomUUID().toString().substring(0,7)));
                }
            }
        }

        return "发送20条消息完成";
    }
}

1.2、监听消费
package sqy.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import sqy.constant.MqConstant;
import sqy.pojo.Student;
import sqy.pojo.Teacher;


//监听队列
@RabbitListener(queues = {MqConstant.C01_QUEUE_NAME})
@Service
public class Test01Listener {


    
    @RabbitHandler
    public void receiveMessage01(Message message,
                                 Student student,
                                 Channel channel) throws InterruptedException {


        byte[] body = message.getBody();//消息体
        MessageProperties properties = message.getMessageProperties();//消息头属性信息
        Thread.sleep(2000);//模拟业务耗时
//        System.out.println("接受学生消息=>" + student);
        //channel内按顺序自增的。
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("信道自增交货标签:" + deliveryTag);
        //todo  basicAck和basicNack实际场景不会一起使用
        try {
            if (deliveryTag % 2 == 0) {
                //模拟系统中断,debug停止,后面代码还是会执行。所以模拟的话也可以用kill后台进程
                if (deliveryTag > 6) {
                    System.out.println("开始闪断-网络中断.........");
                    System.exit(1);//模拟项目突然挂了,队列里面消息的情况
                }
                //信道里面-手动签收  basicAck(long deliveryTag[消息的id], boolean multiple[是否批量签收-以可以理解为签收当前>=deliveryTag的消息])
                channel.basicAck(deliveryTag, false);
                System.out.println("学生-签收.." + student.getStudentName());
            } else {
                
//                channel.basicReject();//不签收法1
                channel.basicNack(deliveryTag, false, false);//不签收法2--参数多用这个
                System.out.println("学生--没有签收.." + student.getStudentName());
            }
        } catch (Exception e) {
            //网络中断
        }

    }

    
    @RabbitHandler
    public void receiveMessage02(Message message, Teacher teacher, Channel channel) throws Exception {
        Thread.sleep(2000);//模拟业务耗时
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //System.out.println("接收到消息..." + teacher);
        //签收
        channel.basicAck(deliveryTag, false);
        System.out.println("教师-签收了=>" + teacher.getTeacherName());
    }

}

三、延时队列测试 1.1、采用@bean创建

MyRabbitConfig02

package sqy.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sqy.constant.MqConstant;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class MyRabbitConfig02 {

    

    


    //发布订阅模式的交换机
    @Bean
    public Exchange testTopicExchange() {
        //String name[名称], boolean durable[持久化], boolean autoDelete[自动删除], Map arguments[其他参数]
        return new TopicExchange(MqConstant.C02_TOPIC_EXCHANGE, true, false);
    }


    //延迟队列a,存放有效期为1分钟的消息
    @Bean
    public Queue testDelayQueueA() {
        Map arguments = new HashMap<>();
        
        arguments.put("x-dead-letter-exchange", MqConstant.C02_TOPIC_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", MqConstant.C02_RELEASE_ROUTINGKEY);
        arguments.put("x-message-ttl", MqConstant.C02_MEG_TTL);
        //String name[名称], boolean durable[持久化], boolean exclusive[独占], boolean autoDelete[自动删除], Map arguments[其他参数]
        Queue queue = new Queue(MqConstant.C02_DELAY_QUEUE_A, true, false, false, arguments);
        return queue;
    }

    //释放队列b,存放过期消息的队列-客户端监听-用于释放消息
    @Bean
    public Queue testReleaseQueueB() {
        Queue queue = new Queue(MqConstant.C02_RELEASE_QUEUE_B, true, false, false);
        return queue;
    }


    //绑定队列a和交换机-指定路由键
    @Bean
    public Binding testDelayQueueABingding() {
        //String destination[目的地], DestinationType destinationType[枚举-目的地类型], String exchange[交换机名称],
        //      String routingKey[路由键], Map arguments[其他参数]
        return new Binding(MqConstant.C02_DELAY_QUEUE_A,
                Binding.DestinationType.QUEUE,
                MqConstant.C02_TOPIC_EXCHANGE,
                MqConstant.C02_DELAY_ROUTINGKEY,
                null);
    }

    //绑定队列b和交换机-指定路由键
    @Bean
    public Binding testReleaseQueueBBingding() {
        return new Binding(MqConstant.C02_RELEASE_QUEUE_B,
                Binding.DestinationType.QUEUE,
                MqConstant.C02_TOPIC_EXCHANGE,
                MqConstant.C02_RELEASE_ROUTINGKEY,
                null);
    }
}

1.2、发送消息
package sqy.controller;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class Test02DelayController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    
    //http://127.0.0.1:8080/sendMsgToQueueA
    @GetMapping("/sendMsgToQueueA")
    public String sendMsgToQueueA() {
        rabbitTemplate.convertAndSend("test.topic.exchange", "test.delay.routingKey", "这个一条存活时间为1分钟的消息");
        return "向队列a发送1条消息成功";
    }
}

1.3、监听消费
package sqy.service;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import sqy.constant.MqConstant;



@Service
public class Test02DelayMqListener {

    @RabbitListener(queues = {MqConstant.C02_RELEASE_QUEUE_B})
    public void receiveMessage02(Message message, Channel channel) throws Exception {
        //System.out.println(JSON.toJSonString(message.getBody()));
        System.out.println("这个已过期的被转交给队列b的消息:==>" + message);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //签收
        channel.basicAck(deliveryTag, false);
        System.out.println("释放了消息");
    }
}


四、订单库存案例(消息事务) 1.1、创建
package sqy.config;
import org.springframework.context.annotation.Configuration;



@Configuration
public class MyRabbitConfig03 {
    



}

后面补充…


上一篇:rabbitMQ介绍与安装说明(一)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771959.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号