栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq api(rabbitmq是干什么的)

rabbitmq api(rabbitmq是干什么的)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

前言一、RabbitMQ是什么?二、项目中使用步骤

1.在项目中引入RabbitMQ的依赖2.写配置文件

1.生产者2.消费者 3.写配置类对RabbitMQ的工作模式进行配置

1.生产者2.消费者 总结


前言

记录一次使用RabbitMQ的使用过程。


提示:以下是本篇文章正文内容,下面案例可供参考

一、RabbitMQ是什么?

在使用RabbitMQ之前,需要对它有一个基本了解,更方便我们对它理解。
1.官网解释:RabbitMQ is the most widely deployed open source message broker.RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
RabbitMQ是最受欢迎的开源消息代理中间件。它是轻量级的,并且易于部署到企业的机房或者云端。它支持多种消息传递协议,可以以分布式或者集群配置的方式部署,解决大规模和高可用性需求。【源头不做太多解释,感兴趣可以自己查找详细资料】
总结:通俗来讲,它是一个消息代理中间件,它的优点是:开源(免费)、轻量级(自己理解:运行时占用资源少,不会导致程序卡顿)、支持多种消息传递协议、支持分布式部署。
2.它的作用:作为一个消息代理的中间件,它的作用就是将生产者(产生消息的一方)产生的消息存储到队列中,并确保其可以被正确的消费者(使用该消息的一方)消费。
3.使用场景:主要就是利用它的异步通信的能力,将一些业务解耦,提高用户的使用体验,比如:在电商购物平台的付款下单和短信通知付款成功两者之间就可以是异步的,用户不必等到受到付款成功的短信,只在平台付款就能确认自己已经买到该物品,然后系统将一个个的发送短信请求存到消息队列里,等队列满或者达到一定时间时一起处理(即付款的行为和收到短信的行为不是同时进行的);若是这两个行为是同步状态,当高并发时,会造成很多用户已经付款却迟迟不能确认自己购买成功,用户体验很差。
我自己使用的场景是:网页访问量埋点,用来在后台统计一段时间内有多少用户访问网站,因为这个如果来一个用户就将其写入数据库,当大量用户访问的时候,再加上还有其他业务执行会给数据库造成压力,而且这个是一个统计量,并不需要那么实时,所以就设计将数据存到消息队列中,当我们在后台管理页面进行一次刷新或者访问量超过一个阈值时,将消息队列清空。以此降低对数据库的频繁写操作造成的压力。
4.RabbitMQ原理:使用的是AMQP协议(高级消息队列协议)实现。其系统架构如下所示(图有点丑,有点汗颜):

如图所示,消费者和生产者通过TCP协议连接到RabbitMQ服务器(也可称作broker即消息代理,它实际上就是消息服务器实体)。channel是基于connection建立的,消息就通过这个channel通道传到broker。一个broker又包含多个虚拟主机vhost,这个虚拟主机是为了在一个单独的代理上实现多个隔离的环境,即不同用户或者用户组的消息使用同一个vhost,同其他vhost隔离开。然后一个vhost包含多个Exchange(交换机)和多个Queue(存放消息的实体)。Exchange和Queue通过BINDING_Key进行绑定,他们的关系是多对多,绑定好之后,经过交换机的消息就会被放到对应的Queue中。
消息存入消息队列的过程:
1.生产者先与broker建立连接(即注册到broker),创建消息传输通道channel,并指定一个vhost用来做消息传输;
2.生产者产生消息并将其通过channel传送到broker,此时消息带着一个ROUTING_KEY;
3.当该消息进入broker之,先到Exchange,交换机根据消息的ROUTING_KEY以及他自己的BINDING_Key将消息存入对应的队列中去;
4.消费者配置步骤类似,然后也是根据对应的ROUTING_KEY去相应的队列中取消息。
常用Exchange的种类:
1.Direct:该模式下Exchange会将消息送到ROUTING_KEY和BINDING_Key完全相同的队列中去。
2.Fanout:该模式下不管消息的ROUTING_KEY是什么,Exchange都会将消息转发给所有绑定的Queue。
3.Topic:该模式下,Exchange和队列的绑定使用的是通配符组成的BINDING_Key,其中“*”代表匹配任意一个词组,“#”代表一个或者多个词组,当消息过来时,Exchange会将根据ROUTING_KEY是否满足BINDING_Key的通配规则来将其送到对应的Queue。例:BINDING_Key为 “test.*.a.#”就可以匹配ROUTING_KEY为“test.ccc.a.com”这样的消息。

