栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习记录

RabbitMQ学习记录

安装教程:https://blog.csdn.net/weixin_42673046/article/details/118442323

启动等命令
进入到rabbitmq文件目录下sbin文件夹下面执行命令
后台启动: ./rabbitmq-server -detached
查看状态:./rabbitmqctl status
应用和节点都将被关闭:./rabbitmqctl stop
应用启动:./rabbitmqctl start_app
应用关闭节点不关:./rabbitmqctl stop_app
开启管理插件web页面:./rabbitmq-plugins enable rabbitmq_management
查看插件列表状态:./rabbitmq-plugins list

springboot整合rabbitMQ的demo项目放在本地盘中

一 ,rabbitmq中消息的几种模型

第一种:直连
生产者(P) —> 队列 —> 消费者(C)
生产者连接队列发布消息方法

  public static void clientRb() throws IOException, TimeoutException {
  		//rabbitMQ连接步骤
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //参数设置
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        //要连接的虚拟主机
        connectionFactory.setVirtualHost("/dmo");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        //创建连接对象
        Connection connection = connectionFactory.newConnection();

        //获取中间通道
        //需要通过中间通道将消息发送到队列
        Channel channel = connection.createChannel();
        //设置消息连接信息 给通道绑定队列
        //参数意思:1.队列名称(主机中没有会自动创建)  
        //2.是否开启持久化(保存队列信息) 此处只是队列持久化,消息不会持久化保存 不开启每次重启rabbitmq会删掉之前的队列
        //3.是否独占队列(只允许当前客户端向队列发送消息)
        //4.是否在消息消费完成后自动删除当前队列(消费者消费完并且断开连接才会删除)      
        //5.附加参数
        channel.queueDeclare("helloRb",false,false,false,null);

        //直连模式中其实不绑定队列也可以直接发布消息,因为下面发布消息的方法中参数路由key对应着队列名称
        //直连模式下消息直接发布到AMQP default默认交换机中,默认交换机与所有队列建立隐含绑定关系,路由的key就是队列名
        //所以在发布方法中直接写上队列名称也可以直接发送到队列中

        //发布消息
        //参数意思:1.交换机名称  2.路由key:因为默认交换机绑定了所有队列,在直连模式中可以直接送到队列中  3.发布消息的额外设置  4.发布消息集体内容
        //MessageProperties.PERSISTENT_TEXT_PLAIN  --开启消息持久化
        channel.basicPublish("", "helloRb", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ3".getBytes());



        channel.close();
        connection.close();
    }

消费者方法

   //在main函数中运行,因为方法调用时运行结束会直接杀死线程
    public static void main(String[] args) throws IOException, TimeoutException {
        //rabbitMQ连接步骤
        //前面的连接步骤 都一样
        //参数设置
        RabbitMQConnectionUtil rabbitMQConnectionUtil = new RabbitMQConnectionUtil();

        Connection connection = rabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //这里的参数要与队列一致 否则会报错
        //例如生产者新建的是一个消息开启持久化的队列,消费者这里绑定也要设置成持久化才能接收,否则报错
        channel.queueDeclare("helloRb", true, false, false, null);

        //获取/消费消息
        //参数意思: 1.消息队列名称 
  		//2.是否自动确认(此处为true)  只有确认了的消息才认为被消费完成了
        //3.消费回调可以在其中获取到消息内容等操作
        channel.basicConsume("helloRb", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println("获取到一条消息:"+ new String(body));
            }
        });
//这两行会直接关掉连接收不到消息
//        channel.close();
//        connection.close();
    }

封装的工具类

public class RabbitMQConnectionUtil {

