栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ从入门到实战(图文并茂)

RabbitMQ从入门到实战(图文并茂)

MQ概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统 之间进行通信。

MQ优势

1、应用解耦

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

 

 2、任务异步处理

 

3、削峰填谷

如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发 量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以 上,这个时候数据库肯定卡死了。

 消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个消息,这样慢慢 写入数据库,这样就不会卡死数据库了。

 但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会 维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

 

MQ的劣势

系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?

 实现MQ的大致有两种主流方式:AMQP、JMS。

 AMQP 与 JMS 区别

AMQP

JMS
通过规定协议来统一数据交互的格式定义了统一的接口,来对消息进行统一
只是协议,不规定实现方式,因此是跨语言的限定了必须使用java语言
消息模式单一

规定了两种消息模式

RabbitMQ官方地址:Messaging that just works — RabbitMQ

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开 发。Erlang 语言专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。 RabbitMQ 基础架构如下图

 

 官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

 

 添加依赖

 

 Work Queues 与入门程序的 简单模式 的代码是几乎一样的;可以完全复制,并复制多一个消费者进行 多个消费者同时消费消息的测试。

 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑 定,或者没有符合路由规则的队列,那么消息会丢失!

 

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到

发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机。

 

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换 机的时候需要指定routing key

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

 

 

 

 

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic 在配置routing key 的时候可以使用通配符,显得更加灵活

模式总结

RabbitMQ工作模式:

简单模式 HELLOWORLD一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
工作队列模式 Work Queue一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
发布订阅模式 Publish/subscribe需要设置类型为fanout的交换机,并且交换机和队列进行绑定, 当发送消息到交换机后,交换机会将消息发送到绑定的队列
路由模式 Routing需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
通配符模式 Topic需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

RabbitMQ整合Spring

创建工程模块,添加依赖

     
4.0.0
    com.lxs    
spring-rabbitmq-producer    
1.0-SNAPSHOT
            
                    
            org.springframework            
            spring-context                        
            5.1.7.RELEASE        
        
                    
            org.springframework.amqp            
            spring-rabbit                    
            2.1.8.RELEASE        
        
                    
            junit            
            junit            
            4.12        
        
                    
            org.springframework            
            spring-test                        
            5.1.7.RELEASE        
            
    

配置整合

         
                
        
            
        
        
                                         
                            
                                                            
         

 

发送消息

创建测试文件 spring-rabbitmqproducersrctestjavacomlxsrabbitmqProducerTest.java

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void queueTest(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消
息。");
}
                              
@Test
public void fanoutTest(){

rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到
spring_fanout_exchange交换机的广播消息");
}

@Test
public void topicTest(){

rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj", "发送到
spring_topic_exchange交换机lxs.bj的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.1", "发送
到spring_topic_exchange交换机lxs.bj.1的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "lxs.bj.2", "发送
到spring_topic_exchange交换机lxs.bj.2的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.cn", "发送到
spring_topic_exchange交换机xzk.cn的消息");
}
}

消费者依赖



4.0.0
com.lxs
spring-rabbitmq-consumer
1.0-SNAPSHOT


org.springframework
spring-context
5.1.7.RELEASE


org.springframework.amqp
spring-rabbit
2.1.8.RELEASE


junit
junit
4.12


org.springframework
spring-test
5.1.7.RELEASE



配置整合

  2. 创建 spring-rabbitmq-consumersrcmainresourcesspringspring-rabbitmq.xml 整合 配置文件;





















队列监听器

创建 spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerSpringQueueListener.java

public class SpringQueueListener implements MessageListener{
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

广播监听器

创建spring-rabbitmqconsumersrcmainjavacomlxsrabbitmqlistenerFanoutListener1.java

public class FanoutListener1 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:
%s的消息:%s n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Spring Boot整合RabbitMQ

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发 送消息,使用注解接收消息。

一般在开发过程中:

生产者工程:

    application.yml文件配置RabbitMQ相关信息

    在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定

    注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机

消费者工程:

    application.yml文件配置RabbitMQ相关信息

    创建消息处理类,用于接收队列中的消息并进行处理

创建工程,添加依赖



4.0.0

org.springframework.boot
spring-boot-starter-parent
2.1.4.RELEASE

com.lxs
springboot-rabbitmq-producer
1.0-SNAPSHOT


org.springframework.boot
spring-boot-starter-amqp


org.springframework.boot
spring-boot-starter-test


启动类

package com.lxs.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class);
	}
}

配置

 

package com.lxs.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE =
"springboot_item_topic_exchange";
//队列名称
public static final String ITEM_QUEUE = "springboot_item_queue";
//声明交换机
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return
ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
    //声明队列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
//绑定队列和交换机
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange
exchange){
	return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
	}
}

测试

 

 

消费者端

创建模块,添加依赖



4.0.0

org.springframework.boot
spring-boot-starter-parent
2.1.4.RELEASE

com.lxs
springboot-rabbitmq-consumer
1.0-SNAPSHOT

    
org.springframework.boot
spring-boot-starter-amqp


org.springframework.boot
spring-boot-starter-test



 消息监听处理

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/736371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号