栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

MQ,本质是个队列,FIFO先进先出。

三大功能:流量削峰、应用解耦、异步处理。

erlang语言编写。

1、RabbitMQ基础

(1)MQ四大核心概念

生产者

交换机(1)

队列(n)

消费者

(2)安装

rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

(3)启动

// 开机启动
chkconfig rabbitmq-server on
// 本次启动
/sbin/service rabbitmq-server start

(5)安装后台

// 先关闭rabbitmq
/sbin/service rabbitmq-server stop
// 安装插件
rabbitmq-plugins enable rabbit_management
​
// 防火墙状态
systemctl status firewalld
// 关闭防火墙
systemctl stop firewalld

(6)添加用户

// 创建账号
rabbitmqctl add_user admin 123
// 设置用户权限
rabbitmqctl set_user_tags admin administrator
// 设置用户权限
set_permissions[-p ]    
rabbitmqctl set_permissions -p "/" admin ".*"".*"".*"
// 当前用户和角色
rabbitmqctl list_users

2、Hello world模式

消费者

public class Producer {
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    // 发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂ip,连接RabbitMQ的队列
        factory.setHost("172.18.108.173");
        // 用户名和密码
        factory.setUsername("briup");
        factory.setPassword("briup");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 发消息
        String message = "hello world";
        
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功!");
    }
}

生产者

public class Consumer {
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
​
    // 接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.18.108.173");
        factory.setUsername("briup");
        factory.setPassword("briup");
        Connection connection = factory.newConnection();
​
        Channel channel = connection.createChannel();
​
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
​
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

3、Work Queues 工作队列

(1)轮询分发消息

一个生产者,多个消费者。

工具类:

public class RabbitUtils {
​
    public static Channel getChannel() throws IOException, TimeoutException {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂ip,连接RabbitMQ的队列
        factory.setHost("172.18.108.173");
        // 用户名和密码
        factory.setUsername("briup");
        factory.setPassword("briup");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        com.rabbitmq.client.Channel channel = connection.createChannel();
        return channel;
    }
​
}

消费者:

public class Worker01 {
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    // 接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到的消息:" + new String(message.getBody()));
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        
        System.out.println("C1等待接收消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

生产者:

public class Task01 {
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 从控制台中接收消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息成功" + message);
        }
    }
​
}

(2)、应答机制

①、自动应答:不太可靠

②、手动应答:可以批量应答并且减少网络拥堵

1、Channel.basicAck(用于肯定确认):RabbitMQ已经知道该消息并且成功的处理消息,可以将其丢弃了
2、Channel.basicNack(用于否定确认)
3、Channel.basicReject(用于否定确认):比上面的少了一个批量处理的参数
​
// true表示批量,false表示不批量(推荐)

消息自动重新入队

mq发现某一个消费者和某一条消息未收到ack确认,就会重新入队,让其他消费者消费。

手动应答

生产者:

public class Task02 {
​
    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("utf-8"));
            System.out.println("生产者发送消息" + message);
        }
    }
}

工具类:

public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者:

public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        System.out.println("C1等待接收消息时间较短");
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 沉睡1s
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}
public class Work04 {
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        System.out.println("C2等待接收消息时间较短");
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 沉睡1s
            SleepUtils.sleep(30);
            System.out.println("接收到的消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

(3)RabbitMQ持久化

在生产者设置

队列的持久化

消息的持久化

public class Task02 {
​
    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        // 需要持久化
        boolean durable = true;
        channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 要求消息保存到磁盘上
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
            System.out.println("生产者发送消息" + message);
        }
    }
}

(4)不公平分发

消费者设置

// 设置参数,不设置就默认是轮询分发
Channel.basicQos(1);
public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        System.out.println("C1等待接收消息时间较短");
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 沉睡1s
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        // 设置不公平分发
        int prefetch = 1;
        channel.basicQos(prefetch);
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

(5)预取值

不公平分发的各个消费者的分发数量。

