栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【rabbitmq】springboot整合rabbitmq

【rabbitmq】springboot整合rabbitmq

源码地址

https://gitee.com/shen-chuhao/walker.git

前提 安装rabbitmq

可以参考以前的文章进行安装
使用docker-compose安装rabbitmq
windows安装rabbitMq

概念

生产者
这里主要是指生产信息的一方
消费者
消费信息的一方
队列
用来存储信息的存储结构

交换机

这个主要用于决定消息推送和消息接收的模式
主要有以下几种交换机:

Direct Exchange 直连交换机Fanout Exchange 扇形交换机Topic Exchange 主题交换机Header Exchange 头交换机Default Exchange 默认交换机Dead Letter Exchange 死信交换机 直连交换机

直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大概就是说你拿着什么样的证书A,就可以从队列A中拿去对应的数据

主题交换机

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
规则如下:

(星号) 用来表示一个单词 (必须出现的) (井号) 用来表示任意数量(零个或多个)单词

例如:
队列A的绑定路由键为 *.topic
队列B的绑定路由键为 topic.#
消息A的路由键: cat.topic
消息B的路由键: topic.cat

那么消息A就会进入队列A
消息B就会进入队列B

直连交换机 生产者-发送信息

创建一个项目 代表生产者的项目

1、 配置文件
server:
  port: 8000
  

spring:
  application:
    name: rabbitmq-provider # 生产者  
    
  rabbitmq:
    host: 127.0.0.1
    port: 5672  # 这里的端口要注意,在浏览器上访问15672可以访问,但是在springboot中要设置访问5672,而不是15672,否则会连接失败
    username: guest
    password: guest

2、配置application.yml

这里测试的是一个直连交换机的配置
步骤主要有:
1、创建队列
2、创建交换机
3、绑定队列和交换机

package com.walker.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class DirectRabbitConfig {

    public final static String TEST_DIRECT_QUEUE="TestDirectQueue";
    public final static String TEST_DIRECT_EXCHANGE="TestDirectExchange";
    public final static String TEST_DIRECT_ROUTING="TestDirectRouting";

    
    @Bean
    public Queue DirectQueue() {
        
        return new Queue(TEST_DIRECT_QUEUE,true);
    }


    
    @Bean
    DirectExchange DirectExchange() {
        
        return new DirectExchange(TEST_DIRECT_EXCHANGE,true,false);
    }


    
    @Bean
    Binding bindingDirect() {
        
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(TEST_DIRECT_ROUTING);
    }
    

}
3、编写controller
package com.walker.controller;

import com.alibaba.fastjson.JSON;
import com.walker.config.DirectRabbitConfig;
import com.walker.entity.data.SendMsgData;
import com.walker.entity.data.TestEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;


@RestController
@RequestMapping("/test")
public class TestController {
    
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


        
        SendMsgData data = new SendMsgData<>();
        data.setId(messageId);
        data.setCreateTime(createTime);

        TestEntity testEntity = new TestEntity();
        testEntity.setName("walker");
        testEntity.setAge("18");
        data.setData(testEntity);

        
        rabbitTemplate.convertAndSend(DirectRabbitConfig.TEST_DIRECT_EXCHANGE,
                DirectRabbitConfig.TEST_DIRECT_ROUTING,
                JSON.toJSONString(data));
        
        return "ok";
    }


}

SendMsgData.java
package com.walker.entity.data;

import lombok.Data;

@Data
public class SendMsgData {
    private String id;
    private T data;
    private String createTime;
}

TestEntity.java
package com.walker.entity.data;

import lombok.Data;

@Data
public class TestEntity {
    private String name;
    private String age;
}

4、测试

可以在postman或者其他工具中进行测试

之后就可以在队列中看到有一条数据进来了

也可以查看这条数据的内容是什么,这里就将json字符串数据存储进来了

消费者-单个监听器消费信息 1、创建项目

2、导入依赖

和生成者项目基本上差不多

        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.projectlombok
            lombok
            true
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.springframework.amqp
            spring-rabbit-test
            test
        
        
        
            com.alibaba
            fastjson
            1.2.62
        

        
            cn.hutool
            hutool-all
            5.7.12
        
3、配置application.yml
server:
  port: 9000
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
4、定义rabbitmq监听器
package com.walker.listener;


import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.walker.data.ReceiveData;
import com.walker.data.TestEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;



@Component

@RabbitListener(queues = {"TestDirectQueue"})
@Slf4j //这个是lombok的一个日志注解
public class DirectListener {



    
    @RabbitHandler
    public void handler(String json){
        
        
        log.info("接受的数据json格式:{}"+json);
        ReceiveData receiveData = JSONObject.parseObject(json, ReceiveData.class);
        log.info("receiveData:{}", receiveData);
        log.info("testEntity:{}", BeanUtil.copyProperties(receiveData.getData(), TestEntity.class));
    }
}

ReceiveData.java
package com.walker.data;

import lombok.Data;

@Data
public class ReceiveData {
    private String id;
    private T data;
    private String createTime;
}

TestEntity.java
package com.walker.data;

import lombok.Data;

@Data
public class TestEntity {
    private String name;
    private String age;
}

5、测试

将项目启动后,发现这里打印出了结果,说明已经将数据接收了下来

