栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

RabbitMQ
  • MQ基本概念
    • MQ简介
    • MQ的优势和劣势
    • 使用MQ要满足的条件
  • RabbitMQ
    • RabbitMQ 简介
    • rabbitmq六种工作模式
    • RabbitMQ 的工作模式总结
  • RabbitMQ高级特性
    • 消息的可靠投递
    • 消息的确认
    • 消息可靠性总结
    • 消费端限流
    • 消息的TTL
    • 死信队列
    • 延迟队列
    • 消息追踪-rabbitmq_tracing
  • RabbitMQ的应用问题
  • RabbitMQ集群的搭建

MQ基本概念 MQ简介

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

小结

  • MQ,消息队列,存储消息的中间件。
  • 分布式系统通信两种方式:直接远程调用和借助第三方 完成-间接通信。
  • 发送方称为生产者,接收方称为消费者。
MQ的优势和劣势

MQ 的优势
应用解耦:系统的耦合性越高,容错性就越低,可维护性就越低。使用 MQ 使得应用间解耦,提升容错性和可维护性。

异步提速:提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
对于一些耗时的,可以异步的服务,使用MQ可以实现服务间异步消息传输。
削峰填谷:并发量大时所有请求直接查询数据库压力大导致宕机。削峰使用缓冲队列形式,减少高峰时期对服务器的压力。
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
MQ 的劣势
系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。
系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。
一致性问题:A 系统处理完业务,通过 MQ 给B、 C、 D三个系统发消息数据,如果 B 系统、 C 系统处理成功, D 系统处理失败。

使用MQ要满足的条件
  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • 容许短暂的不一致性。
  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
RabbitMQ

常见的MQ产品,由于RabbitMQ 综合能力强劲,所以选择学习 RabbitMQ。

RabbitMQ 简介

2007年, Rabbit 技术公司基于 AMQP高级消息队列协议(Advanced Message Queuing Protocol) 标准开发的 RabbitMQ 1.0 发布。 RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ基础架构:

RabbitMQ 中的相关概念:

  • Broker: 接收和分发消息的应用, RabbitMQ Server就是 Message Broker
  • Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
  • Connection: publisher/ consumer 和 broker 之间的 TCP 连接
  • Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。 Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯, AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。
  • Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
  • Queue: 消息最终被送到这里等待 consumer 取走。
  • Binding: exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key。 Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
rabbitmq六种工作模式

简单模式
只有一个生产者和一个消费者。生产者将消息发送到 RabbitMQ,消费者从RabbitMQ中取走消息。

生产者代码

public class Producer_Hello {
    public static void main(String[] args) throws Exception {
        
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 创建队列
        
        channel.queueDeclare("Hello",true,false,false,null);
        // 发送消息
        
        channel.basicPublish("","Hello",null,"hello".getBytes());

        channel.close();
        connection.close();
    }
}

消费者代码

public class Consumer_Hello {
    public static void main(String[] args) throws Exception {
        
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.out.println("consumerTag: "+ consumerTag);
                System.out.println("envelope.Exchange: "+ envelope.getExchange());
                System.out.println("envelope.RoutingKey: "+ envelope.getRoutingKey());
                System.out.println("properties: "+ properties);
                System.out.println("body: "+ new String(body));
            }
        };
        
        channel.basicConsume("Hello",true,consumer);
    }
}

工作模式
一个生产者将消息发送到 RabbitMQ,多个消费者去Rabbit取出消息。消息会轮询发送个消费者。

生产者代码

public class Producer_WorkQueues {
    public static void main(String[] args) throws Exception {
        
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 创建队列
        
        channel.queueDeclare("Works",true,false,false,null);
        // 发送消息
        
        for(int i=0;i<10;i++){
            channel.basicPublish("","Works",null,(i+" hello rabbitmq").getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者代码:

// 消费者1
public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("Works",true,consumer);
    }
}
// 消费者2
public class Consumer_WorkQueues2 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("Works",true,consumer);
    }
}

发布订阅模式
生产者,将消息发送到 RabbitMQ中,将消息发送给所有的消费者。
在订阅模型中,多了一个 Exchange 角色。
Exchanges 交换机:RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。生产者只能向交换机(Exchange)发送消息。
交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。
Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者代码

  • 创建交换机
  • 创建队列
  • 队列与交换机绑定
  • 发送消息
    消息将会发送到所以与交换机绑定的队列中。
public class Producer_PubSub {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("127.0.0.1");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/rabbit");//虚拟机 默认值/
        factory.setUsername("rabbit");//用户名 默认 guest
        factory.setPassword("rabbit");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       

        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码
消费者监听消息队列,如果有消息,则取出消息,进行消费。

// 消费者1
public class Consumer_Pubsub1 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_fanout_queue1",true,consumer);
    }
}
// 消费者2
public class Consumer_Pubsub2 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_fanout_queue2",true,consumer);
    }
}

路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

