栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件rabbitMQ之第二种消息模型(work quene)

消息中间件rabbitMQ之第二种消息模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色:

P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢

C2:消费者-2:领取任务并完成任务,假设完成速度快

1. 开发生产者
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection rabbitConnection = RabbitMQUtils.getRabbitConnection();
        Channel channel = rabbitConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work",null,(i+"hello work quene").getBytes());

        }
        RabbitMQUtils.closeRabbitConnection(channel,rabbitConnection);
    }
}
2.开发消费者-1
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getRabbitConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",false,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1 "+new String(body));
            
            }
        });
    }
}
3.开发消费者-2

跟消费者1的代码几乎一样

4.测试结果

 

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。  

5.消息自动确认机制
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getRabbitConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);//设置每次接受一个消息
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1 "+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
            }
        });
    }
}

设置通道一次只能消费一个消息

关闭消息的自动确认,开启手动确认消息

通过线程睡眠来体现能者多劳的消费者,只需要在消费是睡眠1s即可

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707161.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号