栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ架构、RabbitMQ的使用

RabbitMQ架构、RabbitMQ的使用

官方的简单架构图

 

Publisher - 生产者:发布消息到RabbitMQ中的Exchange

Consumer - 消费者:监听RabbitMQ中的Queue中的消息

Exchange - 交换机:和生产者建立连接并接收生产者的消息

Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图,巧记,pe,r,qc

 

一句话总结:生产者消息发布交给交换机,交换机把消息路由给队列,队列被消费者监听到有消息,消息被消费掉↑

完整架构图,下面的/test是假设等会要新建的虚拟主机的名字,默认的虚拟主机Virtual Host名字叫/

完整架构图,巧记pe,r,qc,ccc,vm

 

生产者和消费者都要先和虚拟主机Virtual Host建立连接,然后分别跟交换机和队列通过管道Channel来传输数据↑

Ready:待消费的消息总数

Unacked:待应答(待确认)的消息总数

Total:总数 Ready+Unacked

RabbitMQ的使用【重点】

 RabbitMQ的7种通讯方式↓

通讯方式

 

Java连接RabbitMQ

导入依赖

仓库https://mvnrepository.com/搜索amqp-client来下载java客户端依赖,版本一般找最多人用的即可↓


    
        com.rabbitmq
        amqp-client
        5.6.0
    

    
        junit
        junit
        4.12
    

创建工具类连接RabbitMQ,连接工厂指定参数,新建连接来得到

public class RabbitMQClient {
    public static Connection getConnection(){
        // 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.129");//这里要注意换成你自己虚拟机的ip,因为mq装在虚拟机上
        factory.setPort(5672);//注意图形化界面端口才是15672
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");

        // 创建Connection
        Connection conn = null;
        try {
            conn = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 返回Connection
        return conn;
    }
}

测试连接

public class TestConnection {
    @Test
    public void test1(){
        Connection connection = RabbitMQClient.getConnection();
        System.out.println(connection);//amqp://test@192.168.200.129:5672//test
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
通讯方式

1、Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

创建生产者,创建一个channel,发布消息到exchange,指定路由规则

public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //3. 发布消息到exchange,同时指定路由的规则
        String msg = "Hello-World!";
        // 参数1:指定exchange,使用"",使用默认的交换机
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties属性,暂时用null。
        // 参数4:指定发布的具体消息,byte[]类型
        channel.basicPublish("","HelloWorld",null,"hellomsg".getBytes());
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

public class Consumer {
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建channel
        Channel channel = connection.createChannel();

        //3. 声明队列-HelloWorld
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true,设置false后MQ重启后队列全部删除)
        //参数3:exclusive - 是否排外(conn.close()当前队列会被自动删除,还有当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("HelloWorld",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consume = new DefaultConsumer(channel){//重写方法做接收消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body,"UTF-8"));
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ,MQ就会进行删除)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("HelloWorld",true,consume);

        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();//让程序不停止,可以理解为死循环即可

        //5. 释放资源
        channel.close();
        connection.close();
    }
}

2、work

一个生产者,一个默认的交换机,一个队列,两个消费者

结构图

默认情况下,如有有两个消费者,生产者for循环生产10条消息,RabbitMQ平均分配,每个消费者接收5条↓

public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //3. 发布消息到exchange,同时指定路由的规则
        //10.for
        for (int i = 0; i < 10; i++) {
            String msg = "Hello-World!"+ i;
            channel.basicPublish("","Work",null,msg.getBytes());
        }

        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

 

public class Consumer1 {
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建channel
        Channel channel = connection.createChannel();

        //3. 声明队列-HelloWorld
        channel.queueDeclare("Work",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consume = new DefaultConsumer(channel){//重写方法做接收消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
            }
        };

        channel.basicConsume("Work",true,consume);

        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();//让程序不停止,只有键盘录入数据才会往下走

        //5. 释放资源
        channel.close();
        connection.close();
    }
}
public class Consumer2 {
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建channel
        Channel channel = connection.createChannel();

        //3. 声明队列-HelloWorld
        channel.queueDeclare("Work",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consume = new DefaultConsumer(channel){//重写方法做接收消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
            }
        };

        channel.basicConsume("Work",true,consume);

        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();//让程序不停止,只有键盘录入数据才会往下走

        //5. 释放资源
        channel.close();
        connection.close();
    }
}

不想默认的消息平均分配,只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,生产者不变