生产者代码:
交换机和队列绑定的时候,设置 Routing key,在发送消息时到交换机时,也要设置 Routing key,交换机将根据 Routing key 发送到对应的队列中。

public class Producer_Routing {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("127.0.0.1");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/rabbit");//虚拟机 默认值/
        factory.setUsername("rabbit");//用户名 默认 guest
        factory.setPassword("rabbit");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        String exchangeName = "test_routing";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_routing_queue1";
        String queue2Name = "test_routing_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        
        channel.queueBind(queue1Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"warning");
        String body = "日志信息:张三调用了findAll方法...日志级别:error...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        body = "日志信息:张三调用了findAll方法...日志级别:info...";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());
        body = "日志信息:张三调用了findAll方法...日志级别:warning...";
        channel.basicPublish(exchangeName,"warning",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码:

// 消费者1
public class Consumer_Routing1 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_routing_queue1",true,consumer);
    }
}
// 消费者2
public class Consumer_Routing2 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_routing_queue2",true,consumer);
    }
}

主题模式
交换机模式为 Topics模式,Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。
只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以 " . " 分割。
通配符规则: # 匹配一个或多个词, 匹配不多不少恰好1个词。

生产者代码

public class Producer_Topics {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("127.0.0.1");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/rabbit");//虚拟机 默认值/
        factory.setUsername("rabbit");//用户名 默认 guest
        factory.setPassword("rabbit");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        String exchangeName = "test_topics";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_topics_queue1";
        String queue2Name = "test_topics_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");
        String body = "日志信息:张三调用了findAll方法...日志级别:goods.error...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
        body = "日志信息:张三调用了findAll方法...日志级别:order.info...";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        body = "日志信息:张三调用了findAll方法...日志级别:goods.info...";
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

// 消费者1
public class Consumer_Topics1 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_topics_queue1",true,consumer);
    }
}
// 消费者2
public class Consumer_Topics2 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/rabbit");
        factory.setUsername("rabbit");
        factory.setPassword("rabbit");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建 channel
        Channel channel = connection.createChannel();
        // 接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+ new String(body));
            }
        };
        channel.basicConsume("test_topics_queue2",true,consumer);
    }
}
RabbitMQ 的工作模式总结
  1. 简单模式 HelloWorld,一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
  2. 工作队列模式 Work Queue,一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
  3. 发布订阅模式 Publish/subscribe,需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
  4. 路由模式 Routing,需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
  5. 通配符模式 Topic,需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
RabbitMQ高级特性 消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式

rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer–>exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

/confirm/iCallback代码
确认模式:

  1. 确认模式开启:ConnectionFactory中开启publisher-confirms=“true”
  2. 在rabbitTemplate定义/confirm/iCallBack回调函数
  3. 发送消息,消息从producer–>exchange 则会返回一个 /confirm/iCallback
 	public void test/confirm/i() {
        //2. 定义回调
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("/confirm/i方法被执行了....");
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });
        //3. 发送消息,指定错误的交换机,在/confirm/i进行处理。
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i111", "/confirm/i", "message /confirm/i....");
    }

returnCallback代码
回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
步骤:

  1. 开启回退模式:ConnectionFactory 中设置 publisher-returns=“true”
  2. 设置ReturnCallBack
  3. 设置Exchange处理消息的模式:
    如果消息没有路由到Queue,则丢弃消息(默认)
    如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
    public void testReturn() {
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        //2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //处理
            }
        });
        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i", "message /confirm/i....");
    }

确认模式:
设置ConnectionFactory的publisher-confirms=“true” 开启确认模式。
使用rabbitTemplate.set/confirm/iCallback设置回调函数。
当消息发送到exchange后回调/confirm/i方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
回退模式:
设置ConnectionFactory的publisher-returns=“true” 开启 回退模式。
使用rabbitTemplate.setReturnCallback设置回退方法,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息回退给producer。并执行回调方法returnedMessage。

消息的确认

Consumer Ack:ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认: acknowledge=“none”
  • 手动确认: acknowledge=“manual”
  • 根据异常情况确认: acknowledge=“auto”,(这种方式使用麻烦,不作讲解)

自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。自动确认是默认的方法。默认 acknowledge=“none”。
手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。手动确认,acknowledge = “manual”。

手动确认代码
Consumer ACK机制:

  1. 设置手动签收。acknowledge=“manual”
  2. 让监听器类实现ChannelAwareMessageListener接口
  3. 如果消息成功处理,则调用channel的 basicAck()签收
  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //4.拒绝签收
            
            channel.basicNack(deliveryTag,true,true);
             
        }
    }
}

Consumer Ack小结:
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认, manual:手动确认。
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息。
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

消息可靠性总结
  1. 持久化 exchange要持久化,queue要持久化,massage要持久化。
  2. 生产方确认/confirm/i
  3. 消费方确认Ack
  4. Broker高可用
消费端限流

