栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ学习文档(进阶篇(Demo使用SpringBoot编写))

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ学习文档(进阶篇(Demo使用SpringBoot编写))

目录
  • 一、依赖
  • 二、代码
    • 1、死信队列
      • 1.1、TTL过期
        • 1.1.1、生产者
        • 1.1.2、普通消费者
        • 1.1.3、死信消费者
      • 1.2、队列长度有限
        • 1.2.1、生产者
        • 1.2.2、普通消费者
        • 1.2.3、普通消费者
      • 1.3、消费者拒绝
        • 1.3.1、生产者
        • 1.3.2、普通消费者
        • 1.3.3、死信消费者
    • 2、优先级队列
      • 2.1、生产者
      • 2.2、消费者
    • 3、自定义延迟交换机
      • 3.1、生产者
      • 3.2、消费者
    • 4、备份交换机
      • 4.1、生产者
      • 4.2、普通消费者
      • 4.3、备份消费者

一、依赖

    com.rabbitmq
    amqp-client
    5.14.2

二、代码 1、死信队列 1.1、TTL过期 1.1.1、生产者
public class Provider {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        
        try (
                // 根据上面设置的参数来创建连接
                Connection connection = factory.newConnection();
                // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
                Channel channel = connection.createChannel();
        ) {
            
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
                    null,
                    null,
                    2,
                    0, null, null, null,
                    null, null, null, null,
                    null, null).builder().expiration("10000").build();
            for (int i = 1; i <= 10; i++) {
                // 消息内容
                String message = "Hello World!" + i;
                
                channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.1.2、普通消费者
public class Consumer1 {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建普通交换机
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 为死信指明死信交换机和死信路由
            Map arguments = new HashMap<>();
            // 设置死信的目的交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 设置死信携带的路由
            arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
            
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
            
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定普通队列和普通交换机
            channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            
            channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.1.3、死信消费者
public class Consumer2 {
    // 队列名称
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信队列
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            
            channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.2、队列长度有限 1.2.1、生产者
public class Provider {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        
        try (
                // 根据上面设置的参数来创建连接
                Connection connection = factory.newConnection();
                // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
                Channel channel = connection.createChannel();
        ) {
            
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            for (int i = 1; i <= 10; i++) {
                // 消息内容
                String message = "Hello World!" + i;
                
                channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.2.2、普通消费者
public class Consumer1 {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建普通交换机
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 为死信指明死信交换机和死信路由
            Map arguments = new HashMap<>();
            // 设置普通队列最大长度,也就是队列同时最多能存在的消息数量
            arguments.put("x-max-length", 6);
            // 设置死信的目的交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 设置死信携带的路由
            arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
            
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
            
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定普通队列和普通交换机
            channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            
            channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.2.3、普通消费者
public class Consumer2 {
    // 队列名称
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信队列
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            
            channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.3、消费者拒绝 1.3.1、生产者
public class Provider {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        
        try (
                // 根据上面设置的参数来创建连接
                Connection connection = factory.newConnection();
                // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
                Channel channel = connection.createChannel();
        ) {
            
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            for (int i = 1; i <= 10; i++) {
                // 消息内容
                String message = "Hello World!" + i;
                
                channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.3.2、普通消费者
public class Consumer1 {
    // 队列名称
    private static final String COMMON_EXCHANGE_NAME = "common_exchange";
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String COMMON_QUEUE_NAME = "common_queue";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建普通交换机
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 为死信指明死信交换机和死信路由
            Map arguments = new HashMap<>();
            // 设置死信的目的交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
            // 设置死信携带的路由
            arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
            
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
            
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定普通队列和普通交换机
            channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            };
            
            channel.basicConsume(COMMON_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1.3.3、死信消费者
public class Consumer2 {
    // 队列名称
    private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    private static final String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("localhost");
        // 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建死信队列
            channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
            // 绑定死信队列和死信交换机
            channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            
            channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2、优先级队列 2.1、生产者
public class Provider {

    // 普通交换机
    public static final String COMMON_EXCHANGE_NAME = "common_exchange3";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue3";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
        ) {
            // 声明交换机
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 消息发送
            for (int i = 1; i <= 10; i++) {
                String message = "测试消息" + i;
                if (i == 5) {
                    // 默认优先级都是0,设置5号消息的优先级是5
                    AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            5, null, null, null,
                            null, null, null, null,
                            null, null).builder().build();
                    channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
                } else {
                    channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.2、消费者
public class Consumer {

    // 普通交换机
    public static final String COMMON_EXCHANGE_NAME = "common_exchange3";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue3";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 声明交换机
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 创建队列
            Map arguments = new HashMap<>(1);
            arguments.put("x-max-priority", 10);
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
            // 绑定普通队列和普通交换机
            channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3、自定义延迟交换机 3.1、生产者
public class Provider {

    // 延时交换机
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange2";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue2";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
        ) {
            // 创建延迟交换机;注意交换机类型是"x-delayed-message",另外该交换机本质上还是一个direct类型交换机
            Map arguments = new HashMap<>(1);
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);

            // 10s消息;说明:10s消息应该比2s的消息更晚到达
            String message = "我是10s的消息";
            HashMap header = new HashMap<>();
            // 设置延迟10s:这个延迟操作是在交换机中进行了,10s之后会将消息发送给消息队列,当然本次使用的交换机是延迟交换机
            header.put("x-delay", 10000);
            AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
                    null,
                    null,
                    2,
                    0, null, null, null,
                    null, null, null, null,
                    null, null).builder().headers(header).build();
            channel.basicPublish(DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));

            // 2s消息;说明:2s消息应该比10s的消息更早到达
            message = "我是2s的消息";
            header = new HashMap<>();
            header.put("x-delay", 2000);
            properties = new AMQP.BasicProperties("text/plain",
                    null,
                    null,
                    2,
                    0, null, null, null,
                    null, null, null, null,
                    null, null).builder().headers(header).build();
            channel.basicPublish(DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.2、消费者
public class Consumer {

    // 延时交换机
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange2";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue2";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();
            // 创建延迟交换机
            Map arguments = new HashMap<>(1);
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
            // 创建队列
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, null);
            // 绑定普通队列和延迟交换机
            channel.queueBind(COMMON_QUEUE_NAME, DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println(str);
            };
            channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4、备份交换机 4.1、生产者
public class Provider {

    // 普通交换机
    public static final String COMMON_EXCHANGE_NAME = "common_exchange5";

    // 备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue5";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
        ) {
            // 声明普通交换机
            Map arguments = new HashMap<>();
            arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, arguments);

            // 消息发送
            String message = "测试消息1";
            channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

            // 消息发送(说明:我们故意将路由键写错,让交换机无法传递消息到普通队列,然后消息被被传递到了备份交换机中,然后我们的备份队列就可以收到消息了)
            message = "测试消息2";
            channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME + "1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.2、普通消费者
public class Consumer1 {

    // 普通交换机
    public static final String COMMON_EXCHANGE_NAME = "common_exchange5";

    // 备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue5";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();

            // 声明普通交换机
            Map arguments = new HashMap<>();
            arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
            channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, arguments);

            // 声明备份交换机
            channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            // 创建队列
            channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, null);
            // 绑定死信队列和死信交换机
            channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new java.lang.String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("普通队列接收到的消息:" + str);
            };
            channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.3、备份消费者
public class Consumer2 {

    // 普通交换机
    public static final String COMMON_EXCHANGE_NAME = "common_exchange5";

    // 备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";

    // 普通队列
    public static final String COMMON_QUEUE_NAME = "common_queue5";

    // 备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue5";

    public static void main(String[] args) {
        // 创建连接工厂,配置连接相关设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 根据上面设置的参数来创建连接
            Connection connection = factory.newConnection();
            // 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
            Channel channel = connection.createChannel();

            // 声明备份交换机
            channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            // 创建队列
            channel.queueDeclare(BACKUP_QUEUE_NAME, true, false, false, null);
            // 绑定死信队列和死信交换机
            channel.queueBind(BACKUP_QUEUE_NAME, BACKUP_EXCHANGE_NAME, "");
            // 消息创建中的回调对象;用来处理从队列异步推送过来的消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("备份队列接收到的消息:" + str);
            };
            channel.basicConsume(BACKUP_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/851245.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号