栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ六种工作模式—路由模式

RabbitMQ六种工作模式—路由模式

1.发布订阅模式下,我们构建了一个简单的日志记录系统,我们能够向许多接收者广播日志消息。而在路由模式下,我们将向其中添加一些特别的功能,比如说我们只让某个消费者订阅发布的部分消息,例如我们只把严重错误的消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息 

2.Fanout 这种交换机类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 这种交换机类型来进行替换,这种交换机类型的工作方式是消息只去到它绑定的 routingKey 队列中去

在这种绑定情况下,生产者发布消息到 交换机上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black、green的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃

 3.实战演示

package org.example.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {
    // 交换机名称
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setPort(5672);
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        
        String message="正常日志消息!";
        channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
        String message1="警告消息!";
        channel.basicPublish(EXCHANGE_NAME,"warning",null,message1.getBytes("UTF-8"));
        String message2="错误消息!";
        channel.basicPublish(EXCHANGE_NAME,"error",null,message2.getBytes("UTF-8"));
        System.out.println("发送完毕");
        // 关闭通道和连接
        channel.close();
        // ***关闭连接***
        connection.close();
    }
}
package org.example.direct;

import com.rabbitmq.client.*;


public class ReceiveLogs01 {
    // 交换机的名称
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection= factory.newConnection();
        Channel channel=connection.createChannel();
        // 声明一个direct交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        // 声明一个队列
        channel.queueDeclare("console",false,false,true,null);
        // 绑定
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        
        // 接收和处理消息的回调对象
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String mes=new String(message.getBody(),"UTF-8");
            System.out.println("logs01接收到消息:"+mes);
        };
        // 消费者取消时的回调对象
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume("console",true,deliverCallback,cancelCallback);
        // channel.close();
        // ***关闭连接***
        // connection.close();
    }
}
package org.example.direct;

import com.rabbitmq.client.*;


public class ReceiveLogs02 {
    // 交换机的名称
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("112.124.16.34");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection= factory.newConnection();
        Channel channel=connection.createChannel();
        // 声明一个direct交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        // 声明一个队列
        channel.queueDeclare("disk",false,false,true,null);
        // 绑定
        channel.queueBind("disk",EXCHANGE_NAME,"error");
        
        // 接收和处理消息的回调对象
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String mes=new String(message.getBody(),"UTF-8");
            System.out.println("logs02接收到消息:"+mes);
        };
        // 消费者取消时的回调对象
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消费被中断");
        };
        channel.basicConsume("disk",true,deliverCallback,cancelCallback);
        // channel.close();
        // ***关闭连接***
        // connection.close();
    }
}

 

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350640.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号