栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ-入门一

RabbitMQ-入门一

1.RabbitMQ 四大核心概念
  1. 生产者
    产生数据发送消息的程序是生产者。
  2. 交换机
    交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
  3. 队列
    队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
  4. 消费者
    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
    者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者(这种现象我们称之为自产自销)。
2.RabbitMQ 名词介绍

首先我们看下RabbitMQ的工作原理图

  1. Broker
    接收和分发消息的应用服务,RabbitMQ Server 就是 Message Broker。
  2. Virtual host
    出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  3. Connection
    producer/consumer 和 broker 之间的 TCP 连接。
  4. Channel
    如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection极大减少了操作系统建立 TCP connection 的开销
  5. Exchange
    message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  6. Queue
    消息最终被送到这里等待 consumer 取走。详情可以见队列介绍
  7. Binding
    exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保
    存到 exchange 中的查询表中,用于 message 的分发依据。
3.RabbitMQ 安装

安装见
Linux下搭建RabbitMQ(3.9.8)版本
Docker下安装RabbitMQ

4.RabbitMQ 快速入门 4.1.添加RabbitMQ 依赖

创建Maven项目或者SpringBoot项目。

	    
 		
 			com.rabbitmq
 			amqp-client
 		
 		
 		
 			commons-io
 			commons-io
 			2.6
 		
4.2.连接RabbitMQ
public class RabbitUtils {
	
	public static Channel connectionRabbitMQ(String host,String username,String password) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(host);
		factory.setUsername(username);
		factory.setPassword(password);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		return channel;
	}
}
4.3.发送消息给RabbitMQ
public class Producer {
	//队列名称
	private final static String QUEUE_NAME = "hello";
	//换成你MQ的地址和账号密码
	private static String host = "xxxxx";
	private static String username = "xxxx";
	private static String password = "xxxxx";
	
	public static void main(String[] args) throws Exception {
		Channel channel = RabbitUtils.connectionRabbitMQ(host, username, password);
		channel.queueDeclare(QUEUE_NAME,false,false,false,null);
		//消息
		String message="hello world";
		channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
		System.out.println("消息发送完毕");
	}
4.4.检查RabbitMQ是否接收到消息

在上述我们发送消息时,队列为String QUEUE_NAME = “hello”;
消息内容为:String message=“hello world”;
通过网页登录我们的MQ,我们可以发现我们定义的队列“hello”,

点击hello队列,查看我们消息内容。可以发现我们的消息:“hello world”。

4.5.从RabbitMQ 消费消息
public class Consumer {
	private final static String QUEUE_NAME = "hello";
	private static String host = "49.232.70.33";
	private static String username = "admin";
	private static String password = "admin";
	
	public static void main(String[] args) throws Exception {
		
		Channel channel = RabbitUtils.connectionRabbitMQ(host, username, password);
		//取消息
		DeliverCallback deliverCallback=(consumerTag,delivery)->{
			String message= new String(delivery.getBody());
			System.out.println(message);
		};
		//取消消费的一个回调接口 如在消费的时候队列被删除掉了
		CancelCallback cancelCallback=(consumerTag)->{
			System.out.println("消息消费被中断");
		};
		channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
	}
}

通过执行上述代码,我们取到了消息。

4.6.检查RabbitMQ中的消息是否消费

当我们进行上述代码消费以后,再去MQ中查看这条消息时,发现已经为空了,因此被我们消费了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433825.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号