栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ入门实例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ入门实例

1.RabbitMQ工作原理

1.1 术语

            Broker:消息队列服务进程,此进程包括Exchange和Queue

            Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤

            Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方

            Producer:消息生产者,生产者通过通道将消息发送Broker

            Consumer:消息消费者,接收消息队列转发的消息

    1.2 发布流程

            i.生产者和broker建立tcp连接

            ii.生产者和broker建立通道

            iii.生产者通过通道将消息发送给broker,由Exchange(消息交换机)将消息进行转发到指定的queue(队列)

    1.3 接收流程

            i.消费者和broker建立tcp连接

            ii.消费者和broker建立通道

            iii.消费者监听指定的queue(队列),当有消息到达queue时,broker默认将消息推送给消费者,消费者接收消息

2.添加依赖


    com.rabbitmq
    amqp-client
    5.14.0 

3.生产者代码        

package com.ilearn.prducer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerMsg {

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        //设置连接的虚拟机(这里连接默认的虚拟机)
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //生产者建立tcp连接
            connection = connectionFactory.newConnection();

            //创建exchange通道(每个连接可以创建多个通道,一个通道代表一个会话)
            channel = connection.createChannel();

            //声明队列(参数1:队列名字,参数2:队列中的消息是否持久化,参数3:队列是否独占此连接,参数4:队列不在使用时是否删除此队列,参数5:队列额外参数设定)
            channel.queueDeclare("test-queue", true, false, false,null);

            //发布消息(参数1:交换机,如不指定会使用默认交换机,参数2:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列,参数3:消息的属性信息,参数4:消息内容)
            channel.basicPublish("","test-queue",null,"我是一条测试消息".getBytes());

            System.out.println("消息发送完成!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}

4.生产者执行结果

5.消费者代码

package com.ilearn.consumer;

import com.rabbitmq.client.*;
import java.io.IOException;

public class ConsumerMsg {

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection();

            //创建通道
            channel = connection.createChannel();

            //声明队列
            channel.queueDeclare("test-queue",true,false,false,null);

            //创建监听
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机名字
                    String exchange = envelope.getExchange();
                    //路由key
                    String routeKey = envelope.getRoutingKey();
                    //消息id
                    long msgId = envelope.getDeliveryTag();
                    //消息内容
                    String msg = new String(body,"utf-8");
                    System.out.println("交换机" + "t" + "路由key" + "t" + "消息id" + "t" + "消息内容");
                    System.out.println(exchange + "t" + routeKey + "t" + msgId + "t" + msg);
                }
            };

            //监听队列(参数1:监听的队列,参数2:是否自动给消息队列发送消息回执,参数3:消息消息的方法)
            channel.basicConsume("test-queue",true,defaultConsumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }

    }
}

6.消费者执行结果

7.RabbitMQ六种工作模式

7.1 简单队列模式

            

            只包含一个生产者一个一个消费者,生产者将消息发送给队列中,消费者从队列中消费消息。单生产单消费。

           代码如上所示

7.2 工作队列模式-轮询分发  

                    

            多个消费者绑定在一个队列上,一条信息只能被一个消费者消费,消费者轮询消费队列中的消息。             注:这种模式就是简单队列模式情况下,起两个消费者就可以了。 7.3 工作队列模式-公平分发         多个消费者绑定在一个队列上,一条信息只能被一个消费者消费,消费者效率高的消费更多的消息。

        代码(只需要调整消费者代码即可):

        a)手动设置消息被消费了

              handleDelivery方法增加如下代码:

                channel.basicAck(deliveryTag,false);

        b)消费者消息应答模式修改为手动回复

                channel.basicConsume("test-queue",false,defaultConsumer);

        c) 创建通道以后,设置消费者每次接收的消息数

                channel.Qos(n);//n为数值
               

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/591691.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号