栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

Rabbitmq集群

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Rabbitmq集群

Rabbitmq集群

镜像队列

FederationExchange 为什么使用【联合交换机】?

(broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京

的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小,

(Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的

情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息,

那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一

定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延

迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的

阻塞。

将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部

署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?

这里使用 Federation 插件就可以很好地解决这个问题

搭建步骤
  1. 需要保证每台节点单独运行
  2. 在每台机器上开启 federation 相关插件
$ rabbitmq-plugins enable rabbitmq_federation
$ rabbitmq-plugins enable rabbitmq_federation_management
  1. 原理图(先运行 consumer 在 node2 创建 fed_exchange)
public class Task2 {

    //创建队列
    public static final String FED_EXCHANGE = "fed_exchange";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/");
        //设置 RabbitMQ 地址
        factory.setHost("192.168.25.31");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明交换器
        channel.exchangeDeclare(FED_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare("node1_queue",true,false,false,null);
        //绑定交换机和队列
        channel.queueBind("node1_queue",FED_EXCHANGE,"routeKey");
        //发布消息
//        byte[] messageBodyBytes = "quit".getBytes();
//        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        
        channel.basicConsume("node1_queue",false,"",new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("接收到的消息: -》 " + msg);
            }
        });
//        channel.close();
//        conn.close();

    }
}
  1. 在 downstream(node2)配置 upstream(node1)

  1. 添加 policy

  1. 成功的前提

FederationQueue

联邦队列

Shovel

Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即

source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作

为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子",

是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用

程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理

搭建步骤
  1. 开启插件(需要的机器都开启)
$ rabbitmq-plugins enable rabbitmq_shovel
$ rabbitmq-plugins enable rabbitmq_shovel_management
  1. 原理图
  1. 添加 shovel 源和目的地
l 源和目的地
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279376.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号