栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习笔记

RabbitMQ学习笔记

1 什么是MQ

MQ(message quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统问题解耦。
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

不同的MQ特点

ActiveMQ 是Apache出品,最流行,能力强劲的开源消息总线,它是一个完全支持JMS规范的消息中间件。
丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌消息中间件,在中小企业中很受欢迎Kafka 是linkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。kafka主要的特点是基于Pull的模式来处理消息消费。追求高吞吐量,一开始的目的就是用于
日志手机和传输。0.8版本开始支持负载,不支持事务,对消息的重复、丢失、错误没有严格要求,适合生产大量数据互联网数据收集业务RocketMQ 是阿里开源的消息中间件,他是纯java开发,具有高吞吐量,高可用性,适合大规模分布式系统应用的特点,RocketMQ 思路起于与Kafka,但并不是Kafka的一个Copy,
他对消息可靠传输及事务做了优化,目前在阿里集团广泛应用于交易、充值、流计算、消息推送,日志流式处理,binglog分布式等场景RabbitMQ 是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息,队列、路由(包括点对点和发布、订阅)、可靠性、安全。
AMQP协议更多用在企业系统对数据一致性、稳定性和可靠性要求较高的场景,对性能和吞吐量要求其次。 2 Rabbitmq的引言

实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。
所有主要的编程语言均有与代理接口通讯的客户端库。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

2.1 安装MQ

网上很多就不演示了,后期有空了在尝试自己安装试试

2.2 第一种模型(直连)


在上图模型中,有以下概念:

P生产者,也就是要发送消息的程序C消费者,消息的接受者,会一直等待消息到来Queue消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中获取消息。
生产者

public class Provider {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        //创建连接mq连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接主机 端口 虚拟主机 虚拟主机的用户名和密码
        connectionFactory.setHost("47.110.39.229");
        connectionFactory.setPort(3302);
        //connectionFactory.setVirtualHost("");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        //创建连接 获取通道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare("hello",true,false,false,null);
        //发布消息
        //参数1交换机名称,参数2队列名称,BasicProperties 参数3传递消息额外设置 body参数4消息的具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        channel.close();
        connection.close();
    }
}

消费者

public class Customer {
    public static void main(String[] args)throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("47.110.39.229");
        connectionFactory.setPort(3302);
        //connectionFactory.setVirtualHost("");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello",true,false,false,null);
        
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            //最后一个参数:消息队列中的消息
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body){
                System.out.println("body======>"+new String(body));
            }
        });
    }
}

工具封装

public class RabbitMQUtils {
    private static  ConnectionFactory connectionFactory;
    //静态代码块,类加载的时候只执行一次
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("47.110.39.229");
        connectionFactory.setPort(3302);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
    }

    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnectionAndChannel(Connection con, Channel channel){
        try {
            if(channel!=null){
                channel.close();
            }
            if(con!=null){
                con.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

源码分析

String queue, 队列名称,如果队列不存在自动创建
boolean durable, 用来定于队列是否需要持久化 true 持久化队列 false 不持久化
boolean exclusive,是否独占队列 true独占队列 false 不独占
boolean autoDelete, 是否消费完后自动删除队列 true自动删除 false 不自动删除
Map arguments 额外附加参数

public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException {
    com.rabbitmq.client.AMQP.Queue.DeclareOk ok = this.delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
    this.recordQueue(ok, queue, durable, exclusive, autoDelete, arguments);
    return ok;
}

String exchange, 交换机名称
String routingKey,队列名称
BasicProperties props,传递消息的额外设置

byte[] body 消息的具体内容

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, props, body);
}
2.3 第二种模式(工作)

Work queues,也被称为(task queues),任务模型。当消息处理比较耗时,可能生产消息的速度会远远大于消息的消费速度。
长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型;让多个消费者绑定一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色:

P生产者:任务的发布者C1消费者1,领取任务并且完成任务,假设完成速度较慢C2消费者2,领取任务并且完成任务,假设完成速度较快
生产者

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work",null,("hello work queue " +i).getBytes());
        }
        RabbitMQUtils.closeConnectionAndChannel(connection,channel);
    }
}

消费者1

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body){
                System.out.println("消费者-1:"+new String(body));
            }
        });
    }
}

消费者2

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body){
                System.out.println("消费者-1:"+new String(body));
            }
        });
    }
}

信息打印


总结:
经过测试,我们发现默认情况下,rabbitMQ将按顺序将每个消息发送到下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

消息的自动确认机制
关闭自动确认实现能者多劳

生产者不变~消费者3

public class Customer3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.basicQos(1);//每次只能消费一个信息
        channel.queueDeclare("work",true,false,false,null);
        //参数2 自动确认 false关闭自动确认 true 开启自动确认
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                System.out.println("消费者-3:"+new String(body));
                //手动确认 ~ 参数1 确认队列中那个具体消息 参数2 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

生产者不变~消费者4

public class Customer4 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.basicQos(1);//每次只能消费一个信息
        channel.queueDeclare("work",true,false,false,null);
        //参数2 自动确认 false关闭自动确认 true 开启自动确认
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                try {
                   Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者-4:"+new String(body));
                //手动确认 ~ 参数1 确认队列中那个具体消息 参数2 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

信息打印

2.4 第三种模式(fanout) ~ 广播模式

Fanout 扇出,也称广播模型

在广播模式下,消息发送流程是这样的:
可以有多个消费者

每个消费者有自己的queue(队列)每个队列都要绑定到自己的Exchange(交换机)生产者发送消息,只需要发送到交换机,交换机来决定要给那个队列,生产者无法决定。交换机把消息发送给绑定过的所有队列队列的消费者都能拿到消费。实现一条消息被多个消费者消费
生产者

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定交换机 参数1 交换机名称 参数2 交换机类型 fanout 广播模式
        channel.exchangeDeclare("logs","fanout");
        //发送消息
        channel.basicPublish("logs","",null,"fanout type message".getBytes());
        RabbitMQUtils.closeConnectionAndChannel(connection,channel);
    }
}