二、项目中使用步骤

提示:在项目中使用只是去配置已经建好的虚拟机、交换机和队列,这些是需要登录RabbitMQ的管理页面进行配置的

1.在项目中引入RabbitMQ的依赖

代码如下(示例):


            org.springframework.boot
            spring-boot-starter-amqp

2.写配置文件 1.生产者
spring:
    application:
    name: rabbitmq-provider
    #配置rabbitMq 服务器
    rabbitmq:
        host: xxx.xxx.xxx.xx
        port: 5672
        username: root
        password: 555555
        #虚拟host 可以不设置,使用server默认host
        #virtual-host
        listener:
            type: direct
            direct:
                retry:
                    #开启重试
                    enabled: true
                    #重试3次
                    max-attempts: 3
                    #间隔时间 1s
                    initial-interval: 1000
2.消费者
spring:
    application:
        name: rabbitmq-consumer
        #配置rabbitMq 服务器
    rabbitmq:
        host: xxx.xxx.xxx.xx
        port: 5672
        username: root
        password: 555555
        #虚拟host 可以不设置,使用server默认host
        listener:
            type: direct
            direct:
                retry:
                    #开启重试
                    enabled: true
                    #重试3次
                    max-attempts: 3
                    #间隔时间 1s
                    initial-interval: 1000

这里配置文件作用相当于将消费者和生产者注册到RabbitMQ服务器中。所以配置是一样的,这里可以不设置虚拟主机,不设置时,RabbitMQ服务器会给一个默认的host使用。

3.写配置类对RabbitMQ的工作模式进行配置

代码如下(示例):

1.生产者
@Configuration
@Slf4j
@AllArgsConstructor
public class RabbitConfig {
    
    public static final String RABBITMQ_KK_TOPIC="rabbitmq.kk.topic2";
    
    public static final String KK_EXCHANGE = "kk.exchange.c";
    
    public static final String RABBITMQ_DIRECT_ROUTING="rabbitmq.direct.routing";
    //队列 DirectQueue
    @Bean
    public Queue EventQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue(RABBITMQ_KK_TOPIC,true,false,false);
    }
    //Direct交换机 起名:EventExchange
    @Bean
    DirectExchange EventDirectExchange() {
        //  return new DirectExchange("EventExchange",true,true);
        return new DirectExchange(KK_EXCHANGE,true,false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:EventDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(EventQueue()).to(EventDirectExchange()).with(RABBITMQ_DIRECT_ROUTING);
    }
    // 转为json格式
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory factory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(factory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

2.消费者
@Configuration
@Slf4j
@AllArgsConstructor
public class RabbitMQConfig {
    
    public static final String RABBITMQ_KK_TOPIC="rabbitmq.kk.topic2";
    
    public static final String KK_EXCHANGE = "kk.exchange.c";
    
    public static final String RABBITMQ_DIRECT_ROUTING="rabbitmq.direct.routing";

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        Map> idClassMapping = new HashMap<>();
        idClassMapping.put("Bean1", KkEventRequest.class);
        //javaTypeMapper.setTrustedPackages("*");
        messageConverter.setJavaTypeMapper(javaTypeMapper);
        return messageConverter;
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }


}

然后,写对应的业务逻辑即可。

总结

本文简单介绍了一下RabbitMQ及其在项目中的使用,以便自己加深学习记忆,若对各位读者有所帮助,不要忘记一件三连哦(狗头保命)。大神轻喷!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771577.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号