public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitUtils.getChannel();
        System.out.println("C1等待接收消息时间较短");
        // 声明接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 沉睡1s
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:" + new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        // 设置不公平分发
        //int prefetch = 1;
        // 预取值
        int prefetch = 2;
        channel.basicQos(prefetch);
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

4、发布确认

设置要求队列必须持久化

设置要求队列中的消息必须持久化

发布确认:mq告诉生产者消息已经保存到磁盘

(1)开启发布确认

Channel channel = RabbitUtils.getChannel();
// 开启发布确认
channel.confirmSelect();

单个确认发布:同步确认发布,发布速度特别慢

批量确认发布:发布出现故障,不知道是哪个出现了问题

异步确认发布:性价比高,可靠性强(回调函数来确认),是否发送成功均有应答

public class ConfirmMessage {
​
    // 批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;
​
    public static void main(String[] args) throws Exception{
        // 1、单个
        /confirm/iMessage.publishMessageIndividually(); //722ms
        // 2、批量
        /confirm/iMessage.publishMessageBatch(); //147ms
        // 3、异步
        /confirm/iMessage.publishMessageAsync(); //62ms
    }
​
    // 1、单个确认
    public static void publishMessageIndividually() throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明队列
        String queuqName = UUID.randomUUID().toString();
        channel.queueDeclare(queuqName, true, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queuqName, null, message.getBytes());
            boolean flag = channel.waitForConfirms();
            // 单个消息马上进行发布确认
            if(flag) {
                System.out.println("消息发送成功");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "单个确认,耗时" + (end-begin) + "ms");
    }
​
    // 2、批量确认
    public static void publishMessageBatch() throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明队列
        String queuqName = UUID.randomUUID().toString();
        channel.queueDeclare(queuqName, true, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
        // 批量确认消息大小
        int batchSize = 100;
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queuqName, null, message.getBytes());
            if (i%batchSize == 0) {
                // 批量消息发布确认
                channel.waitForConfirms();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "批量确认,耗时" + (end-begin) + "ms");
    }
​
    // 3、异步确认
    public static void publishMessageAsync() throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明队列
        String queuqName = UUID.randomUUID().toString();
        channel.queueDeclare(queuqName, true, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
        // 准备消息的监听器,监听那些消息成功了,哪些失败了
        
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println("确认的消息:" + deliveryTag);
        };
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println("未确认的消息:" + deliveryTag);
        };
        channel.addConfirmListener(ackCallback, nackCallback);
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queuqName, null, message.getBytes());
            // 异步发布确认
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "异步确认,耗时" + (end-begin) + "ms");
    }
​
}

如何处理异步未处理消息:把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentlinkQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

1、在发送部分记录下所有要发送的消息

2、在消息确认成功部分删除已经发送的消息

3、在消息确认失败部分打印未确认的消息

// 3、异步确认
    public static void publishMessageAsync() throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明队列
        String queuqName = UUID.randomUUID().toString();
        channel.queueDeclare(queuqName, true, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
​
        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();
​
​
        // 准备消息的监听器,监听那些消息成功了,哪些失败了
        
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // ***********2、在消息确认成功部分删除已经发送的消息***********
                ConcurrentNavigableMap confirmed =
                        outstanding/confirm/is.headMap(deliveryTag);
            } else {
                outstanding/confirm/is.remove(deliveryTag);
            }
​
            System.out.println("确认的消息:" + deliveryTag);
        };
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            // ***********3、在消息确认失败部分打印未确认的消息***********
            String message = outstanding/confirm/is.get(deliveryTag);
            System.out.println("未确认的消息" + message + "未确认的消息的标记:" + deliveryTag);
        };
        channel.addConfirmListener(ackCallback, nackCallback);
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queuqName, null, message.getBytes());
            // 异步发布确认
            // ***********1、在发送部分记录下所有要发送的消息***********
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "异步确认,耗时" + (end-begin) + "ms");
    }

(2)三种发布确认比较

单个:简单,但吞吐量非常有限;

