栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 的五种工作模式实现(附代码)

RabbitMQ 的五种工作模式实现(附代码)

一、Work queues 工作队列模式

客户端P生产消息储存到队列中,在一个队列中有2个消费者C1和C2,那么消费者之间对于同一个消息的关系是竞争的关系对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
模拟短信发送通知

常量类

public class RabbitConstant {
    //队列名称
    public static final String QUEUE_SMS = "messege";
}

生产者

public class Provider {

    public static void main(String[] args) throws IOException, TimeoutException {

        //建立与MQ的连接,基于TCP长连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("服务器地址");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("MQ虚拟机用户");
        connectionFactory.setPassword("MQ虚拟机用户密码");
        connectionFactory.setVirtualHost("虚拟机");
        Connection conn=connectionFactory.newConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        for(int i = 1 ; i <= 100 ; i++) {
            SMS sms = new SMS("你好" + i, "10000" + i, "发送成功");
            String jsonSMS = new Gson().toJson(sms);
            //四个参数
            //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
            //队列名称
            //额外的设置属性
            //最后一个参数是要传递的消息字节数组
            channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
        }
        System.out.println("发送数据成功");
        //关闭通道
        channel.close();
        //关闭TCP连接
        conn.close();
    }
}

消费者1

public class ConsumerOne {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立与MQ的连接,基于TCP长连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("服务器地址");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("MQ虚拟机用户");
        connectionFactory.setPassword("MQ虚拟机用户密码");
        connectionFactory.setVirtualHost("虚拟机");
        Connection conn=connectionFactory.newConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        final Channel channel = conn.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        //从MQ服务器中获取数据

        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }


}

消费者2

public class ConsumerTwo {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立与MQ的连接,基于TCP长连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("服务器地址");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("MQ虚拟机用户");
        connectionFactory.setPassword("MQ虚拟机用户密码");
        connectionFactory.setVirtualHost("虚拟机");
        Connection conn=connectionFactory.newConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        final Channel channel = conn.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        //从MQ服务器中获取数据

        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }


}
二、Pub/Sub 订阅模式

整理后更新-----

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752777.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号