栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq springboot配置参数(springboot整合rabbitmq实战)

rabbitmq springboot配置参数(springboot整合rabbitmq实战)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

前言交换机类型交换机属性Direct exchange(直连交换机)Fanout exchange(扇型交换机)Topic exchange(主题交换机)消息持久化


前言

关于rabbitmq安装就不说了,直接安装下来是这个界面,默认账号 密码都是:guest,系统管理员角色,但是这个角色只能在本地使用,不可远程登陆,想要远程登录需要重写添加用户分配角色。

在可视化端可以手动添加交换机,队列,消息,创建用户,分配权限。

了解一下mq的生产消费流程,认真看别走神,要思考,带着疑问看下面的。

交换机有四种,消息发布(生产者)是发给哪一种交换机?交换机再把消息给到队列,给的又是哪个队列?订阅者又取哪个队列的消息?

写代码之前了解一下交换机都有哪些,里面都有啥属性。

交换机类型

Direct exchange(直连交换机)

Fanout exchange(扇型交换机)

Topic exchange(主题交换机)

Headers exchange(头交换机)

amq.* exchanges 默认交换机

交换机属性

Name 交换机名称

Type 交换机类型direct、topic、 fanout、 headers

Durability 是否需要持久化。如果持久化,则RabbitMQ重启后,交换机还存在

Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange

Internal 当前Exchange是否于RabbitMQ内部使用,默认为False

导入依赖:

 		
        
            org.springframework.boot
            spring-boot-starter-amqp
        

配置yml

server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
Direct exchange(直连交换机)
@Configuration
public class DirectRabbitConfig {
    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue(){
        
        return new Queue("TestDirectQueue",true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange(){
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting(路由键)
    @Bean
    Binding bindingDirect(){
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }

思想就是,定义一个队列,定义一个交换机,OK绑定他们,发消息的时候就可以把消息发送到定义的交换机,交换机把消息给绑定的队列,一个交换机可以绑定多个队列。
消息推送:

	@Resource
    RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法

    //直连交换机
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        //设置值
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Direct Exchange test message, hello!";
        String createTime = DateUtil.now();

        //封装map一次发送
        Map map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);

        //将消息携带绑定键值:TestDirectRouting(路由键)
        // 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend(
                "TestDirectExchange",
                "TestDirectRouting",
                map);
        return "ok";
    }



消息已经加到交换机对应的队列下了。

创建消费模块,pom依赖不变,yml配置也不变。


@Component
@RabbitListener(queues = "TestDirectQueue")//注:监听的队列名称:TestDirectQueue
public class DirectReceiverService {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息  :"+testMessage.toString());
    }
}


收到消息,接收成功,实时消费。
注意:直连交换机是一个消费者监听一个队列,如果两个消费者同时监听一个队列,会出现轮询消费消息,消息不会重复。直白点儿就是消费者A,B都监听TestDirectQueue就会出现轮询消费的情况。

Fanout exchange(扇型交换机)

道理一样形式不同,上代码

@Configuration
public class FanoutRabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A",true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("fanout.B",true);
    }

    @Bean
    public Queue queueC() {
        return new Queue("fanout.C",true);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange",true,false);
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }

创建三个队列,一个交换机,将他们绑定在一起,不需要通过路由键绑定,定义了也无效。
推送消息

//扇型交换机
    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: testFanoutMessage ";
        String createTime = DateUtil.now();

        Map map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend("fanoutExchange", null, map);
        return "ok";
    }

这里的路由键直接设为null就行
消费类:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverA消费者收到消息  : " +testMessage.toString());
    }
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverB消费者收到消息  : " +testMessage.toString());
    }
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverC消费者收到消息  : " +testMessage.toString());
    }
}



只要绑定了交换机,三个监听类会同时受到这条消息

Topic exchange(主题交换机)

直接上代码

@Configuration
public class TopicRabbitConfig {
    /
@Component
@RabbitListener(queues = "topic.woman")//注:监听的队列名称:topic.woman
public class TopicWomanReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver woman消费者收到消息  : " + testMessage.toString());
    }
}




消息持久化

需要设置交换机和队列都是持久化状态就可以了,消息默认是持久化的。


1是非持久化

本篇暂先到这里了,想继续深入了解自动手动确认,死信号,AMQP协议,以及队列长度,消息大小限制,后续再见。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771679.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号