2022-03-09 22:35:36.476  INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener       : 接受的数据json格式:{}{"createTime":"2022-03-09 22:11:32","data":{"age":"18","name":"walker"},"id":"e3919199-f3ee-486f-9632-286ad518ebd4"}
2022-03-09 22:35:36.497  INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener       : receiveData:ReceiveData(id=e3919199-f3ee-486f-9632-286ad518ebd4, data={"name":"walker","age":"18"}, createTime=2022-03-09 22:11:32)
2022-03-09 22:35:36.529  INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener       : testEntity:TestEntity(name=walker, age=18)
多个监听器

当新增多一个监听器的时候,在接手方法的时候,会是什么样的一种情况呢?

1、新增DirectListener2

package com.walker.listener;


import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.walker.data.ReceiveData;
import com.walker.data.TestEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;



@Component

@RabbitListener(queues = {"TestDirectQueue"})
@Slf4j //这个是lombok的一个日志注解
public class DirectListener2 {



    
    @RabbitHandler
    public void handler(String json){

        
        System.out.println("监听器2==========》");
        log.info("接受的数据json格式:{}"+json);
        ReceiveData receiveData = JSONObject.parseObject(json, ReceiveData.class);
        log.info("receiveData:{}", receiveData);
        log.info("testEntity:{}", BeanUtil.copyProperties(receiveData.getData(), TestEntity.class));
    }
}

2、重新启动项目
3、生产者发送几条数据
4、查看消费者接收的数据

总结:当出现有相同接收条件的多个监听器时,会轮训进行接收

参考:https://blog.csdn.net/qq_35387940/article/details/100514134

主题交换机 生产者发送信息

这里定义了三个发送信息的接口
1个是猫,将猫的信息发送到topic.cat队列
其他的是非猫的,将其发送到topic.random队列

1、编写主题交换机配置
package com.walker.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class TopicRabbitConfig {
    public final static String TOPIC_CAT="topic.cat";
    public final static String TOPIC_RANDOM="topic.#";
    public final static String TOPIC_RANDOM_QUEUE_NAME="topic.random";
    public final static String TOPIC_EXCHANGE="topicExchange";




    

    
    @Bean
    public Queue catQueue(){
        return new Queue(TOPIC_CAT);
    }

    @Bean
    public Queue randomQueue(){
        return new Queue(TOPIC_RANDOM_QUEUE_NAME);
    }




    
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    
    @Bean
    public Binding bindCat(){
        return BindingBuilder.bind(catQueue()).to(exchange()).with(TOPIC_CAT);
    }

    
    @Bean
    public Binding bindRandom(){
        return BindingBuilder.bind(randomQueue()).to(exchange()).with(TOPIC_RANDOM);
    }




}


2、编写controller
package com.walker.controller;

import com.walker.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/topic")
public class TopicController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public final static String TOPIC_MOUSE="topic.mouse";
    public final static String TOPIC_TIGER="topic.tiger";

    
    @GetMapping("/sendCatMsg")
    public void sendCatMsg(){

        
        String msg="i am a cat";
        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.TOPIC_CAT,msg);
    }


    
    @GetMapping("/sendMouseMsg")
    public void sendMouseMsg(){

        
        String msg="i am a mouse";
        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TOPIC_MOUSE,msg);
    }


    
    @GetMapping("/sendTigetMsg")
    public void sendTigetMsg(){

        

        String msg="i am a "+TOPIC_TIGER;
        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TOPIC_TIGER,msg);
    }
}

3、测试发送信息


消费者接收信息

这边定义两个监听器,分别用来接收路由键分别为猫和topic.random的信息

1、编写监听器

猫监听器

package com.walker.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.cat")
public class TopicCatListener {

    @RabbitHandler
    public void handler(String msg){
        System.out.println("cat监听器=======》");
        System.out.println("接收信息为:"+msg);
    }
}

topic.random队列监听器

package com.walker.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "topic.random")
public class TopicRandomListener {
    @RabbitHandler
    public void handler(String msg){
        System.out.println("随机监听器");
        System.out.println(msg);
    }
}

2、启动项目,测试

启动项目之后,就会接收到下面的信息
这里可以发现,当点击一次猫的信息之后,出现了两次监听结果。

扇形交换机 生产者发送信息 1、编写配置类
package com.walker.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitConfig {

    public final static String QUEUE_1="fanout.1";
    public final static String QUEUE_2="fanout.2";
    public final static String EXCAHNGE="fanout.exchange";

    @Bean
    public Queue queue1(){
        return new Queue(QUEUE_1);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE_2);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCAHNGE);
    }

    
    @Bean
    public Binding bind1(){
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    @Bean
    public Binding bind2(){
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}


2、编写controller
package com.walker.controller;

import com.walker.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("fanout")
public class FanoutController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void sendQueue(){
        String msg="i am queue";
        
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCAHNGE,null,msg);
    }
}

3、测试

调用一次postman接口

之后发现定义的两个队列都接收到了信息

消费者接收信息 1、编写监听器
package com.walker.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.1")
public class FanoutListener1 {

    @RabbitHandler
    public void handler(String msg){
        System.out.println("FanoutListener1监听数据===》");
        System.out.println(msg);
    }
}




package com.walker.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.2")
public class FanoutListener2 {

    @RabbitHandler
    public void handler(String msg){
        System.out.println("FanoutListener2监听数据===》");
        System.out.println(msg);
    }
}

2、测试

启动项目后,发现两个监听器都接收到了信息

参考地址:
Springboot 整合RabbitMq ,用心看完这一篇就够了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760906.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号