栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 详细到落泪

RabbitMQ 详细到落泪

RabbitMQ笔记

高级部分点击
演示代码下载

RabbitMQ是一款Message Queue(消息队列)的产品,是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。 RabbitMQ官方地址:http://www.rabbitmq.com/

1.基础架构

RabbitMQ 中的相关概念:

    Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

    Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

    AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不收客户端和中间件产品和开发语言限制

    Connection:publisher/consumer 和 broker 之间的 TCP 连接

    Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCPConnection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

    Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) andfanout (multicast)

    Queue:消息最终被送到这里等待 consumer 取走

    Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

2. 相关配置 开启管理界⾯
 rabbitmq-plugins enable rabbitmq_management
修改默认配置信息(⽐如修改密码、配置等等,例如: loopback_users 中的 <<“guest”>>,只保留guest)
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停⽌服务
service rabbitmq-server restart # 重启服务 

访问管理页面 http://ip地址:15672 账号密码都是guest

如果出现打不开管理页面可以考虑Linux防火墙

# 关闭防火墙服务
systemctl disable firewalld
systemctl stop firewalld  
有6种工作模式

我们使用java来学习RabbitMQ首先我们先做一个连接的准备工作


    com.rabbitmq
    amqp-client
    5.6.0

public class ConnectionUtils {
	//消费者和生产者共用
    public static Connection getConnection() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("192.168.220.128");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/demo");
        //连接用户名;默认为guest
        connectionFactory.setUsername("sgp");
        //连接密码;默认为guest
        connectionFactory.setPassword("sgp");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

下面我们开始介绍

1.最简单的模式

只有一个消费者 一个生产者和一个队列

[|||] -> (C)" />

生产者

1. 获取连接
2. 创建频道 Channel
3. 声明或者创建队列
4. 向队列中发送消息

=== 需要注意的是队列只需要在生产者或者消费者创建一次 另一个不写也可 ===

public class Producer {
	//定义队列名字
    public static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 1; i <= 30; i++) {
            //发送消息
            String message = "你好:小兔子~ work queue模式---" + i;
            
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        //释放资源
        channel.close();
        connection.close();
    }
}	

消费者

    获取连接创建频道 Channel声明或者创建队列接收消息
public class Consumer1 {


    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        //接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
        //释放资源
//        channel.close();
//        connection.close();
    }
}	

2.工作模式

与简单模式相比消费者可以有多个 彼此之间形成竞争者 默认按轮询的方式去消费消息

​ 与简单模式就是多了一个消费者

3.发布订阅模式Publish/Subscribe

生产者发送消息到交换机,交换机在转发给绑定该交换机队列,同时消费者一直监听与他绑定的队列一旦队列中有消息会立即收到;发布与订阅使用的交换机类型为:fanout

RabbitMQ 中消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。生产者只能向交换器发送消息。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它将它们推送到队列中。那么交换机怎么知道应该吧消息发送到那个队列呢?其规则由 交换类型定义。有几种可用的交换类型:direct、topic、headers 和fanout。

生产者

    获取连接创建频道 Channel声明交换机类型为fanout声明或者创建队列绑定交换机和对应队列向交换机发送消息
public class Producer {
	
    //定义交换机
    public static String FANOUT_EXCHAGE = "fanout_exchage";
    //定义队列
    public static String FANOUT_QUEUE_1 = "fanout_queue_1";
    public static String FANOUT_QUEUE_2 = "fanout_queue_2";


    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();

        
        channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

        //声明(创建)队列
        
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");

        for (int i = 1; i <= 10; i++) {
            //发送消息
            String message = "你好:小兔子~ fanout 模式---" + i;
            
            channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        //释放资源
        channel.close();
        connection.close();
    }
}

消费者 可以有多个

    获取连接创建频道 Channel声明或者创建队列绑定交换机接收队列中的消息
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        
        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");

        //接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
        //释放资源 注释是为了持续监听
//        channel.close();
//        connection.close();
    }
}

如果创建多个消费者来监听多个队列 那么当生产者发送消息时 所有的消费者都会接收到信息

4.Routing路由模式
    队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
    消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

