栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

RabbitMQ

MQ:
消息队列, 本质是个 队列, FIFO 先入先出原则
在队列中 存放的 是 message,
是一种 跨进程的 通信机制,用于 上下游传递消息

使用 MQ 之后, 消息发送者 只需要 依赖 MQ ,不需要 依赖 其他服务

MQ 三大功能:

  1. 流量消峰

优点 : 访问的人员 进行 排队 , 使得 服务器 不会 因为 压力过大而导致 宕机


  1. 应用解耦


  1. 异步处理

MQ的分类

ActiveMq kafka RocketMq RabbitMQ



RabbitMQ
在AMQP(高级消息队列协议)基础上完成的 可复用的企业消息系统

由于使用 erlang语言 故支持多种语言

高并发性 性能较好 吞吐量达到万级

Mq 功能 完善 健壮 稳定 易用 跨平台 支持 AJAX 文档齐全

MQ 的选择

3.RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分
方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。


RabbitMq 概念

是一个 消息中间件 ,接受 并 转发 消息

主要用于 接收 存储 转发 消息数据

四项核心概念

生产者

交换机

队列

消费者


RabbitMq 核心部分 (6大模式)

简单模式 队列模式 发布订阅模式 路由模式 主题模式 发布确认模式

工作原理

Broker:接收和分发消息的应用,
RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出
多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel(信道):如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCPConnection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的

Connection 中的Channel 信道 极大减少了操作系统建立 TCP connection 的开销

Exchange(交换机):message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据


Mq 的使用

  1. Hello Word 简单模式
package One;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;



public class Producer {
    //队列名
    public final  static String QUEUE_NAME="hello";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();

        factory.setHost("192.168.31.112");
        factory.setUsername("admin");
        factory.setPassword("123");

        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        //创建连接 //获取信道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()){
            
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";
            
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("初次发送消息");
        }  catch (Exception e) {
            e.printStackTrace();
        }
    }

}
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    private final static String QUEUE_NAME = "hello";


    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.112");
        factory.setUsername("admin");
        factory.setPassword("123");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            System.out.println("等待接收消息.........");
                //推送的消息如何进行消费的接口回调
            DeliverCallback deliverCallback=(consumerTag, delivery)->{
                String message= new String(delivery.getBody());
            System.out.println(message);
            };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
            CancelCallback cancelCallback=(consumerTag)->{
                System.out.println("消息消费被中断");
            };
                
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }

}

DeliverCallback 传递消息时的回调
CancelCallback 消费信息时的回调


  1. Work Queues 工作队列模式

工作队列(又称任务队列)的主要思想是

避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。
我们把任务封装为消息并将其发送到队列。
在后台运行的工作进程将弹出任务并最终执行作业。
当有多个工作线程时,这些工作线程将一起处理这些任务。


轮训分发消息

工作线程之间 是 竞争关系

package Two;

import com.rabbitmq.client.Channel;
import utlis.RabbitMqUtils;

import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMqUtils.getChannel();)
        { channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }

}

package Two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utlis.RabbitMqUtils;



public class Worker01 {

    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception
    {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");

    };
        System.out.println("C2 消费者启动等待消费.................. ");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}






转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335456.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号