消费者指定Qos和手动ack,即把上述消费者的代码的第4步和消费换成下面的整体即可

//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);

DefaultConsumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));

        //2. 手动ack
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
};

//3. 指定手动ack
channel.basicConsume("Work",false,consumer);

3、Publish/Subscribe,广播模式

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型  FANOUT - pubsub方式用 , DIRECT - Routing方式用 , TOPIC - Topics方式用
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);//FANOUT扇形广播
channel.queueBind("pubsub-queue1","pubsub-exchange","");
channel.queueBind("pubsub-queue2","pubsub-exchange","");

消费者还是正常的监听某一个队列即可。

新建一个publish包,拷贝生产者和消费者过来修改,生产者修改一下交换机和绑定队列名↓

public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //增加的部分↓
        //3. 创建exchange - 绑定某一个队列
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub方式用 , DIRECT - Routing方式用 , TOPIC - Topics方式用
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("pubsub-queue1","pubsub-exchange","");
        channel.queueBind("pubsub-queue2","pubsub-exchange","");
        //增加的部分↑

        //3. 发布消息到exchange,同时指定路由的规则
        //10.for
        for (int i = 0; i < 10; i++) {
            String msg = "Hello-World!"+ i;
            channel.basicPublish("pubsub-exchange","Work",null,msg.getBytes());//
        }

        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

消费者把之前的队列名改成跟生产者绑定的一致即可↓

测试结果,生产者生产10条消息,两个消费者都能收到10条消息↓

4、Routing

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

//3. 创建exchange, routing-queue-error,routing-queue-info,
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);//DIRECT定向
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");

//4. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

消费者没有变化

新建一个routing包,拷贝生产者和消费者过来修改,生产者修改一下交换机和绑定队列名和路由键名字↓

public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //增加的部分↓
        //3. 创建exchange, routing-queue-error,routing-queue-info,
        channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
        channel.queueBind("routing-queue-error","routing-exchange","ERROR");
        channel.queueBind("routing-queue-info","routing-exchange","INFO");

        //4. 发布消息到exchange,同时指定路由的规则
        channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
        //增加的部分↑

        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

消费者把之前的队列名改成跟生产者绑定的一致即可

测试结果,生产者生产的路由键对应的消息,对应的路由键的队列都能给对应的消费者消费↓

5、Topic

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * 代表一个xxx,而#代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。说白了就是匹配一个单词的情况,一般使用#号匹配0个或者多个单词,比如red是一个单词,这里说的是单词而不是字母!!!

public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //增加的部分↓
        //2. 创建exchange并指定绑定方式,巧记,闪耀明星只有一个,落井下石太多了↓
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
        channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
        channel.queueBind("topic-queue-2","topic-exchange","fast.#");
        channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");

        //3. 发布消息到exchange,同时指定路由的规则
        channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
        channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
        channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
        //增加的部分↑

        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

消费者只是监听队列,没变化。

消费者把之前的队列名改成跟生产者绑定的一致即可

1)Topic模式是最常用的模式,灵活,方便,强大; 2)使用Topic模式生产者在声明队列时需要制定消息到达队列方式为topic; 3)路由键和某模式匹配,主要有两种模糊匹配:

*匹配一个单词的情况,一般使用#号匹配0个或者多个单词,巧记,闪耀明星只有一个,落井下石太多了↓

// 创建交换机
channel.exchangeDeclare("exchange_topic",BuiltinExchangeType.TOPIC);

// 创建队列
channel.queueDeclare("topic-emp",true,false,false,null);
channel.queueDeclare("topic-dept",true,false,false,null);

// 交换机绑定队列,设置路由规则
channel.queueBind("topic-emp","exchange_topic","emp.*");
channel.queueBind("topic-dept","exchange_topic","*.update");
channel.queueBind("topic-dept","exchange_topic","#.delete");

// 发布消息
channel.basicPublish("exchange_topic","emp.add",null,"emp.add".getBytes("utf-8"));
channel.basicPublish("exchange_topic","emp.add.10",null,"emp.add.10".getBytes("utf-8"));
channel.basicPublish("exchange_topic","dept.update.10",null,"dept.update.10".getBytes("utf-8"));
channel.basicPublish("exchange_topic","20.emp.delete",null,"20.emp.delete".getBytes("utf-8"));

消费者只是监听队列,没变化。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729405.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号