批量:简单,合理的吞吐量,一旦出现问题就很难推断出是那条消息出现了问题;

异步:最佳性能和资源使用,出现问题能很好控制,实现较难。

5、交换机

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上生产者只能将消息发送到交换机。

交换机:①接收来自生产者的消息;②将消息推入队列(routingKey / bindingKey)。

(1)交换机类型

直接/路由(direct)

主题(topic)

标题(headers)

扇出(fanout)

无名(默认)

(2)临时队列

features是否为D:一旦断开消费者的连接,队列将被自动删除。

// 创建方式
String queueName = channel.queueDeclare().getQueue();

(3)绑定(binding)

交换和队列直接的绑定。

(4)扇出

两个RoutingKey相同。

将接收到的所有消息广播到它知道的所有队列中。

消费者

public class ReceiveLog01 {
​
    // 交换机的名称
    private static final String EXCHANGE_NAME = "logs";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 声明临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue, EXCHANGE_NAME, "");
        System.out.println("等待接收消息。。。。。。");
        // 接收消息
        DeliverCallback deliverCallback = (customerTag, message) -> {
            System.out.println("ReceiveLog01打印接收的消息:" + message);
        };
        channel.basicConsume(queue, true, deliverCallback, consumerTage -> {});
    }
}

生产者:

public class EmitLog {
    // 交换机的名称
    private static final String EXCHANGE_NAME = "logs";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

(5)直接/路由

RoutingKey不同

消费者

public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明临时队列
        channel.queueDeclare("console", false,false,false,null);
        // 绑定交换机和队列
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息。。。。。。");
        // 接收消息
        DeliverCallback deliverCallback = (customerTag, message) -> {
            System.out.println("ReceiveLogsDirect01打印接收的消息:" + message);
        };
        channel.basicConsume("console", true, deliverCallback, consumerTage -> {});
    }
}

生产者

public class DirectLogs {
    // 交换机的名称
    private static final String EXCHANGE_NAME = "direct_logs";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("utf-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

(6)topic

routing_key:单词列表,以点号隔开

*(星号)可以代替一个单词

#(井号)可以代替0个或者多个

注意:

当一个队列绑定键是#,那么这个队列接收所有数据 ,(fanout);

如果队列绑定键当中,没有#和*出现,那么该队列绑定类型就是direct。

消费者:

public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 声明临时队列
        String queue = "Q1";
        channel.queueDeclare(queue, false,false,false,null);
        // 绑定交换机和队列
        channel.queueBind(queue, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息。。。。。。");
        // 接收消息
        DeliverCallback deliverCallback = (customerTag, message) -> {
            System.out.println("ReceiveLogsTopic01打印接收的消息:" + message.getBody());
            System.out.println("接收队列" + queue + "绑定键" + message.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(queue, true, deliverCallback, consumerTage -> {});
    }
}

生产者:

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        Map bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "Q1和Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant", "Q1和Q2接收到");
        bindingKeyMap.put("quick.orange.fox", "Q1接收到");
        bindingKeyMap.put("lazy.brown.fox", "Q2接收收到");
        bindingKeyMap.put("lazy.pink.rabbit", "Q2接收到一次");
        bindingKeyMap.put("quick.brown.fox", "丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "Q2接收到");
        for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
            System.out.println("生产者发送消息:" + message);
        }
    }
}

6、死信队列

(1)死信概念

无法被消费的消息。

(2)死信来源

消息TTL过期

队列达到最大长度(队列满了,无法再添加数据到mq中)

消息被拒绝(basic.reject或basic.nack)并且requence=false

(3)代码体现

消息TTL过期

消费者1:

public class Consumer01 {
    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // 普通队列
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列
    public static final String DEAD_QUEUE = "dead_queue";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通队列
        Map arguments = new HashMap<>();
        // 过期时间
//        arguments.put("x-message-ttl", 10000);
        // 正常队列设置死信队列的交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 绑定交换机和队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01接收到:" + new String(message.getBody(), "utf-8"));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, sonsumerTag -> {});
    }
}

