栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化

Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化

Hello模式

在idea中新建一个空工程

设置项目

添加模块

选择模块类型

设置模块

在pom文件中导入jar包依赖

书写生产者代码:

public class HelloProduct {

//    创建队列名称

    public static  final String queue_name = "hello";
    //发消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置工厂ip 连接rabbitmq队列
        factory.setHost("192.168.14.131");
        //连接用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("admin");
        //创建连接
        Connection connection = factory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(queue_name,false,false,false,null);
        //发消息
        String message = "hello world";//初次使用

        
        channel.basicPublish("",queue_name,null,message.getBytes());
        System.out.println("消息发送完毕");
    }

}

启动运行:

在web端找到刚才发送的队列:

书写消费者:

public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("192.168.14.131");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();


         System.out.println("消费者开始消费消息");
        //声明 接收消息
        DeliverCallback deliverCallback = (consumerTag,message) ->{

            System.out.println(new String(message.getBody()));
        };

        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag ->{
            System.out.println("消息消费被中断");
        };
        
        channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);
    }
}

运行消费者开始消费生产者生成的消息

由于在生产者和消费者中都需要建立连接工厂,代码重复,所以在这里可以将建立连接工厂的代码提取出来当做一个工具类来使用:

消息的轮询分配
创建工作线程队列:

public class Work1 {
    private static final String queue_name= "helllo";

    public static void main(String[] args) throws Exception{
        Channel channel = Rabbitmqutil.getChannel();

        //消息的接收
        DeliverCallback deliverCallback =(consumerTag,message)->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
        };
        //消息接收被取消时 执行下面的内容
        CancelCallback cancelCallback =(consumerTag)->{
            System.out.println(consumerTag +"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1接收消息");
        channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);

    }
}

运行main方法
在这里插入图片描述

设置允许多线程启动


再次启动main方法


多线程启动成功

书写生产者代码:

public class WorkProduct {
    private static final String queue_name= "hello";

    public static void main(String[] args) throws Exception{
        Channel channel = Rabbitmqutil.getChannel();

        channel.queueDeclare(queue_name,false,false,false,null);


        System.out.println("请输入发送的消息:");
        //从控制台接收信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            System.out.println("请输入发送的消息:");
            
            String message = scanner.next();
            channel.basicPublish("",queue_name,null,message.getBytes());
        }
    }
}

启动生产者,并在控制台中输入发送的4条消息:
A1,A2,A3,A4

根据消息的轮询消费,应该是C1消费A1和A3,C2消费A2和A4,查看结果是否符合预期:


符合预期,说明消息消费轮询验证成功

消息应答

Message acknowledgment(消息应答)
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

创建生产者:

* 消息在手动应答时是不丢失 放回队列中会重新消费
 **/

public class Task {
    //队列名称
    public static final  String task_queue_name = "ack_queue";

    public static void main(String[] args) throws Exception{

        //创建通道
        Channel channel = Rabbitmqutil.getChannel();
        //声明队列
        channel.queueDeclare(task_queue_name,false,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

创建一个睡眠工具类,以便于直接对不同的消费者进行不同时间的睡眠执行:

public class SleepUtil {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException e){
            Thread.currentThread().interrupt();
        }
    }
}

创建消费者work1,沉睡时间设置为1s:

public class Work1 {

    //队列名称
    public static  final String task_queue_name = "ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception{
        //创建通道
        Channel channel = Rabbitmqutil.getChannel();
        System.out.println("C1等待接收消息,处理时间短");

        DeliverCallback deliverCallback = (consumerTag,message)->{
            //沉睡1s
            SleepUtil.sleep(1);
            System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));
            //手动应答
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag,message)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        });
    }
}

创建消费者2,沉睡时间设置为30s:

public class Work2 {

    //队列名称
    public static  final String task_queue_name = "ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception{
        //创建通道
        Channel channel = Rabbitmqutil.getChannel();
        System.out.println("C2等待接收消息,处理时间长");

        DeliverCallback deliverCallback = (consumerTag,message)->{
            //沉睡1s
            SleepUtil.sleep(30);
            System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));
            //手动应答
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag,message)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        });
    }
}

首先执行生产者,由生产者首先创建队列:

再分别执行消费者1和消费者2:


由生产者发送消息:11和22

可以看到消费者1很快就接收到了消息:

消费者2在生产者发送22消息后30s才收到消息:

测试:
生产者发送两条消息,“33"和"44”,消费者1在1s后收到消息33,消费者2应该在30s后收到消息44,那么如果消费者2在30s内还没有收到消息时候出现宕机情况,那么消息44是否会丢失,还是会被其他的消费者(消费者1)所消费掉呢?
开始测试,生产者发送消息33和44

消费者1很快收到消息33

在消费者2还未收到消息44之前手动使其宕机

然后查看正常运行的消费者1,可以看到消费者1接收到消息44将其消费掉

由此可以得出结论,消息并未丢失,而是由其他消费者消费掉

Rabbitmq持久化方式

消息持久化和队列持久化的联系:
队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。
消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。

队列持久化

可以看到上面截图中ack_queue这个队列的Features属性为null,而其他下面的队列都有个D(durable),也就是该队列是非持久化的,一旦服务器中的mq宕机或者重启,该队列就会消失,这个时候我们就需要设置该队列持久化来防止mq宕机或者重启时该队列消失的问题

在我们前面创建的生产者task中将ack_queue队列更改为持久化队列

这个时候启动生产者会报错:

也就是说mq中该队列原先已经为非持久化,不可以直接更改为持久化,此时我们可以直接在mq中将该队列删除掉,然后重新创建该队列:


可以看到队列列表中已经没有该队列了:

重新启动生产者创建ack_queue队列:

消息持久化
队列发送消息一般存储在内存中,mq宕机和重启后消息也会消失,我们可以将消息进行持久化,将消息存储到磁盘中实现持久化

不公平分发

RabbitMQ分发消息默认采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大-部分时间处于空闲状态,而处理慢的那个消费者-直在干活, 这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。

我们可以设置参数channel.basicQos(1);
在两个消费者中都进行设置:

启动生产者和消费者进行发送消息测试:

沉睡时间短的消费者1消费消息多

沉睡时间长的消费者2消费消息少,遵循能者多劳规则

预取值
表示分发时候,每个消费者可以分配到多少条消息,预取值为多少该消费者通道中就可以堆积多少条消息供其消费
设置消费者1

设置消费者2:

分别启动生产者消费者并生产消息:
生产者发送20条消息:

消费者1:

由于消费者1处理消息快,在消费者2堆积满5条或者还未满5条时消费者1就已经消费完队列中堆积的一条消息,队列中空出一个位置还可以堆积,于是造成消费者1多消费消息

但是仍然可以看到消费者2中堆积了5条消息


最后在消费者2消费完堆积的5条消息中的一条而又空出来一个位置后队列中就又会存进去等待消费

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582357.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号