栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

RabbitMQ 5种模型实战使用,干货!!!

RabbitMQ 5种模型实战使用,干货!!!

核心依赖:


    com.rabbitmq
    amqp-client
    5.7.2
第一种模型:直连

 在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱, 可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

生产者:

package com.eddie.helloworld;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

//        //创建连接mq的连接工厂对象
//        ConnectionFactory connectionFactory = new ConnectionFactory();
//        //设置连接rabbitmq主机
//        connectionFactory.setHost("192.168.2.2");
//        //设置端口号
//        connectionFactory.setPort(5672);
//        //设置连接那个虚拟主机
//        connectionFactory.setVirtualHost("/ems");
//        //设置访问虚拟主机的用户名和密码
//        connectionFactory.setUsername("ems");
//        connectionFactory.setPassword("123");
//
//        //获取连接对象
//        Connection connection = connectionFactory.newConnection();

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化(服务重启后队列还保存,但不保存消息)
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        //发布消息
        //参数1: 交换机名称 参数2:队列名称  参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费)  参数4:消息的具体内容
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        
//        channel.close();
//        connection.close();
        //调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

  

消费者:

package com.eddie.helloworld;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    //生产消息
    public static void main(String[] args) throws IOException, TimeoutException {

//        //创建连接mq的连接工厂对象
//        ConnectionFactory connectionFactory = new ConnectionFactory();
//        //设置连接rabbitmq主机
//        connectionFactory.setHost("192.168.2.2");
//        //设置端口号
//        connectionFactory.setPort(5672);
//        //设置连接那个虚拟主机
//        connectionFactory.setVirtualHost("/ems");
//        //设置访问虚拟主机的用户名和密码
//        connectionFactory.setUsername("ems");
//        connectionFactory.setPassword("123");
//
//        //获取连接对象
//        Connection connection = connectionFactory.newConnection();
//
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);


        channel.basicConsume("hello", true, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body): " + new String(body));
            }
        });

//        channel.close();
//        connection.close();
    }
}

封装的公共类:

package com.eddie.utiils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {
    private static ConnectionFactory connectionFactory;
    //private static Properties properties;
    static{
        //重量级资源  类加载执行之执行一次
        //创建连接mq的连接工厂对象
         connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("192.168.2.2");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");

    }

    //定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接工具方法
    public static void closeConnectionAndChanel(Channel channel, Connection conn) {
        try {
            if(channel!=null) channel.close();
            if(conn!=null)   conn.close();
        } catch (Exception e) {
            e.printStackTrace();

        }
    }
}
第二种模型(work quene)

Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会
远远大于消息的消费速度。长此以往,消息就会堆积
越来越多,无法及时处理。此时就可以使用work 模型:
让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

 角色:

P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢

C2:消费者-2:领取任务并完成任务,假设完成速度快

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给 下一个使用者。平均而言,每个消费者都会收到相同数量 的消息。这种分发消息的方式称为循环。

假如生产者发送了10条消息,每个消费者将获取5条数据,其中一个消费者读取到第三条的时候发生宕机,则剩下的消息就会丢失,这样我们就需要关闭rabbitmq自动确认机制,手动去给一个确认标识,即可以解决宕机消息丢失问题,又可以产生能者多劳的效果

生产者:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider2 {
    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化(服务重启后队列还保存,但不保存消息)
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        //发布消息
        //参数1: 交换机名称 参数2:队列名称  参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费)  参数4:消息的具体内容
        for(int i = 0; i<20; i++){
            channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+"hello rabbitmq").getBytes());
        }
        //调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        channel.basicQos(1); //每次只能消费一条消息
        channel.queueDeclare("hello",true,false,true,null);

        //参数1:队列名称    参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
        channel.basicConsume("hello", false, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                System.out.println("111new String(body): " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }

}

消费者2:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

import static java.lang.Thread.sleep;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        channel.basicQos(1); //每次只能消费一条消息
        channel.queueDeclare("hello",true,false,true,null);
        //参数1:队列名称    参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
        channel.basicConsume("hello", false, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("222new String(body): " + new String(body));
                //手动确认  参数1:手动确认消息标识   参数2:是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }

}

设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息

 第三种模型(fanout)