生产者:

public class Producer {
    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        // 死信消息,设置ttl时间, time to live
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
        }
    }
}

消费者2:

public class Consumer02 {
    // 死信队列
    public static final String DEAD_QUEUE = "dead_queue";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收到:" + new String(message.getBody(), "utf-8"));
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

队列达到最大长度

生产者:

public class Producer {
    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        // 死信消息,设置ttl时间, time to live
//        AMQP.BasicProperties properties = new AMQP.BasicProperties()
//                .builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
        }
    }
}

消费者1:

public class Consumer01 {
    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // 普通队列
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列
    public static final String DEAD_QUEUE = "dead_queue";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通队列
        Map arguments = new HashMap<>();
        // 过期时间
//        arguments.put("x-message-ttl", 10000);
        // 正常队列设置死信队列的交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        // 设置正常队列的长度限制
        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 绑定交换机和队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01接收到:" + new String(message.getBody(), "utf-8"));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

消息被拒

消费者1:

public class Consumer01 {
    // 普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // 普通队列
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列
    public static final String DEAD_QUEUE = "dead_queue";
​
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 普通队列
        Map arguments = new HashMap<>();
        // 过期时间
//        arguments.put("x-message-ttl", 10000);
        // 正常队列设置死信队列的交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        // 设置正常队列的长度限制
//        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // 绑定交换机和队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "utf-8");
            if (msg.equals("info5")) {
                System.out.println(msg + "被拒收");
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01接收到:" + message);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}

7、延迟队列

(1)整合SpringBoot

配置类代码

@Configuration
public class TtlQueueConfig {
    // 普通交换机
    private static final String X_EXCHANGE = "X";
    // 死信交换机
    private static final String Y_DEAD_EXCHANGE = "Y";
    // 普通队列
    private static final String QUEUE_A = "QA";
    private static final String QUEUE_B = "QB";
    // 死信队列
    private static final String DEAD_QUEUE_D = "QD";
​
    // 声明普通交换机
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }
​
    // 声明死信交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_EXCHANGE);
    }
​
    // 声明普通队列,过期时间10秒
    @Bean("queueA")
    public Queue queueA() {
        Map arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        // 设置ttl
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    @Bean("queueB")
    public Queue queueB() {
        Map arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        // 设置ttl
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
​
    // 声明死信队列
    @Bean("deadQueueD")
    public Queue deadQueueD() {
        return QueueBuilder.durable(DEAD_QUEUE_D).build();
    }
​
    // 绑定rountingKey
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("deadQueueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 开始发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "来自ttl为10s的消息:"+message);
        rabbitTemplate.convertAndSend("X", "XB", "来自ttl为40s的消息:"+message);
​
    }
}

消费者

@Component
@Slf4j
public class DeadLetterQueueConsumer {
    // 接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到的死信队列的消息:{}", new Date().toString(), msg);
    }
}

(2)延迟队列优化

死信队列,再添加一个队列不设置延迟时间。

    private static final String QUEUE_C = "QC";
​
    
