栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot2整合RabbitMQ入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot2整合RabbitMQ入门

一、消息队列概述 1. 消息服务中两个重要概念:

消息代理(message broker)和 目的地(Destination)
当消息发送者发出消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

2. 消息队列主要有两种形式的目的地
1. 队列(queue): 点对点消息通信(point-to-point)
2. 主题(topic): 发布(publish)/ 订阅(subscribe)消息通信
  • 点对点式
    消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
  • 发布订阅式
    发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时,同时收到消息
  • JMS(Java Message Service)JAVA消息服务
    基于JVM消息代理的规范。ActivateMQ、HornetMQ是JMS实现
  • AMQP(Advanced Message Queuing Protocol)
    高级消息队列协议,也是一个消息代理的规范,兼容JMS
    RabbitMQ是AMQP的实现
二、RabbitMQ概念 1. 核心概念

  1. Message
    消息:消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  2. Publisher
    消息的生产者:一个向交换器发布消息的客户端应用程序。

  3. Exchange
    交换器:用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
    Exchange有四种类型:direct (默认),fanout,topic与headers,不同类型的Exchange转发消息的策略有所区别。

  4. Queue
    消息队列:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  5. Binding
    绑定:用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
    Exchange 和 Queue 的绑定可以是多对多的关系。

  6. Connection
    网络连接,比如一个TCP连接。

  7. Channel
    信道:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

  8. Consumer
    消息的消费者:表示一个从消息队列中取得消息的客户端应用程序。

  9. Virtual Host
    虚拟主机:表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证与加密环境的独立服务器域。每个 vhost 本质上就是一个mini版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

  10. Broker
    表示消息队列服务器实体。

三、Docker 安装 RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

docker update rabbitmq --restart=always

docker ps (查看当前的容器状态)

4369,25672 (Erlang发现 & 集群端口)
5672,5671 (AMQP端口)
15672 (web管理后台端口)
61613,61614 (STOMP协议端口)
1883,8883(MQTT协议端口)

进入到后台管理界面

http://xxx(你的ip):15672

进入后,默认用户名密码都是guest

四、RabbitMQ 运行机制 1. AMQP 中的消息路由

2. Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
headers 匹配 AMQP 消息的 header 而不是路由键,header 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到。

1)Direct Exchange


消息中的路由键 (routing key) 如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为 “dog”,则只转发 routing key 标记为 “dog” 的消息,不会转发 “dog.puppy” 等等。它是完全匹配、单播的模式。

2)Fanout Exchange


每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,知识简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。比较像子网广播,每个子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

3)Topic Exchange

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它由路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号 “ # ” 和符号 " * "。# 匹配 0 个或多个单词, * 匹配一个单词。

五、SpringBoot 整合 RabbitMQ 1. 引入spring-boot-starter-amqp

	org.springframework.boot
	spring-boot-starter-amqp

2. 配置文件 application.properties
spring.rabbitmq.host=xxx(你的ip)
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3. 使用方法
  1. 引入了amqp场景:RabbitAutoConfiguration就会自动生效
  2. 给容器中自动配置了
    RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
    所有的属性都在@ConfigurationProperties(prefix = “spring.rabbitmq”)
  3. @EnableRabbit 开启功能
@EnableRabbit
@SpringBootApplication
public class RabbitApplication{
	public static void main(String[] args){
		SpringApplication.run(RabbitApplication.class, args);
	}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitApplicationTests{

	@Autowired
	AmqpAdmin amqpAdmin;

	@Autowired
	RabbitTemplate rabbitTemplate;
	
	
	@Test
	public void createExchange(){
		// Exchange
		
		DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
		amqpAdmin.declareExchange(directExchange);
		log.info("Exchange[{}]创建成功", "hello-java-exchange");
	}

	@Test
	public void createQueue(){
		
		Queue queue = new Queue("hello-java-queue", true, false, false);
		amqpAdmin.declareQueue(queue);
		log.info("Queue[{}]创建成功", "hello-java-queue");
	}

	@Test
	public void createBinding(){
		
		Binding binding = new Binding("hello-java-queue",
				Binding.DestinationType.QUEUE,
				"hello-java-exchange",
				"hello.java",
				null);
		amqpAdmin.declareBinding(binding);
		log.info("Binding[{}]创建成功", "hello-java-binding");
	}

	@Test
	public void sendMessageTest(){
		// 1.发送消息 convertAndSend(String exchange, String routingKey, Object object)
		// 	 如果发送的消息是个对象,会使用序列化机制,将对象写出去。对象必须实现Serializable
		String msg = "Hello world!";
		rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);
		log.info("消息发送完成{}", msg);
	}
}

可以将序列化的数据转化为JSON对象存储,需要自己编写配置类

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig{
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}
  1. 监听消息:使用 @RabbitListener: 必须有 @EnableRabbit
    编写在业务类中
@Service
public class RabbitServiceImpl implements rabbitService{

	
	@RabbitListener(queues = {"hello-java-queue"})
	public void receiveMessage(Message message, HelloEntity content, Channel channel) throws InterruptedException{
		// 消息体
		byte[] body = message.getBody();
		// 消息头属性信息
		MessageProperties properties = message.getMessageProperties();
		System.out.println("收到消息:" + message + "==> 内容" + content);
	}
}

或者

@RabbitListener(queues = {"hello-java-queue"})
@Service
public class RabbitServiceImpl implements rabbitService{
	@RabbitHandler
	public void receiveMessage(HelloEntity content) throws InterruptedException{
		System.out.println("内容" + content);
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/871659.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号