消费者1

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定消息和队列
        channel.queueBind(queueName,"logs","");
        //消费
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("fanout 消费者-1:"+new String(body));
            }
        });
    }
}

消费者2

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定消息和队列
        channel.queueBind(queueName,"logs","");
        //消费
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("fanout 消费者-2:"+new String(body));
            }
        });
    }
}

消费者3

public class Customer3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定消息和队列
        channel.queueBind(queueName,"logs","");
        //消费
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("fanout 消费者-3:"+new String(body));
            }
        });
    }
}

测试 所有消费者都输出结果


2.5 第四种模型(Routing)~ 路由模式 2.5.1 routing之订阅模型-Direct(直连)

在fanout模型中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的exchange
在direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是需要指定一个routingKey(路由key)
消息的发送方在向exchange发送消息时,也必须指定消息的routingKey。
Exchange不能把消息交给每一个绑定的队列,而是根据消息的 routing key进行判断,只有队列的routingkey 与消息的routing key完全一致,才会接收到消息

P生产者,向exchange发送消息,发送消息时,会指定一个routing key.
X exchange(交换机),接收生产者的消息,然后把消息递交给routing key 完全匹配的队列
C1消费者,其所在队列指定了需要routing key 为error的消息
C2消费者,其所在队列指定了需要routing key 为info,error,waring的消息

生产者

public class DirectProvider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 参数1 交换机名称 参数2 direct 路由模式
        String exchange = "logs_direct";
        String type = "direct";
        channel.exchangeDeclare(exchange,type);
        //发送消息
        String routingKey = "info";
        String message = "这是direct模型发布的基于rout key:["+routingKey+"]发送的消息";
        channel.basicPublish(exchange,routingKey,null,message.getBytes());
        RabbitMQUtils.closeConnectionAndChannel(connection,channel);
    }
}

Error~消费者

public class DirectErrorCustomer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        String exchange = "logs_direct";
        String type = "direct";
        channel.exchangeDeclare(exchange,type);
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于rout key绑定队列和交换机
        String routingKey = "error";
        channel.queueBind(queue,exchange,routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("Direct Error Customer 消费者:"+new String(body));
            }
        });
    }
}

All~消费者

public class DirectAllCustomer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        String exchange = "logs_direct";
        String type = "direct";
        channel.exchangeDeclare(exchange,type);
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于rout key绑定队列和交换机
        String routingKey1 = "info";
        String routingKey2 = "warning";
        String routingKey3 = "error";
        channel.queueBind(queue,exchange,routingKey1);
        channel.queueBind(queue,exchange,routingKey2);
        channel.queueBind(queue,exchange,routingKey3);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("Direct All Customer 消费者:"+new String(body));
            }
        });
    }
}

测试结果
当为error时都输出,当info时只有all输出:

2.5.2 Routing之订阅模型-topic

Topic类型的exchange与direct相比,都可以根据routingkey把消息路由到不同的队列。
只不过topic类型的exchange可以让队列在绑定routing key的时候使用通配符!
这种模型routingkey一般都是由一个或者多个单词组成,多个单词之间以“.”分割。列如:item.insert

通配符

(star)匹配不多不少恰好1个词
#(hash)匹配零个或多个词
如:
audit.# 匹配audit.irs.cor或者audit.irs等
audit.
只能匹配audit.irs

生产者

public class TopicProvider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 参数1 交换机名称 参数2 direct 路由模式
        String exchangeName = "exchange_topics";
        String type = "topic";
        channel.exchangeDeclare(exchangeName,type);
        //发送消息
        String routingKey = "user";
        String message = "这是topic模型发布的基于rout key:["+routingKey+"]发送的消息";
        channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
        RabbitMQUtils.closeConnectionAndChannel(connection,channel);
    }
}

Star消费者

public class StarCustomer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        String exchangeName = "exchange_topics";
        String type = "topic";
        channel.exchangeDeclare(exchangeName,type);
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于rout key绑定队列和交换机
        String routingKey = "user.*";
        channel.queueBind(queue,exchangeName,routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("topic *(star) Customer 消费者:"+new String(body));
            }
        });
    }
}

Hash消费者

public class HashCustomer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        String exchangeName = "exchange_topics";
        String type = "topic";
        channel.exchangeDeclare(exchangeName,type);
        //创建一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //基于rout key绑定队列和交换机
        String routingKey = "user.#";
        channel.queueBind(queue,exchangeName,routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            public void  handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("topic #(hash) Customer 消费者:"+new String(body));
            }
        });
    }
}

测试:*只能匹配一个,#匹配一个或者多个

2 Springboot整合

后续更新持续更新
添加依赖及配置
RabbitTemplate

3 应用场景 3.1 异步处理 3.2 应用解耦 3.3 流量消峰 4 MQ常见面试问题 4.1 RabbitMQ 如何保证消息不丢失?

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。(开启/confirm/i模式)
第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了(开启rabbitMQ持久化)
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。(关闭rabbitMQ自动ACK)

4.2 Rabbitmq保证消息不被重复消费

1.当拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,
那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2.当拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3.如果上面两种情况还不行,准备一个第三方存储,来做消费记录。以redis为例,给消息分配一个全局id,
只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746069.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号