    @Bean("queueC")
    public Queue queueC() {
        Map arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
​
    
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者:

// 开始发消息, TTL
    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}ms的ttl消息给队列QC:{}", new Date().toString(), ttlTime, message);
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

在消息属性上设置ttl:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长很长,第二个消息的延时时长很短,第二个消息并不会优先得到执行。

(3)RabbitMQ插件实现延迟队列

配置:

public class DelayQueueConfig {
    // 交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // 队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
​
    // 声明交换机
    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }
    // 声明队列
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
​
    // 绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayExchange") CustomExchange delayExchange
    ) {
        return BindingBuilder.bind(delayedQueue).to(delayExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者:

// 开始发消息, TTL
    @GetMapping("sendExpirationMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}ms的消息给延迟队列delayed.queue:{}", new Date().toString(), delayTime, message);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME, DelayQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

消费者:

@Component
@Slf4j
public class DelayQueueConsumer {
    // 监听消息
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到的死信队列的消息:{}", new Date().toString(), msg);
    }
}

8、发布确认高级

配置:

public class ConfirmConfig {
    // 交换机
    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i_exchange";
    // 队列
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i_queue";
    // routingKey
    public static final String /confirm/i_ROUTING_KEY = "/confirm/i_routing_key";
​
    // 声明交换机
    @Bean("/confirm/iExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(/confirm/i_EXCHANGE_NAME);
    }
​
    // 声明队列
    @Bean("/confirm/iQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }
​
    // 绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("/confirm/iQueue") Queue queue,
                                        @Qualifier("/confirm/iExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(/confirm/i_ROUTING_KEY);
    }
}

生产者:

@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class ProducerController {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME, /confirm/iConfig./confirm/i_ROUTING_KEY, message);
        log.info("发送消息内容:{}", message);
    }
}

消费者:

@Component
@Slf4j
public class Consumer {
    @RabbitListener(queues = /confirm/iConfig./confirm/i_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到的队列/confirm/i.queue消息:{}", msg);
    }
}

(1)回调接口

注意:必须通过@PostConstruct注解注入接口

(2)交换机确认

配置文件需要添加:开启生产者确认机制

spring.rabbitmq.publisher-/confirm/i-type=correlated

RabbitTemplate./confirm/iCallback

交换机确认回调方法,confirm()

1、发消息,交换机接收到了,回调

correlationData,保存回调消息的ID及相关信息

交换机收到消息,ack=true

cause null

2、发消息,交换机接收失败了,回调

correlationData,保存回调消息的ID及相关信息

交换机收到消息,ack=false

cause,失败的原因

(3)回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

配置文件需要添加:开启回退消息给生产者

spring.rabbitmq.publisher-returns=true

接口:RabbitTemplate.ReturnsCallback

方法:returnedMessage(),在消息在传递过程中不可达目的地时将消息返回给生产者。

(4)备份交换机

添加备份交换机、备份队列、报警队列。

9、RabbitMQ其他知识点

(1)幂等性

消息被重复消费

设置全局id(UUID、时间戳)。

唯一ID+指纹吗机制:时间戳——唯一信息码;

redis的原子性,setnx命令(推荐)。

(2)优先级队列

订单催付:订单优先级(0-255,数字越大越优先执行)

①、队列中添加优先级

②、消息中添加优先级

③、消费者需要等待消息先发送到队列才去消费

(3)惰性队列

消息保存在内存中还是在磁盘中。

正常情况下,消息保存在内存中,消费速度快;

惰性队列中:消息保存在磁盘中,消费速度慢,适用于消费者下线,宕机时。

两种模式:default和lazy。

惰性队列消耗内存小。

10、RabbitMQ集群

(1)备份镜像

添加策略可以备份镜像,防止数据丢失。

参数:指定模式。

就算整个集群只剩下一台机器了,依然能消费队列里面的消息,说明队列里面的消息被镜像队列传递到相应的机器里面去了。

(2)Haproxy+Keepalive实现高可用

Haproxy+Keepalive实现高可用:建立备机,当主机宕机后,地址就会漂移到备机上,备机也会问主机的在线状态。

高可用:主机宕机备机可以接管主机的工作。

(3)Federation Exchange(联合交换机)

安装federation插件:rabbitmq-plugins enable rabbitmq_federation_management

FederationExchange原理

以交换机为节点,先准备上下游两个交换机节点,在上游交换机配置连接下游交换机。

FederationExchange实现

准备交换机和队列,设置上游,设置规则,添加上游策略。

FederationQueue实现

联邦队列可以将不同地区的数据进行同步。

(4)Shovel

Shovel:数据备份,将源端的数据转发给目的端,负责连接源和目的地,负责消息的读写和负责连接失败的处理。

安装shovel插件:rabbitmq-plugins enable rabbitmq_shovel

rabbitmq-plugins enable rabbitmq_shovel_managment

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760913.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号