栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot安装整合RabbitMQ实现发布订阅

Springboot安装整合RabbitMQ实现发布订阅

一、RabbitMQ概念

RabbitMQ是流行的开源消息队列系统,是AMQP(Advanced Message Queuing Protocol高级消息队列协议)的标准实现,用erlang语言开发。RabbitMQ据说具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布式系统中使用。

RabbitMQ的5种模式

1.不直接Exchange交换机(默认交换机)

simple简单模式:一个生产者生产一个消息到一个队列被一个消费者接收work工作队列模式:生产者发送消息到一个队列中,然后可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系 2.使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)

订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。

发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息 二、安装RabbitMQ(windows)

RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(OpenTelecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。

安装erlang

下载地址:http://erlang.org/download/otp_win64_20.3.exe
以管理员身份运行此文件进行安装。 。

 erlang安装完成需要配置erlang 系统环境变量: ERLANG_HOME=C:Program Fileserl9.3(以实际路径为主) 在path中添加%ERLANG_HOME%bin;

安装RabbitMQ

 下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14
以管理员身份运行此文件进行安装。安装完成后可以在系统服务中查看到RabbitMQ服务。

配置插件

为了更加方便的管理RabbitMQ服务,可以安装RabbitMQ提供的一个浏览器端管理插件,可以通过浏览器页面方便
的进行服务管理。
安装方式:
1、以管理员身份打开 cmd (不是PowerShell);然后进入在RabbitMQ的安装目录下sbin目录

2、在上述窗口执行命令: rabbitmq-plugins.bat enable rabbitmq_management

 打开浏览器访问网站http://localhost:15672进入登录页面,默认账号和密码都为guest

三、整合springboot

这边介绍订阅模式中的路由模式,广播则改变交换机类型并且不使用路由key即可。(队列模式则不使用交换机通过队列直接传输比较简单不做介绍)

先启动RabbitMQ

引入pom文件

      
            org.springframework.boot
            spring-boot-starter-amqp
        

基本配置 (admin是我自己添加的用户)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /admin
    username: admin
    password: 123456

该配置用于传输对象自动转换json

@Configuration
public class RabbitConfig {

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

代码实现

添加交换机、队列、路由key(一个交换机可绑定多个队列,注释的地方为另一种绑定队列交换机的方式)

@Configuration
public class RabbitMQConfig {

    //交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    //队列名称
    public static final String ITEM_QUEUE = "item_queue";
    //路由key名称
    public static final String ITEM_KEY = "item.delete";


    //声明交换机
//    @Bean("itemTopicExchange")
//    public Exchange topicExchange(){
//        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
//    }
//    //声明队列
//    @Bean("itemQueue")
//    public Queue itemQueue(){
//        return QueueBuilder.durable(ITEM_QUEUE).build();
//    }
//
//    //绑定队列和交换机
//    @Bean
//    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,@Qualifier("itemTopicExchange") Exchange exchange){
//        return BindingBuilder.bind(queue).to(exchange).with("item.delete").noargs();
}

生产者及消费者

@Controller
public class StudentController {
//    @Autowired
//    private StudentMapper studentMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
  
    @ResponseBody
    @RequestMapping(value = "/add" ,method = {RequestMethod.POST})
    public void addStudent(){
        Student student = new Student();
        student.setId(1);
        student.setNaem("zs");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.delete",student);
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.update",student+"");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,"item.insert",student+"");
    }
   
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = RabbitMQConfig.ITEM_QUEUE),
            exchange = @Exchange(name = RabbitMQConfig.ITEM_TOPIC_EXCHANGE,type = ExchangeTypes.TOPIC),
            key = RabbitMQConfig.ITEM_KEY
    ))
    public  void myListener1(Student message){
        System.out.println("消费者1接收到的消息为:"+message);
    }
}

调用接口之后消费者接收到 生产者中路由key为item.delete的对象信息。

需要注意若是修改交换机类型或者绑定队列,需要更改交换机名字或者进RabbitMQ后台删除原来的交换机,不然修改无效!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745302.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号