栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ 学习(二)—— 在 spring 和 springboot 中使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ 学习(二)—— 在 spring 和 springboot 中使用

文章目录
  • 前言
  • 一、Spring 整合 RabbitMQ
    • 1.1 生产者
      • 1.1.1 pom.xml
      • 1.1.2 RabbitMQ配置信息
      • 1.1.3 整合配置文件
      • 1.1.4 测试类
      • 1.1.5 测试结果
        • 1.1.5.1 交换机、队列
        • 1.1.5.2 队列中的消息
    • 1.2 消费者
      • 1.2.1 pom.xml
      • 1.2.2 RabbitMQ 配置
      • 1.2.3 spring-rabbitmq配置
      • 1.2.4 测试类
      • 1.2.5 监听类
      • 1.2.6 测试结果
  • 二、springboot 整合 RabbitMQ
    • 2.1 介绍
    • 2.2 生产者工程
      • 2.2.1 pom.xml
      • 2.2.2 RabbitMQ配置
      • 2.2.3 配置类
      • 2.2.4 启动类
      • 2.2.5 测试类
      • 2.2.6 测试结果
    • 2.3 消费者工程
      • 2.3.1 pom.xml
      • 2.3.2 RabbitMQ 配置
      • 2.3.3 监听类
      • 2.3.4 启动类
      • 2.3.5 测试类
      • 2.3.6 测试结果


前言 记录我在 Spring 和 Springboot 中 整合使用 RabbitMQ 的项目示例。
一、Spring 整合 RabbitMQ

spring 整合 RabbitMQ 就是把创建队列、创建交换机、绑定路由 key 等等的所有公共操作都交给spring容器来进行,我们的代码只进行消息的产生发送和消费。
我们把所有的配置都放在 整合配置文件中 进行配置。没有业务代码,最后通过测试类,模拟发送消息。

1.1 生产者 1.1.1 pom.xml


    4.0.0

    org.example
    spring-rabbitmq-producer
    1.0-SNAPSHOT

    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        

    


1.1.2 RabbitMQ配置信息
rabbitmq.host=192.168.191.130
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk
1.1.3 整合配置文件



    
    

    
    

    
    

    
    
    

    
    
    
    
    
    
        
            
            
        
    

    
    
    
    
  	
    
        
            
            
            
        
    

    
    

1.1.4 测试类
package com.kaikeba.spring.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml") // classpath: 与后面的路径之间不能有空格 classpath: spring/spring-rabbitmq.xml 就会报错
public class SpringRabbitTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Test
    public void simpleQueueTest() {
        // 路由键与队列同名
        rabbitTemplate.convertAndSend("spring_rabbitmq_simple_queue", "直接发送到队列中的消息!!!");
    }

    
    @Test
    public void  fanoutQueueTest() {
        rabbitTemplate.convertAndSend("spring_rabbitmq_fanout_exchange", "", "fanout 广播模式的消息,绑定到这个交换机上的队列都能收到!!");
    }


    
    @Test
    public void  topicQueueTest() {
        rabbitTemplate.convertAndSend("spring_rabbitmq_topic_exchange", "sjq.first", "routing key is sjq.first  send message test");
        rabbitTemplate.convertAndSend("spring_rabbitmq_topic_exchange", "sjq.second.lll","routing key is sjq.second.lll .....");
        rabbitTemplate.convertAndSend("spring_rabbitmq_topic_exchange","xzk.cn.com","路由key是xzk.xn.com,jiandancieh");
    }

}
1.1.5 测试结果 1.1.5.1 交换机、队列

spring容器启动的时候,就会将配置好的交换机和队列都生成出来。

  • 交换机
  • 队列
  • 交换机队列绑定关系

1.1.5.2 队列中的消息
  • 队列
  • 广播模式
  • 通配符模式

1.2 消费者 1.2.1 pom.xml


    4.0.0

    org.example
    spring-rabbitmq-consumer
    1.0-SNAPSHOT

    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    


1.2.2 RabbitMQ 配置
rabbitmq.host=192.168.191.130
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk
1.2.3 spring-rabbitmq配置



    
    

    
    

    
    

    
    
    
    
    
    
    
    
    
        
        
        
        
        
        
    