消费端限流是指通过设置消费端每次从MQ获取消息的个数,达到对消费端限流的目的。
消费端代码
Consumer 限流机制

  1. 确保ack机制为手动确认。
  2. listener-container配置属性perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));

        //2. 处理业务逻辑

        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
消息的TTL

TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

代码实现:
TTL:过期时间

  1. 队列统一过期
  2. 消息单独过期
    • 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
    • 队列过期后,会将队列所有消息全部移除。
    • 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)

设置队列的过期时间。


    
    
        
        
    



    
        
    

在代码中向队列发送消息

public void testTtl() {
	// 向队列发送消息,则消息的TTL为队列设置的TTL
	for (int i = 0; i < 10; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
    }
   	// 设置单独的消息的过期时间
    // 消息后处理对象,设置一些消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1.设置message的信息
            message.getMessageProperties().setExpiration("5000");//消息的过期时间
            //2.返回该消息
            return message;
        }
    };     
    // 发送消息,在发送消息的时候,使用messagePostProcessor,对消息进行设置。
    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);    
}
死信队列

死信队列,英文缩写: DLX 。 Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息, basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

代码实现:
正常队列和交换机


    
    
        
        
        
        
        
        
        
        
    

    

    
        
    

死信队列和死信交换机



    
        
    

测试死信队列

public void testDlx(){
    //1. 测试过期时间,死信消息
    rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    //2. 测试长度限制后,消息死信
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    }
    //3. 测试消息拒收
    rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
}
// 监听正常队列,拒绝消息,测试是否进入死信队列。
// 
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

死信队列小结:

  1. 死信交换机和死信队列和普通的没有区别
  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队
    列。
  3. 消息成为死信的三种情况:
    1. 队列消息长度到达限制;
    2. 消费者拒接消费消息,并且不重回队列;
    3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
在RabbitMQ中并未提供延迟队列功能,但是可以使用: TTL+死信队列 组合实现延迟队列的效果。
场景示例:订单下单后, 30分钟未支付,取消订单,回滚库存

设置队列过期时间,在队列中消息过期后,消息会进入死信队列,然后发送到库存系统,完成消息延迟消费的功能。

代码实现
定义正常队列和交换机


    
    
        
        
        
    
    

    
        
    

定义死信队列和死信交换机

 


    
        
    

测试延时队列,发送信息,在队列中30分钟后过期,消息被发送到死信队列中。

public  void testDelay() throws InterruptedException {
    //发送订单消息。 
    rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息...");
}
消息追踪-rabbitmq_tracing

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能
是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也
有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者
又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

消息追踪-Firehose
firehose的机制是将生产者投递给rabbitmq的消息, rabbitmq投递给消费者的消息按照指定的格式
发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类
型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和
deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别
对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令
rabbitmqctl trace_off:关闭Firehose命令

消息追踪-rabbitmq_tracing
启用插件: rabbitmq-plugins enable rabbitmq_tracing
添加一个 trace,当对应的 虚拟机中有消息的投递和消费,将会记录下消息的详细信息。

如以下信息:
2021-12-01 8:12:42:058: Message published
Node: rabbit@YYYGF
Connection: rabbit@Admin.1638328642.15678.0
Virtual host: /
User: rabbit
Channel: 1
Exchange:
Routing keys: [<<“rooo”>>]
Routed queues: [<<“rooo”>>]
Properties: [{<<“delivery_mode”>>,signedint,1},{<<“headers”>>,table,[]}]
Payload:
asdasd

RabbitMQ的应用问题

消息可靠性保障:消息补偿机制,确保消息100% 发送成功。

过程:
Producer将要发送的消息存入DB,然后发送消息到队列,Consumer监听到队列有消息,取出消息消费,然后发送确认消息到队列,回调检查服务监听到确认消息,将消息写入MDB。Producer在发送消息后,会过一段时间发送一条延迟消息(与消息内容一样),回调检查服务会,监听到延迟消息,然后和MDB中的确认消息比较,如果MDB中存在这条消息的确认消息,证明,消息已经被消费过了,就什么操作也不做,如果MDB中不存在这条消息的确认消息,则说明消息丢失,回调检查服务,将会通知Producer重新发送消息。
如果消息发送失败,延迟消息也发送失败,定时检查服务,将会定时比较Producer的DB和回调检查服务存放确认消息的MDB。如果MDB中没有DB中消息对应的确认消息,则证明消息发送失败,Producer将会重新发送消息。

消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任
意多次执行对资源本身所产生的影响均与一次执行的影响相同。
消息幂等性保障简单的实现机制:乐观锁的实现。
在生产端,发送相同的消息,在消息中携带版本信息。
在消费端,进行版本的判断,消费完成之后,版本改变,就可以避免消费相同的消息。

例如:
生产端发送相同的消息:
id=1,money=500,version=1
id=1,money=500,version=1
在消费端消费消息的时候,判断版本信息。消费完成,版本改变。
第一次执行: version=1
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次执行: version=2
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

RabbitMQ集群的搭建

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629751.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号