栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【自撰】RabbitMQ集成java代码实现(简单模式,工作模式,订阅模式,路由模式,主题模式)

【自撰】RabbitMQ集成java代码实现(简单模式,工作模式,订阅模式,路由模式,主题模式)

java集成RabbitMQ

导入maven依赖


        
        
            com.rabbitmq
            amqp-client
            5.12.0
        
    
首先在Win

dows 或者 Linux开启rabbitMQ-Server服务,再进行java代码集成。

1. 简单模式
  1. 编辑Util类

    private static final String HOST = "192.168.43.17"; // 设置IP地址
    private static final String VIRTUALHOST = "/"; // 虚拟主机
    private static final String USERNAME = "guest"; // 用户名
    private static final String PASSWORD = "guest"; // 密码
    private static final Integer PORT = 5672; // rabbitmq-server 端口
    
    public Connection getConnection () {
    Connection connection = null;
    // 获取链接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(HOST);
    connectionFactory.setVirtualHost(VIRTUALHOST);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    // rabbitmq 的服务器地址 15672:给rabbitmq management web程序,插件 web端客户端管理工具
    //5672:给rabbitmq-server 服务器的
    connectionFactory.setPort(PORT);
    // 建立链接
    try {
        Connection newConnection = connectionFactory.newConnection();
        connection = newConnection;
    } catch (IOException e) {
        System.out.println("连接失败!!!");
        e.printStackTrace();
    } catch (TimeoutException e) {
        System.out.println("连接超时!!!");
        e.printStackTrace();
    }
    return connection;
    }
    
  2. 编写消息队列发布者

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection getConnection = new ConnectionUtil().getConnection();
    // 创建频道
    Channel channel = getConnection.createChannel();
    
    // 声明队列 (队列名称  是否可持久化  是否独占  是否自动删除  配置其他参数)
    channel.queueDeclare("zjw_test", true, false, false, null);
    // 消息发布 (交换机名  路由名  其他属性  消息的字节数组)
    channel.basicPublish("","zjw_test",null,"Hello Word!!!".getBytes());
    // 关闭连接
    getConnection.close();
    }
    
  3. 编写消息队列消费者

    public static void main(String[] args) throws IOException {
    // 获取连接
    Connection getConnection = new ConnectionUtil().getConnection();
    // 创建频道
    Channel channel = getConnection.createChannel();
    // 声明队列
    channel.queueDeclare("zjw_test",true,false,false,null);
    
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume("zjw_test", true, deliverCallback, consumerTag -> {});
    }
    
2. 工作模式 (一个发布者多个消费者)
消费者不做关闭连接的操作  同时运行在后台 再启动发布者




总结:工作模式就是多人同时消费一个队列(并且队列会平均分配给消费者)

3. 发布/订阅(模式)

多了声明交换机和绑定交换机的操作

  1. 发布者
public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列   (队列名称 是否持久化 是否独占队列 是否自动删除 其他属性)
        channel.queueDeclare("fanout_message1",true,false,false,null);
        channel.queueDeclare("fanout_message2",true,false,false,null);
        // 声明交换机 (交换机名 类型)
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
        // 交换机绑定队列  (队列名 交换机名 路由)
        channel.queueBind("fanout_message1","fanout_exchange","");
        channel.queueBind("fanout_message2","fanout_exchange","");
        // 发布消息
        String message = "消息队列:发布/订阅模式";
        // channel.basicPublish("fanout_exchange","",null,message.getBytes());
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("fanout_exchange","",null,message.getBytes());
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
  1. 消费者
public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("fanout_message1",true,false,false,null);
        // 声明函数
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                // 如果之前没有指定,则会自动生成一个消费者标签,对消费者进行唯一标识
                System.out.println("consumerTag:"+consumerTag);
                // 可以通过envelope获取exchange和routingkey信息
                System.out.println("exchange:"+envelope.getExchange()+";routingkey:"+envelope.getRoutingKey()
                        +";deliveryTag:"+envelope.getDeliveryTag());
                // 打印消息
                System.out.println("fanout_message1:" + new String(body,"UTF-8"));
            }
        };
        // 消费队列  (队列名 是否自动化 函数名)
        channel.basicConsume("fanout_message1",true,defaultConsumer);
    }

总结:发布/订阅模式指的是将消息平均发布到交换机里(消费者可根据队列名消费分配到的队列消息)

4. 路由模式

填写了routingKey路由名称

  1. 消息队列发布者
public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("routing_message1",true,false,false,null);
        channel.queueDeclare("routing_message2",true,false,false,null);
        // 声明交换机 (交换机名 类型)
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
        // 绑定交换机 (队列名 交换机名 路由)
        channel.queueBind("routing_message1","routing_exchange","insert");
        channel.queueBind("routing_message2","routing_exchange","update");
        channel.queueBind("routing_message1","routing_exchange","delete");
        channel.queueBind("routing_message2","routing_exchange","select");
        // 发布消息
        channel.basicPublish("routing_exchange","insert",null,"路由模式insert".getBytes());
        channel.basicPublish("routing_exchange","update",null,"路由模式update".getBytes());
        channel.basicPublish("routing_exchange","delete",null,"路由模式delete".getBytes());
        channel.basicPublish("routing_exchange","select",null,"路由模式select".getBytes());
        // 关闭资源
        channel.close();
        connection.close();
    }
  1. 消息队列消费者
public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("routing_message1",true,false,false,null);
        // defautl函数
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                System.out.println("消费1:"+new String(body,"utf-8"));
            }
        };
        // 消费队列
        channel.basicConsume("routing_message1",defaultConsumer);
    }

总结:路由模式是指 将消息发布在不同的交换机并且在交换机分配不同的路由空间

5. 主题模式

基于路由模式 比路由模式更实用(推荐)

  1. 发布者
public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("topic_message1",true,false,false,null);
        // 声明交换机
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
        // 绑定交换机 (队列名 交换机名 路由名(#代表下面的所有 *代表下一个目录))
        channel.queueBind("topic_message1","topic_exchange","item.*");
        channel.queueBind("topic_message2","topic_exchange","menu.#");
        // 发布消息
        channel.basicPublish("topic_exchange","item.delete",null,"商品的删除".getBytes());
        channel.basicPublish("topic_exchange","item.update",null,"商品的修改".getBytes());
        channel.basicPublish("topic_exchange","item.insert.del",null,"商品的新增下面的删除".getBytes());

        channel.basicPublish("topic_exchange","menu.system",null,"菜单的系统设置".getBytes());
        channel.basicPublish("topic_exchange","menu.system.sel",null,"菜单的系统设置下的查询".getBytes());
        channel.basicPublish("topic_exchange","menu.system.info",null,"菜单的系统设置下的个人信息".getBytes());
        // 关闭资源
        channel.close();
        connection.close();
    }
  1. 消费者
public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = new ConnectionUtil().getConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("topic_message1",true,false,false,null);
        // 默认函数
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                System.out.println("消费1:"+new String(body,"utf-8"));
            }
        };
        // 消费队列
        channel.basicConsume("topic_message1",defaultConsumer);
    }

参考项目点击获取连接

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699995.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号