1.2.4 测试类

消费者的测试类,目的只是启动 spring 容器。让对应队列的监听器开始工作,所以我们写个死循环。

package com.kaikeba.spring.rabbit.listener;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class AllTest {

    
    @Test
    public void test() {
        while (true) {

        }
    }
}

1.2.5 监听类

我们每个监听类除了类名之外,完全相同,所以我们只放一个监听类的代码:

package com.kaikeba.spring.rabbit.listener;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;


public class SpringRabbitSimpleQueueListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("接收路由名为:" + message.getMessageProperties().getReceivedExchange());
            System.out.println("路由 key 为:" + message.getMessageProperties().getReceivedRoutingKey());
            System.out.println("队列名为:" + message.getMessageProperties().getConsumerQueue());
            System.out.println("接收的消息为:" + msg);
            System.out.println("==================================================================================");

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

1.2.6 测试结果

控制台查看监听器的输出:

二、springboot 整合 RabbitMQ 2.1 介绍

在 Spring 项目中,可以使用 Spring-Rabbit 去操作 RabbitMQ
尤其是在 Springboot 项目中只需要引入对应的 amqp 启动依赖就可以使用, 方便的使用RabbitTemplate发 送消息,使用注解接收消息。

  • 生产者工程:
  1. 在 yml 或 properties 配置文件配置 RabbitMQ 的相关信息;
  2. 在生产只工程中编写配置类,创建交换机、队列 并 进行绑定;
  3. 注入 RabbitTemplate 对象, 通过RabbitTemplate对象发送消息到交换机 。

消费者工程:

  1. 在 yml 或 properties 配置文件配置 RabbitMQ 的相关信息;
  2. 创建消息处理类,用于接收队列中的消息并进行处理 。
2.2 生产者工程 2.2.1 pom.xml


    4.0.0

    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.4.RELEASE
    

    org.example
    springboot-rabbitmq-producer
    1.0-SNAPSHOT

    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
        
    

2.2.2 RabbitMQ配置
spring.rabbitmq.host=192.168.191.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/xzk
2.2.3 配置类
package com.kaikeba.springboot.rabbitmq.configuration;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfiguration {
    //交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "springboot_item_topic_exchange";
    //队列名称
    public static final String ITEM_QUEUE = "springboot_item_queue";

    
    @Bean("itemTopicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    
    @Bean("itemQueue")
    public Queue itemQueue() {
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemTopicExchange") Exchange itemTopicExchange) {
        return BindingBuilder.bind(queue).to(itemTopicExchange).with("item.#").noargs();
    }
}

2.2.4 启动类
package com.kaikeba.springboot.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRabbitMQProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitMQProducerApplication.class, args);
    }
}

2.2.5 测试类
package com.kaikeba.springboot.rabbitmq;

import com.kaikeba.springboot.rabbitmq.configuration.RabbitMQConfiguration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitMQConfiguration.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfiguration.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfiguration.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
    }
}

2.2.6 测试结果
  • 交换机

  • 绑定关系

  • 队列

  • 消息

2.3 消费者工程 2.3.1 pom.xml


    4.0.0

    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.4.RELEASE
    

    org.example
    springboot-rabbitmq-consumer
    1.0-SNAPSHOT

    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
        
    

2.3.2 RabbitMQ 配置
spring.rabbitmq.host=192.168.191.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/xzk
2.3.3 监听类
package com.kaikeba.springboot.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class QueueListener  {

    @RabbitListener(queues = "springboot_item_queue")
    public void  listenQueue(String message) {
        System.out.println("消费者接收到的消息为:" + message);
    }
}

2.3.4 启动类
package com.kaikeba.springboot.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRabbitMQConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitMQConsumerApplication.class, args);
    }
}

2.3.5 测试类

测试类的目的是启动项目,使得监听类正常工作,所以没有逻辑代码:

package com.kaikeba.springboot.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitMQConsumerTest {

    @Test
    public void test() {
        while (true) {

        }
    }
}

2.3.6 测试结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/351674.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号