添加了exchange,交换机功能:
生产者:从之前直接将消息发送到队列改变为生产者将消息发送到交换机
消费者:通过临时队列绑定交换机,从临时队列进行消息消费

先启动消费者监听,再启动生产者:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

就管理页面而言

//临时队列
String queueName = channel.queueDeclare().getQueue();

String queueName = channel.queueDeclare().getQueue() 产生的临时队列:AutoDelete,Exclusive(自动删除,独占队列),所以我们可以自定义临时队列,从而实现消息确认机制

Provider:

package com.eddie.exchange;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
public class Provider {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //将通道声明指定交换机   //参数1: 交换机名称    参数2: 交换机类型  fanout 广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //发送消息
        for(int i = 0; i<10; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, (i+"fanout type message").getBytes());
        }
        System.out.println("Sent: " + "fanout type message");
        //释放资源
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

 Consumer1:

package com.eddie.exchange;

import com.rabbitmq.client.*;
import com.eddie.utiils.RabbitMQUtils;

import java.io.IOException;

public class Consumer1 {
    private final static String QUEUE_NAME = "test_queue_fanout_1";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

Consumer2

package com.eddie.exchange;

import com.rabbitmq.client.*;
import com.eddie.utiils.RabbitMQUtils;

import java.io.IOException;

public class Consumer2 {
    private final static String QUEUE_NAME = "test_queue_fanout_2";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者中通道也可以不用声明交换机,仅需要 将队列与已有交换机绑定就可获取消息(待考证,但官网在消费者中进行了交换机声明,为了保险起见还是加上为好)!!!

第四种模型(Routing)

Routing 之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到Direct类型的Exchange。
在Direct模型下:队列与交换机的绑定,不能是任意绑定了,
而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,
也必须指定消息的 RoutingKey。

Exchange不再把消息交给每一个绑定的队列,
而是根据消息的Routing Key进行判断,
只有队列的Routingkey与消息的 Routing key完全一致,
才会接收到消息

 

P:生产者,向Exchange发送消息,发送消息时,
会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,
然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、
error、warning 的消息

 provider:

package com.eddie.Routing_direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.eddie.utiils.RabbitMQUtils;

import java.io.IOException;

public class Provider {
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接通道对象
        Channel channel = connection.createChannel();
        //通过通道声明交换机  参数1:交换机名称  参数2:direct  路由模式
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //发送消息
        String routingkey = "info";
        for(int i = 0; i<10; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingkey, null, ("这是direct模型发布的基于route key: [" + routingkey + "] 发送的消息").getBytes());
        }
        //关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

Consumer1:

package com.eddie.Routing_direct;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    private final static String QUEUE_NAME = "test_queue_direct_1";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 ,第三个参数:路由键类型
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

Consumer2:

package com.eddie.Routing_direct;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    private final static String QUEUE_NAME = "test_queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 ,第三个参数:路由键类型
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

Consumer3:

package com.eddie.Routing_direct;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer3 {
    private final static String QUEUE_NAME = "test_queue_direct_3";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 ,第三个参数:路由键类型
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

结论:只有队列绑定”info“路由键的才会收到消息

 Routing 之订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic类型Exchange可以让
队列在绑定Routing key的时候使用通配符!这种模型Routingkey 
一般都是由一个或多个单词组成,多个单词之间以”.”分割,
例如: item.insert

Provider:

package com.eddie.Routing_topic;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接通道对象
        Channel channel = connection.createChannel();
        //通过通道声明交换机  参数1:交换机名称  参数2:direct  路由模式
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //发送消息
        String routingkey = "save.user.delete";
        for(int i = 0; i<10; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingkey, null, ("这是topic模型发布的基于route key: [" + routingkey + "] 发送的消息").getBytes());
        }
        //关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

 Consumer1:

package com.eddie.Routing_topic;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    private final static String QUEUE_NAME = "test_queue_topic_1";
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 ,第三个参数:路由键类型
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.user.*");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

 Consumer2:

package com.eddie.Routing_topic;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    private final static String QUEUE_NAME = "test_queue_topic_2";
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, true, true, null);
        //临时队列
        //String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列 ,第三个参数:路由键类型
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.user.#");
        channel.basicQos(1);
        //消费消息,第二个参数:关闭消息确认机制
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: "+new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279061.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号