生产者

    获取连接创建频道 Channel声明交换机的类型为direct声明或者创建队列通过RoutingKey绑定交换机和列队向交换机发送消息
public class Producer {

    public static String DIRECT_EXCHAGE = "direct_exchage";
    public static String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    public static String DIRECT_QUEUE_UPDATE = "direct_queue_update";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();

        
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

        //声明(创建)队列
        
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");

        //发送消息
        String message = "新增商品。 路由模式:routing key insert";
        
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        //发送消息
        message = "修改商品。 路由模式:routing key update";
        
        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //释放资源
        channel.close();
        connection.close();
    }
}

消费者

    获取连接创建频道 Channel声明交换机的类型为direct声明或者创建队列通过RoutingKey绑定交换机和列队向交换机发送消息
public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");

        //接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1 - 接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);

        //释放资源
//        channel.close();
//        connection.close();
    }
}

将接收到routingkey为insert的消息 同理可写出routingkey为update的消费者

5.Topics通配符模式

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,

通配符规则:# 匹配一个或多个词, * 匹配不多不少恰好1个词举例:

item.# :能够匹配 item.insert.abc 或者 item…* :只能匹配 item.insert

和路由模式基本相同 只不过采用通配符的RoutingKey 来绑定交换机和队列 代码就不写啦

6.远程过程调用 (RPC)不常用

RabbitMQ工作模式:
    简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用

默认的交换机)

    工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routingkey,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
高级特性 1.消息的可靠性

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer

1.confirm 确认模式

消息从 producer 到 exchange 则会返回一个 /confirm/iCallback。

需要开启

channel./confirm/iSelect();//开启确认模式
channel.waitFor/confirm/isOrDie(3000); //设置最晚响应时间

可以通过add/confirm/iListener方法来接收

//lambda表达式传参
channel.add/confirm/iListener((sequenceNumber, multiple) -> {
    System.out.println(sequenceNumber+"  送到exchange了 ");
    // code when message is /confirm/ied
}, (sequenceNumber, multiple) -> {
    System.out.println(sequenceNumber+"  送没了");
    // code when message is nack-ed
});
2.return 退回模式

​ 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 或者ReturnListener (对参数不同)。 都可以通过参数的形式使用addReturnListener方法接收

public interface ReturnCallback {
    void handle(Return returnMessage);
}
public interface ReturnListener {
    void handleReturn(int replyCode,
            String replyText,
            String exchange,
            String routingKey,
            AMQP.BasicProperties properties,
            byte[] body)
        throws IOException;
}
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange,
                             String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

        System.err.println("---------handle  return----------");
        System.err.println("replyCode: " + replyCode);
        System.err.println("replyText: " + replyText);
        System.err.println("exchange: " + exchange);
        System.err.println("routingKey: " + routingKey);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
});	

我们将利用这两个 callback 控制消息的可靠性投递

ack应答

是保证消费者在接收到消息做出的反应

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
通过设置basicConsume方法的autoAck参数来设置自动确认还是手动 (自动的性能高)

手动提交相关的有三个方法

    拒绝 用于程序出错 第一个参数是消息的序列号 第二个参数是是否可以多拒绝 第三个参数是 是否重新排队
long deliveryTag = envelope.getDeliveryTag();
channel.basicNack(deliveryTag, true,true);

    确定接收 第二个参数是 确定所有消息接收

    channel.basicAck(deliveryTag,true);
    

    拒绝接收 第二个参数表示是否重新排队

    channel.basicReject(deliveryTag,true);
    

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从
RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

Acknowledge,确认。 表示消费端收到消息后的确认方式。
通过设置basicConsume方法的autoAck参数来设置自动确认还是手动 (自动的性能高)

手动提交相关的有三个方法

    拒绝 用于程序出错 第一个参数是消息的序列号 第二个参数是是否可以多拒绝 第三个参数是 是否重新排队
long deliveryTag = envelope.getDeliveryTag();
channel.basicNack(deliveryTag, true,true);

    确定接收 第二个参数是 确定所有消息接收

    channel.basicAck(deliveryTag,true);
    

    拒绝接收 第二个参数表示是否重新排队

    channel.basicReject(deliveryTag,true);
    

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从
RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752123.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号