    //对于较重量级的资源放到静态代码块只创建加载一次
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/dmo");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
    }

    //连接方法封装
    public Connection getConnection() throws IOException, TimeoutException {
        try {
            Connection connection = connectionFactory.newConnection();
            return connection;
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        return null;
    }

    //关闭
    public void closeConnection(Channel channel, Connection connection){
        try {
            if(channel != null && connection != null){
                channel.close();
                connection.close();
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
    }

}

第二种:工作队列(Work Queue)
模型:

这种会有多个消费者订阅了这个消息队列,每个消费者会收到相同数量的消息,消费方式的机制为轮询,即:A第一条 B第二条 C第三条 A第四条 B第五条…这种方式来消费消息,每个消费者都会收到相同数量的消息,多余的按顺序消费。
即使某个消费者消费速度慢,执行速度慢,也会按照轮询来消费。

第二种修改:工作队列之能者多劳模式
模型与上面一样
假设有消费者A,B。消费者A执行速度很慢,B是正常速度,使用此模式B会消费大部分消息,A消费小部分
开启此模式消费者代码
1.通道设置每次只消费一个消息
2.关闭自动确认消息
3.开启手动确认消息

 public static void main(String[] args) throws IOException, TimeoutException {
        //rabbitMQ连接步骤
        //前面的连接步骤 都一样
        //参数设置
        RabbitMQConnectionUtil rabbitMQConnectionUtil = new RabbitMQConnectionUtil();

        Connection connection = rabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //设置每次只能消费一个消息
        channel.basicQos(1);
        //这里的参数要与队列一致 否则会报错
        //例如生产者新建的是一个消息开启持久化的队列,消费者这里绑定也要设置成持久化才能接收,否则报错
        channel.queueDeclare("workQue", true, false, false, null);

        //获取/消费消息
        //参数意思: 1.消息队列名称  
        //2.是否自动确认(此处改为false)  只有确认了的消息才认为被消费完成了
        //3.消费回调可以在其中获取到消息内容等操作
        channel.basicConsume("workQue", false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("获取到一条消息:"+ new String(body));
                //参数意思
                //1.确认标识  
                //2.是否开启确认多个消息,否一次一个 此处每次消费一个消息,true,false都一样
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }

第三种:fanout模式/广播模式
模型:

此时生产者只能将消息发送到交换机中,由交换机来决定发送到哪个队列,每个消费者都有自己的队列(由交换机生成),每个队列都绑定了交换机,交换机会把消息发给所有的队列,实现一个消息被多个消费者消费。

生产者代码:

  public static void clientRb() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //1.交换机名称  2.交换机类型 fanout广播类型
        channel.exchangeDeclare("register","fanout");

        //此处就需要指定交换机名称
        channel.basicPublish("register","",null,"fanout type msg".getBytes());

        RabbitMQConnectionUtil.closeConnection(channel, connection);
    }

消费者代码:

   //在main函数中运行,因为方法调用时运行结束会直接杀死线程
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("register", "fanout");
        //获取临时队列名称
        //fanout类型交换机会为每个消费者生成一个对应的临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机
        channel.queueBind(queueName,"register","");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer_A:"+ new String(body));
            }
        });
//这两行会直接关掉连接收不到消息
//        channel.close();
//        connection.close();
    }

第四种:Routing模式/路由模式
交换机模式为direct模式
模型:

此时队列与交换机之间的绑定就不是直接绑定了,而是根据路由来绑定队列,即需要指定一个RoutingKey。
生产者在发送消息到交换机中时也需要指定RoutingKey
交换机会根据RoutingKey来判断该把消息放到哪条队列中

生产者方法:

   public static void clientRb() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //1.交换机名称  2.交换机类型 direct直连类型
        channel.exchangeDeclare("register_direct","direct");

        String routingKey = "error";
        //此处就需要指定交换机名称与路由key
        channel.basicPublish("register_direct",routingKey,null,("routing msg routingKey = "+routingKey).getBytes());

        RabbitMQConnectionUtil.closeConnection(channel, connection);
    }

消费者A方法:

  public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 以及类型
        channel.exchangeDeclare("register_direct","direct");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定临时队列以及交换机
        //可以绑定多个队列
        //参数 1.队列名称 2.交换机名称  3.路由key
        channel.queueBind(queueName,"register_direct","error");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerB收到消息:" + new String(body));
            }

        });
    }

消费者B方法:
绑定了多个队列路由key

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("register_direct", "direct");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与临时队列
        channel.queueBind(queueName,"register_direct","error");
        channel.queueBind(queueName,"register_direct","info");
        channel.queueBind(queueName,"register_direct","warning");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerB收到消息:" + new String(body));
            }

        });
    }

第五种:topic模式/主题模式/动态路由
模型:

基本与上一个模式相同,不同点就是routingKey在这种模式中可以使用通配符,可以做到一次性绑定多个队列。
通配符:
*:匹配一个单词
#:匹配多个单词
例:user.*可以匹配user.save、user.ok,但是不能匹配user.save.good
user.#可以匹配user.save.good等,不能匹配del.user
生产者方法:

   private static void creatMQ() throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");
        String routingKey = "user.save.ok";
        channel.basicPublish("topics",routingKey,null,"this is topic msg".getBytes());
        RabbitMQConnectionUtil.closeConnection(channel, connection);
    }

消费者方法:

   public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("topics", "topic");
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,此处routingkey可以以通配符形式设置
        // *:代表匹配一个单词 #:匹配多个单词
        channel.queueBind(queueName, "topics", "user.*");
        channel.basicConsume(queueName,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerA收到:" + new String(body));
            }
        });
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389030.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号