栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ入门

RabbitMQ入门

rabbitmq是消息传递的中间组件,使用rabbitmq可以降低程序耦合性,流量消峰以及异步处理。

(1)降低耦合性:如当用户调用下单系统时,服务器故障,需几分钟修复,此时用户依旧可以下单,等系统修复后,再继续处理相应任务。

(2)流量消峰:如双十一时,下单系统会承受几万下单量,使用消息队列做缓冲,将一秒用户分为几部份,有的用户可能需等十几秒才能下单成功,但可以避免服务器宕机。

(3)异步处理:如A服务发送请求调用B服务,A不用等B服务处理完,即直接返回成功到用户,B服务处理完,会发送处理成功消息到mq,mq在发送给A服务,A便能收到异步处理成功消息。

rabbitmq入门案例:

1.启动rabbitmq服务(此处以安装在linux上的rabbitmq讲解)

(1)启动服务

 /sbin/service rabbitmq-server start

(2)查看服务状态

 /sbin/service rabbitmq-server status

   (3)关闭linux防火墙

systemctl stop firewalld

如下图表示启动成功

2.写生产者代码(即发送消息端)

(1)创建队列名称

(2)发送消息

        ①调用ConnectionFactory创建连接工厂

        ②设置工厂连接ip(即linux上的虚拟ip地址),设置用户名和密码

        ③调用newConnection()方法创建连接,调用createChannel()方法创建信道

        ④调用queueDeclare()方法声明队列,该方法有五个参数

          第一个参数:队列名称
          第二个参数:是否需要保存,消息默认存储中内存中,true开启持久化
          第三个参数:队列是否只供一个消费者,是否进行消息共享 ,false只能一个消费者消费
          第四个参数:是否自动删除 消费者断开是否自动删除
          第五个参数:其他参数

        ⑤调用basicPublish()方法发送消息,该方法有四个参数

            第一:发送到哪个交换机,不能为null
            第二:队列名称
            第三 :其他参数信息
            第四: 发送消息的消息体,要调取二进制

        

public class Producer {

    //队列名称
    public static final  String QUEUE_NAME="hello";

    //发送消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接工厂
        ConnectionFactory factory=new ConnectionFactory();

        //设置工厂ip连接队列
        factory.setHost("192.168.23.111");

        //设置用户名,密码
        factory.setUsername("user");
        factory.setPassword("123");

        //创建连接
        Connection connection = factory.newConnection();

        //获取连接信道
        Channel channel = connection.createChannel();

        //信道连接队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //准备发消息
        String  message="hello world";

        //发送消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("消息发送完毕");

    }


}

 3.消费者(即接受消息端)

(1)创建队列名称(需要与生产者队列名称一致)

(2)接受消息

        ①创建连接工厂,设置工厂ip,用户和密码(此处与生产者一样)

        ②调用newConnection()方法创建连接,调用createChannel()方法创建信道

        ③接受消息回调函数,第一个参数为消息标记,第二个参数为获取的消息

        ④消息中断回调函数

        ⑤调用basicConsume()方法接受消息,共有四个参数:

               1.消费队列名称
               2.是否自动应答(true代表自动答)
               3.消费者未成功消费回调
               4.消费者取消消费的回调

      

//消费者
public class Consumer {

    //队列名称,需要和生产者队列名字一样
    public static final  String QUEUE_NAME="hello";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //设置工厂ip连接队列
        factory.setHost("192.168.23.111");

        //设置用户名,密码
        factory.setUsername("user");
        factory.setPassword("123");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //声明,接受消息
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };

        //消息被中断声明
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };

        //接受消息
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }

}

4.测试

启动生产者发送消息

 启动消费者接收消息

测试成功 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758414.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号