栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息队列专题(实战篇):SpringBoot 集成 RabbitMQ 入门级实战

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息队列专题(实战篇):SpringBoot 集成 RabbitMQ 入门级实战

最近的学习劲头越来越足了,昨晚刚写完了 RabbitMQ 介绍与环境搭建篇,今天就开始更新实战篇了,想起原来同事对我的形容:生产队的驴都没你勤快,哈哈哈哈哈哈,说得可真对!

言归正传,本篇博客参考了慕课网的 RabbitMQ 课程,在此基础上进行了简化,提供了一个最简单的无存储功能的消息队列实现,课程链接如下,感兴趣的小伙伴可自行进行学习:

https://www.imooc.com/video/17845


配置消息队列

打开并登录消息队列的管理界面,首先来配置 exchange:

图中有三个参数,分别是 exchange 的名称,类型和是否持久化。

之后配置队列:

最后来配置 exchange 和 queue 之间的绑定:

完成基本配置之后,就可以开始写代码了。


我们知道,在消息队列中,生产者是消息的提供方,消费者是消息的消费方,所以我们需要创建两个项目,分别是 生产者项目 rabbitmq-producer 和 消费者项目 rabbitmq-consumer。

创建生产者项目

生产者项目 rabbitmq-producer 目录结构如下:

1. 创建项目结构

在 java 文件夹下新建 com.rabbitmq 包,然后在包中建立子包:

entity:用于存放实体对象;
producer:用于实现生产者生产消息的服务;

2. 引入依赖和配置

需要添加的依赖项:



    4.0.0

    org.example
    rabbitmq-producer
    1.0-SNAPSHOT
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.5.RELEASE
         
    
    
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
        
            org.apache.commons
            commons-lang3
            3.8.1
        
        
            commons-io
            commons-io
            2.4
        
        
            com.alibaba
            fastjson
            1.2.49
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    


系统配置:

server:
  servlet:
    context-path: /
  port: 8605
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    host: 服务器IP
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-/confirm/is: true #/confirm/i模式
    publisher-returns: true #return机制
    template:
      mandatory: true #与return机制结合配置次属性
    connection-timeout: 5000

3. 创建实体类

Order.java

package com.rabbitmq.entity;

import java.io.Serializable;

// 订单实体类
public class Order implements Serializable {
    // 订单id
    private String id;
    // 订单编号
    private String code;
    // 消息发送的唯一标识id
    private String messageId;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
}

由于消息队列中的消息需要在网络中传输,所以这个实体类必须实现序列化接口;

4.创建生产消息服务类

OrderSender.java

package com.rabbitmq.producer;

import com.rabbitmq.entity.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrder(Order order) throws Exception{
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId());
        rabbitTemplate.convertAndSend("order-exchange","order.abcd", order, correlationData);
    }
}

这个类是非常重要的,通过自动注入用于操作消息队列的模板实例,调用它的 convertAndSend 方法进行消息的发送。

这里的 order-exchange 是我们提前设置好的 exchange 名称,order.abcd 可以匹配上我们设置的 Routing Key,可以写个测试用例来测试下消息的发布。

5.测试消息发布

在 test 包中新建子包,新增开发类 ApplicationTest.java:

ApplicationTest.java

package com.rabbitmq;

import com.rabbitmq.entity.Order;
import com.rabbitmq.producer.OrderSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
    @Autowired
    OrderSender orderSender;

    @Test
    public void test() throws Exception {
        Order order = new Order();
        order.setId("1");
        order.setCode("订单编号001");
        order.setMessageId(System.currentTimeMillis() + "_" + UUID.randomUUID());
        orderSender.sendOrder(order);
    }

}

测试类的代码非常简单,就是新建一个订单并设置属性,之后调用消息发布方法将消息存储到消息队列。

测试通过后,此时在管理面板可以查看到消息队列里面出现了一条未消费消息:

既然消息队列里面已经有了消息,接下来我们来开发消费者项目进行消费。


创建消费者项目

新建消费者项目 rabbitmq-consumer,结构如下:

其中的实体类 Order.java 保持不变,但是需要修改配置文件。

1.修改配置文件

pom.xml 和生产者项目相比,需要修改端口号并新增配置项:

新增配置项如下:

	listener:
      simple:
        concurrency: 5 #初始并发数
        max-concurrency: 10 #最大并发数
        acknowledge-mode: manual #手动签收模式
        prefetch: 1 #限流处理(同一时间只能有一条消息,消费完了才能处理下一条)

2.开发消息消费类

OrderConsumer.java

package com.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.entity.Order;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;

@Component
public class OrderConsumer {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "order-queue", durable = "true"),
                    exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
                    key = "order.#"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map headers, Channel channel) throws IOException {
        // 消费者进行消费操作
        System.out.println("-------------收到消息,开始消费------------");
        System.out.println("订单编号为:" + order.getCode());
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 由于我们开启了手动确认,所以这里消费者必须进行确认操作
        channel.basicAck(deliverTag,false);
    }
}

我们之前在配置文件里面配置了手动确认消息,所以需要传入一个 Channel 类型的对象去进行确认,一定要注意这个 Channel 并不是 NIO 中的,而是 com.rabbitmq.client 包下的,不要搞错。

在处理完成正常的业务逻辑后,需要进行手动确认:

channel.basicAck(deliverTag,false);

还有一个需要注意的地方是,@RabbitListener 不止实现了消费者和对应的消息队列、exchange 以及路由关键字的绑定,同时还具有默认创建的功能。也就是说,如果我们之前没有提前在控制台手动创建 exchange,queue,routing key,代码可以自动为我们创建。

3.运行程序进行消息消费

运行消费者项目,可以看到输出了消息消费的记录:

查看控制台界面,发现消息队列中的消息已经被消费了:

至此,一个最简单的消息队列实战就完成了,不过在实际生产环境中,这样的实现是远远不够的,我们还需要考虑消息的可靠性投递与消费,以及保证传输过程的可靠性,不然消息丢了可是要被祭天的,今天写不动了,下一篇博客再见!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/605569.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号